diff --git a/metastable_baselines/distributions/distributions.py b/metastable_baselines/distributions/distributions.py index fe4005f..fa5418f 100644 --- a/metastable_baselines/distributions/distributions.py +++ b/metastable_baselines/distributions/distributions.py @@ -37,16 +37,16 @@ class Strength(Enum): class ParametrizationType(Enum): - CHOL = 0 - ARCHAKOVA = 1 + CHOL = 1 + ARCHAKOVA = 2 class EnforcePositiveType(Enum): - LOG = 0 - RELU = 1 - SELU = 2 - ABS = 3 - SQ = 4 + LOG = 1 + RELU = 2 + SELU = 3 + ABS = 4 + SQ = 5 class UniversalGaussianDistribution(SB3_Distribution): @@ -60,8 +60,10 @@ class UniversalGaussianDistribution(SB3_Distribution): super(UniversalGaussianDistribution, self).__init__() self.par_strength = Strength.DIAG self.cov_strength = Strength.DIAG - self.par_type = None - self.enforce_positive_type = None + self.par_type = ParametrizationType.CHOL + self.enforce_positive_type = EnforcePositiveType.LOG + + self.distribution = None def proba_distribution_net(self, latent_dim: int, log_std_init: float = 0.0) -> Tuple[nn.Module, nn.Parameter]: """ @@ -74,264 +76,66 @@ class UniversalGaussianDistribution(SB3_Distribution): :return: """ mean_actions = nn.Linear(latent_dim, self.action_dim) - if self.contextual_cov: - log_std = nn.Linear(latent_dim, self.action_dim) + + if self.par_strength == Strength.NONE: + if self.cov_strength == Strength.NONE: + pseudo_cov = th.ones(self.action_dim) * log_std_init + elif self.cov_strength == Strength.SCALAR: + pseudo_cov = th.ones(self.action_dim) * \ + nn.Parameter(log_std_init, requires_grad=True) + elif self.cov_strength == Strength.DIAG: + pseudo_cov = nn.Parameter( + th.ones(self.action_dim) * log_std_init, requires_grad=True) + elif self.cov_strength == Strength.FULL: + # Off-axis init? + pseudo_cov = nn.Parameter( + th.diag_embed(th.ones(self.action_dim) * log_std_init), requires_grad=True) + elif self.par_strength == self.cov_strength: + if self.par_strength == Strength.NONE: + pseudo_cov = th.ones(self.action_dim) + elif self.par_strength == Strength.SCALAR: + std = nn.Linear(latent_dim, 1) + pseudo_cov = th.ones(self.action_dim) * std + elif self.par_strength == Strength.DIAG: + pseudo_cov = nn.Linear(latent_dim, self.action_dim) + elif self.par_strength == Strength.FULL: + raise Exception("Don't know how to implement yet...") + elif self.par_strength > self.cov_strength: + raise Exception( + 'The parameterization can not be stronger than the actual covariance.') else: - log_std = nn.Parameter( - th.ones(self.action_dim) * log_std_init, requires_grad=True) - return mean_actions, log_std + if self.par_strength == Strength.SCALAR and self.cov_strength == Strength.DIAG: + factor = nn.Linear(latent_dim, 1) + par_cov = th.ones(self.action_dim) * \ + nn.Parameter(1, requires_grad=True) + pseudo_cov = par_cov * factor[0] + elif self.par_strength == Strength.SCALAR and self.cov_strength == Strength.FULL: + raise Exception( + 'That does not even make any sense...') + else: + raise Exception( + 'Programmer-was-to-lazy-to-implement-this-Exception') - def proba_distribution(self, mean_actions: th.Tensor, log_std: th.Tensor) -> "DiagGaussianDistribution": + return mean_actions, pseudo_cov + + def proba_distribution(self, mean_actions: th.Tensor, pseudo_cov: th.Tensor) -> "UniversalGaussianDistribution": """ - Create the distribution given its parameters (mean, std) + Create the distribution given its parameters (mean, pseudo_cov) :param mean_actions: - :param log_std: + :param pseudo_cov: :return: """ - action_std = th.ones_like(mean_actions) * log_std.exp() - self.distribution = Normal(mean_actions, action_std) - return self - - def log_prob(self, actions: th.Tensor) -> th.Tensor: - """ - Get the log probabilities of actions according to the distribution. - Note that you must first call the ``proba_distribution()`` method. - - :param actions: - :return: - """ - log_prob = self.distribution.log_prob(actions) - return sum_independent_dims(log_prob) - - def entropy(self) -> th.Tensor: - return sum_independent_dims(self.distribution.entropy()) - - def sample(self) -> th.Tensor: - # Reparametrization trick to pass gradients - return self.distribution.rsample() - - def mode(self) -> th.Tensor: - return self.distribution.mean - - def actions_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor, deterministic: bool = False) -> th.Tensor: - # Update the proba distribution - self.proba_distribution(mean_actions, log_std) - return self.get_actions(deterministic=deterministic) - - def log_prob_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor) -> Tuple[th.Tensor, th.Tensor]: - """ - Compute the log probability of taking an action - given the distribution parameters. - - :param mean_actions: - :param log_std: - :return: - """ - actions = self.actions_from_params(mean_actions, log_std) - log_prob = self.log_prob(actions) - return actions, log_prob - - -class DiagGaussianDistribution(SB3_Distribution): - """ - Gaussian distribution with full covariance matrix, for continuous actions. - - :param action_dim: Dimension of the action space. - """ - - def __init__(self, action_dim: int): - super(DiagGaussianDistribution, self).__init__() - self.action_dim = action_dim - self.mean_actions = None - self.log_std = None - - def proba_distribution_net(self, latent_dim: int, log_std_init: float = 0.0) -> Tuple[nn.Module, nn.Parameter]: - """ - Create the layers and parameter that represent the distribution: - one output will be the mean of the Gaussian, the other parameter will be the - standard deviation (log std in fact to allow negative values) - - :param latent_dim: Dimension of the last layer of the policy (before the action layer) - :param log_std_init: Initial value for the log standard deviation - :return: - """ - mean_actions = nn.Linear(latent_dim, self.action_dim) - # TODO: allow action dependent std - log_std = nn.Parameter(th.ones(self.action_dim) - * log_std_init, requires_grad=True) - return mean_actions, log_std - - def proba_distribution(self, mean_actions: th.Tensor, log_std: th.Tensor) -> "DiagGaussianDistribution": - """ - Create the distribution given its parameters (mean, std) - - :param mean_actions: - :param log_std: - :return: - """ - action_std = th.ones_like(mean_actions) * log_std.exp() - self.distribution = Normal(mean_actions, action_std) - return self - - def log_prob(self, actions: th.Tensor) -> th.Tensor: - """ - Get the log probabilities of actions according to the distribution. - Note that you must first call the ``proba_distribution()`` method. - - :param actions: - :return: - """ - log_prob = self.distribution.log_prob(actions) - return sum_independent_dims(log_prob) - - def entropy(self) -> th.Tensor: - return sum_independent_dims(self.distribution.entropy()) - - def sample(self) -> th.Tensor: - # Reparametrization trick to pass gradients - return self.distribution.rsample() - - def mode(self) -> th.Tensor: - return self.distribution.mean - - def actions_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor, deterministic: bool = False) -> th.Tensor: - # Update the proba distribution - self.proba_distribution(mean_actions, log_std) - return self.get_actions(deterministic=deterministic) - - def log_prob_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor) -> Tuple[th.Tensor, th.Tensor]: - """ - Compute the log probability of taking an action - given the distribution parameters. - - :param mean_actions: - :param log_std: - :return: - """ - actions = self.actions_from_params(mean_actions, log_std) - log_prob = self.log_prob(actions) - return actions, log_prob - - -class ContextualSqrtInducedCovDiagonalGaussianDistribution(DiagGaussianDistribution): - """ - Gaussian distribution induced by its sqrt(cov), for continuous actions. - - :param action_dim: Dimension of the action space. - """ - - def __init__(self, action_dim: int): - super(DiagGaussianDistribution, self).__init__() - self.action_dim = action_dim - self.mean_actions = None - self.log_std = None - - def proba_distribution_net(self, latent_dim: int, log_std_init: float = 0.0) -> Tuple[nn.Module, nn.Parameter]: - """ - Create the layers and parameter that represent the distribution: - one output will be the mean of the Gaussian, the other parameter will be the - standard deviation (log std in fact to allow negative values) - - :param latent_dim: Dimension of the last layer of the policy (before the action layer) - :param log_std_init: Initial value for the log standard deviation - :return: - """ - mean_actions = nn.Linear(latent_dim, self.action_dim) - log_std = nn.Linear(latent_dim, (self.action_dim, self.action_dim)) - return mean_actions, log_std - - def proba_distribution(self, mean_actions: th.Tensor, log_std: th.Tensor) -> "DiagGaussianDistribution": - """ - Create the distribution given its parameters (mean, std) - - :param mean_actions: - :param log_std: - :return: - """ - action_std = th.ones_like(mean_actions) * log_std.exp() - self.distribution = Normal(mean_actions, action_std) - return self - - def log_prob(self, actions: th.Tensor) -> th.Tensor: - """ - Get the log probabilities of actions according to the distribution. - Note that you must first call the ``proba_distribution()`` method. - - :param actions: - :return: - """ - log_prob = self.distribution.log_prob(actions) - return sum_independent_dims(log_prob) - - def entropy(self) -> th.Tensor: - return sum_independent_dims(self.distribution.entropy()) - - def sample(self) -> th.Tensor: - # Reparametrization trick to pass gradients - return self.distribution.rsample() - - def mode(self) -> th.Tensor: - return self.distribution.mean - - def actions_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor, deterministic: bool = False) -> th.Tensor: - # Update the proba distribution - self.proba_distribution(mean_actions, log_std) - return self.get_actions(deterministic=deterministic) - - def log_prob_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor) -> Tuple[th.Tensor, th.Tensor]: - """ - Compute the log probability of taking an action - given the distribution parameters. - - :param mean_actions: - :param log_std: - :return: - """ - actions = self.actions_from_params(mean_actions, log_std) - log_prob = self.log_prob(actions) - return actions, log_prob - - -class DiagGaussianDistribution(SB3_Distribution): - """ - Gaussian distribution with full covariance matrix, for continuous actions. - - :param action_dim: Dimension of the action space. - """ - - def __init__(self, action_dim: int): - super(DiagGaussianDistribution, self).__init__() - self.action_dim = action_dim - self.mean_actions = None - self.log_std = None - - def proba_distribution_net(self, latent_dim: int, log_std_init: float = 0.0) -> Tuple[nn.Module, nn.Parameter]: - """ - Create the layers and parameter that represent the distribution: - one output will be the mean of the Gaussian, the other parameter will be the - standard deviation (log std in fact to allow negative values) - - :param latent_dim: Dimension of the last layer of the policy (before the action layer) - :param log_std_init: Initial value for the log standard deviation - :return: - """ - mean_actions = nn.Linear(latent_dim, self.action_dim) - # TODO: allow action dependent std - log_std = nn.Parameter(th.ones(self.action_dim) - * log_std_init, requires_grad=True) - return mean_actions, log_std - - def proba_distribution(self, mean_actions: th.Tensor, log_std: th.Tensor) -> "DiagGaussianDistribution": - """ - Create the distribution given its parameters (mean, std) - - :param mean_actions: - :param log_std: - :return: - """ - action_std = th.ones_like(mean_actions) * log_std.exp() - self.distribution = Normal(mean_actions, action_std) + action_std = None + # TODO: Needs to be expanded + if self.cov_strength == Strength.DIAG: + if self.enforce_positive_type == EnforcePositiveType.LOG: + action_std = pseudo_cov.exp() + if action_std == None: + raise Exception('Not yet implemented!') + self.distribution = Normal(mean_actions, action_std) + if self.distribution == None: + raise Exception('Not yet implemented!') return self def log_prob(self, actions: th.Tensor) -> th.Tensor: