Working on SDC-impl.
parent 4f2e75b7ae
commit 14100cccc8
@@ -37,16 +37,16 @@ class Strength(Enum):


class ParametrizationType(Enum):
    CHOL = 0
    ARCHAKOVA = 1
    CHOL = 1
    ARCHAKOVA = 2


class EnforcePositiveType(Enum):
    LOG = 0
    RELU = 1
    SELU = 2
    ABS = 3
    SQ = 4
    LOG = 1
    RELU = 2
    SELU = 3
    ABS = 4
    SQ = 5


class UniversalGaussianDistribution(SB3_Distribution):
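
(Side note, not part of this commit: a minimal sketch of how each EnforcePositiveType variant could map an unconstrained "pseudo std" tensor to a strictly positive standard deviation. The helper name apply_enforce_positive and the eps floor are illustrative assumptions; the commit itself only wires up the LOG case further down.)

import torch as th
import torch.nn.functional as F

def apply_enforce_positive(pseudo_std: th.Tensor, kind: EnforcePositiveType, eps: float = 1e-6) -> th.Tensor:
    # Illustrative only: map an unconstrained tensor to positive std values.
    if kind == EnforcePositiveType.LOG:
        return pseudo_std.exp()                 # input interpreted as log std
    if kind == EnforcePositiveType.RELU:
        return F.relu(pseudo_std) + eps         # zero out negatives, keep a small floor
    if kind == EnforcePositiveType.SELU:
        # SELU alone is not strictly positive (its range is roughly (-1.76, inf)), so clamp.
        return F.selu(pseudo_std).clamp_min(eps)
    if kind == EnforcePositiveType.ABS:
        return pseudo_std.abs() + eps
    if kind == EnforcePositiveType.SQ:
        return pseudo_std.square() + eps
    raise ValueError(f"Unknown EnforcePositiveType: {kind}")
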
@@ -60,8 +60,10 @@ class UniversalGaussianDistribution(SB3_Distribution):
        super(UniversalGaussianDistribution, self).__init__()
        self.par_strength = Strength.DIAG
        self.cov_strength = Strength.DIAG
        self.par_type = None
        self.enforce_positive_type = None
        self.par_type = ParametrizationType.CHOL
        self.enforce_positive_type = EnforcePositiveType.LOG

        self.distribution = None

    def proba_distribution_net(self, latent_dim: int, log_std_init: float = 0.0) -> Tuple[nn.Module, nn.Parameter]:
        """
@@ -74,264 +76,66 @@ class UniversalGaussianDistribution(SB3_Distribution):
        :return:
        """
        mean_actions = nn.Linear(latent_dim, self.action_dim)
        if self.contextual_cov:
            log_std = nn.Linear(latent_dim, self.action_dim)

        if self.par_strength == Strength.NONE:
            if self.cov_strength == Strength.NONE:
                pseudo_cov = th.ones(self.action_dim) * log_std_init
            elif self.cov_strength == Strength.SCALAR:
                pseudo_cov = th.ones(self.action_dim) * \
                    nn.Parameter(log_std_init, requires_grad=True)
            elif self.cov_strength == Strength.DIAG:
                pseudo_cov = nn.Parameter(
                    th.ones(self.action_dim) * log_std_init, requires_grad=True)
            elif self.cov_strength == Strength.FULL:
                # Off-axis init?
                pseudo_cov = nn.Parameter(
                    th.diag_embed(th.ones(self.action_dim) * log_std_init), requires_grad=True)
        elif self.par_strength == self.cov_strength:
            if self.par_strength == Strength.NONE:
                pseudo_cov = th.ones(self.action_dim)
            elif self.par_strength == Strength.SCALAR:
                std = nn.Linear(latent_dim, 1)
                pseudo_cov = th.ones(self.action_dim) * std
            elif self.par_strength == Strength.DIAG:
                pseudo_cov = nn.Linear(latent_dim, self.action_dim)
            elif self.par_strength == Strength.FULL:
                raise Exception("Don't know how to implement yet...")
        elif self.par_strength > self.cov_strength:
            raise Exception(
                'The parameterization can not be stronger than the actual covariance.')
        else:
            log_std = nn.Parameter(
                th.ones(self.action_dim) * log_std_init, requires_grad=True)
            return mean_actions, log_std
            if self.par_strength == Strength.SCALAR and self.cov_strength == Strength.DIAG:
                factor = nn.Linear(latent_dim, 1)
                par_cov = th.ones(self.action_dim) * \
                    nn.Parameter(1, requires_grad=True)
                pseudo_cov = par_cov * factor[0]
            elif self.par_strength == Strength.SCALAR and self.cov_strength == Strength.FULL:
                raise Exception(
                    'That does not even make any sense...')
            else:
                raise Exception(
                    'Programmer-was-to-lazy-to-implement-this-Exception')

    def proba_distribution(self, mean_actions: th.Tensor, log_std: th.Tensor) -> "DiagGaussianDistribution":
        return mean_actions, pseudo_cov
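
(A hedged usage sketch for the default path of proba_distribution_net above, assuming par_strength = cov_strength = Strength.DIAG as set in __init__, where pseudo_cov comes back as an nn.Linear head. The batch size, latent_dim and the constructor signature are illustrative assumptions, not taken from the commit.)

import torch as th

dist = UniversalGaussianDistribution(action_dim=2)   # constructor signature assumed
mean_net, pseudo_cov_net = dist.proba_distribution_net(latent_dim=64, log_std_init=0.0)

latent = th.randn(8, 64)                # batch of latent features from the policy trunk
mean_actions = mean_net(latent)         # shape (8, 2)
pseudo_cov = pseudo_cov_net(latent)     # shape (8, 2), unconstrained per-dimension values

# With the EnforcePositiveType.LOG default, proba_distribution below treats
# pseudo_cov as log std and exponentiates it before building the Normal.
dist = dist.proba_distribution(mean_actions, pseudo_cov)
actions = dist.sample()                 # reparametrized sample, shape (8, 2)
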
    def proba_distribution(self, mean_actions: th.Tensor, pseudo_cov: th.Tensor) -> "UniversalGaussianDistribution":
        """
        Create the distribution given its parameters (mean, std)
        Create the distribution given its parameters (mean, pseudo_cov)

        :param mean_actions:
        :param log_std:
        :param pseudo_cov:
        :return:
        """
        action_std = th.ones_like(mean_actions) * log_std.exp()
        self.distribution = Normal(mean_actions, action_std)
        return self

    def log_prob(self, actions: th.Tensor) -> th.Tensor:
        """
        Get the log probabilities of actions according to the distribution.
        Note that you must first call the ``proba_distribution()`` method.

        :param actions:
        :return:
        """
        log_prob = self.distribution.log_prob(actions)
        return sum_independent_dims(log_prob)

    def entropy(self) -> th.Tensor:
        return sum_independent_dims(self.distribution.entropy())

    def sample(self) -> th.Tensor:
        # Reparametrization trick to pass gradients
        return self.distribution.rsample()

    def mode(self) -> th.Tensor:
        return self.distribution.mean

    def actions_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor, deterministic: bool = False) -> th.Tensor:
        # Update the proba distribution
        self.proba_distribution(mean_actions, log_std)
        return self.get_actions(deterministic=deterministic)

    def log_prob_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor) -> Tuple[th.Tensor, th.Tensor]:
        """
        Compute the log probability of taking an action
        given the distribution parameters.

        :param mean_actions:
        :param log_std:
        :return:
        """
        actions = self.actions_from_params(mean_actions, log_std)
        log_prob = self.log_prob(actions)
        return actions, log_prob

class DiagGaussianDistribution(SB3_Distribution):
    """
    Gaussian distribution with full covariance matrix, for continuous actions.

    :param action_dim: Dimension of the action space.
    """

    def __init__(self, action_dim: int):
        super(DiagGaussianDistribution, self).__init__()
        self.action_dim = action_dim
        self.mean_actions = None
        self.log_std = None

    def proba_distribution_net(self, latent_dim: int, log_std_init: float = 0.0) -> Tuple[nn.Module, nn.Parameter]:
        """
        Create the layers and parameter that represent the distribution:
        one output will be the mean of the Gaussian, the other parameter will be the
        standard deviation (log std in fact to allow negative values)

        :param latent_dim: Dimension of the last layer of the policy (before the action layer)
        :param log_std_init: Initial value for the log standard deviation
        :return:
        """
        mean_actions = nn.Linear(latent_dim, self.action_dim)
        # TODO: allow action dependent std
        log_std = nn.Parameter(th.ones(self.action_dim)
                               * log_std_init, requires_grad=True)
        return mean_actions, log_std

    def proba_distribution(self, mean_actions: th.Tensor, log_std: th.Tensor) -> "DiagGaussianDistribution":
        """
        Create the distribution given its parameters (mean, std)

        :param mean_actions:
        :param log_std:
        :return:
        """
        action_std = th.ones_like(mean_actions) * log_std.exp()
        self.distribution = Normal(mean_actions, action_std)
        return self

    def log_prob(self, actions: th.Tensor) -> th.Tensor:
        """
        Get the log probabilities of actions according to the distribution.
        Note that you must first call the ``proba_distribution()`` method.

        :param actions:
        :return:
        """
        log_prob = self.distribution.log_prob(actions)
        return sum_independent_dims(log_prob)

    def entropy(self) -> th.Tensor:
        return sum_independent_dims(self.distribution.entropy())

    def sample(self) -> th.Tensor:
        # Reparametrization trick to pass gradients
        return self.distribution.rsample()

    def mode(self) -> th.Tensor:
        return self.distribution.mean

    def actions_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor, deterministic: bool = False) -> th.Tensor:
        # Update the proba distribution
        self.proba_distribution(mean_actions, log_std)
        return self.get_actions(deterministic=deterministic)

    def log_prob_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor) -> Tuple[th.Tensor, th.Tensor]:
        """
        Compute the log probability of taking an action
        given the distribution parameters.

        :param mean_actions:
        :param log_std:
        :return:
        """
        actions = self.actions_from_params(mean_actions, log_std)
        log_prob = self.log_prob(actions)
        return actions, log_prob

class ContextualSqrtInducedCovDiagonalGaussianDistribution(DiagGaussianDistribution):
    """
    Gaussian distribution induced by its sqrt(cov), for continuous actions.

    :param action_dim: Dimension of the action space.
    """

    def __init__(self, action_dim: int):
        super(DiagGaussianDistribution, self).__init__()
        self.action_dim = action_dim
        self.mean_actions = None
        self.log_std = None

    def proba_distribution_net(self, latent_dim: int, log_std_init: float = 0.0) -> Tuple[nn.Module, nn.Parameter]:
        """
        Create the layers and parameter that represent the distribution:
        one output will be the mean of the Gaussian, the other parameter will be the
        standard deviation (log std in fact to allow negative values)

        :param latent_dim: Dimension of the last layer of the policy (before the action layer)
        :param log_std_init: Initial value for the log standard deviation
        :return:
        """
        mean_actions = nn.Linear(latent_dim, self.action_dim)
        log_std = nn.Linear(latent_dim, (self.action_dim, self.action_dim))
        return mean_actions, log_std

    def proba_distribution(self, mean_actions: th.Tensor, log_std: th.Tensor) -> "DiagGaussianDistribution":
        """
        Create the distribution given its parameters (mean, std)

        :param mean_actions:
        :param log_std:
        :return:
        """
        action_std = th.ones_like(mean_actions) * log_std.exp()
        self.distribution = Normal(mean_actions, action_std)
        return self

    def log_prob(self, actions: th.Tensor) -> th.Tensor:
        """
        Get the log probabilities of actions according to the distribution.
        Note that you must first call the ``proba_distribution()`` method.

        :param actions:
        :return:
        """
        log_prob = self.distribution.log_prob(actions)
        return sum_independent_dims(log_prob)

    def entropy(self) -> th.Tensor:
        return sum_independent_dims(self.distribution.entropy())

    def sample(self) -> th.Tensor:
        # Reparametrization trick to pass gradients
        return self.distribution.rsample()

    def mode(self) -> th.Tensor:
        return self.distribution.mean

    def actions_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor, deterministic: bool = False) -> th.Tensor:
        # Update the proba distribution
        self.proba_distribution(mean_actions, log_std)
        return self.get_actions(deterministic=deterministic)

    def log_prob_from_params(self, mean_actions: th.Tensor, log_std: th.Tensor) -> Tuple[th.Tensor, th.Tensor]:
        """
        Compute the log probability of taking an action
        given the distribution parameters.

        :param mean_actions:
        :param log_std:
        :return:
        """
        actions = self.actions_from_params(mean_actions, log_std)
        log_prob = self.log_prob(actions)
        return actions, log_prob

class DiagGaussianDistribution(SB3_Distribution):
    """
    Gaussian distribution with full covariance matrix, for continuous actions.

    :param action_dim: Dimension of the action space.
    """

    def __init__(self, action_dim: int):
        super(DiagGaussianDistribution, self).__init__()
        self.action_dim = action_dim
        self.mean_actions = None
        self.log_std = None

    def proba_distribution_net(self, latent_dim: int, log_std_init: float = 0.0) -> Tuple[nn.Module, nn.Parameter]:
        """
        Create the layers and parameter that represent the distribution:
        one output will be the mean of the Gaussian, the other parameter will be the
        standard deviation (log std in fact to allow negative values)

        :param latent_dim: Dimension of the last layer of the policy (before the action layer)
        :param log_std_init: Initial value for the log standard deviation
        :return:
        """
        mean_actions = nn.Linear(latent_dim, self.action_dim)
        # TODO: allow action dependent std
        log_std = nn.Parameter(th.ones(self.action_dim)
                               * log_std_init, requires_grad=True)
        return mean_actions, log_std

    def proba_distribution(self, mean_actions: th.Tensor, log_std: th.Tensor) -> "DiagGaussianDistribution":
        """
        Create the distribution given its parameters (mean, std)

        :param mean_actions:
        :param log_std:
        :return:
        """
        action_std = th.ones_like(mean_actions) * log_std.exp()
        self.distribution = Normal(mean_actions, action_std)
        action_std = None
        # TODO: Needs to be expanded
        if self.cov_strength == Strength.DIAG:
            if self.enforce_positive_type == EnforcePositiveType.LOG:
                action_std = pseudo_cov.exp()
        if action_std == None:
            raise Exception('Not yet implemented!')
        self.distribution = Normal(mean_actions, action_std)
        if self.distribution == None:
            raise Exception('Not yet implemented!')
        return self

    def log_prob(self, actions: th.Tensor) -> th.Tensor:
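
(Since the ParametrizationType.CHOL and Strength.FULL branches above are still placeholders in this commit, here is a hedged sketch, under assumed names and shapes, of how a full covariance could be parametrized through a Cholesky factor, with a LOG-style transform keeping the diagonal positive. None of this is the author's final design.)

import torch as th
from torch.distributions import MultivariateNormal

def chol_proba_distribution(mean_actions: th.Tensor, pseudo_cov: th.Tensor) -> MultivariateNormal:
    # pseudo_cov is assumed to be an unconstrained (batch, d, d) tensor.
    tril = th.tril(pseudo_cov, diagonal=-1)                   # strictly lower triangle
    diag = th.diagonal(pseudo_cov, dim1=-2, dim2=-1).exp()    # positive diagonal via exp (LOG-style)
    scale_tril = tril + th.diag_embed(diag)                   # valid Cholesky factor
    return MultivariateNormal(mean_actions, scale_tril=scale_tril)

# Illustrative shapes: batch of 8, 2-dimensional actions.
mean = th.zeros(8, 2)
pseudo = th.randn(8, 2, 2)
dist = chol_proba_distribution(mean, pseudo)
sample = dist.rsample()          # reparametrized sample, shape (8, 2)
logp = dist.log_prob(sample)     # shape (8,)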