working bp version

Onur 2022-05-03 19:51:54 +02:00
parent f33996c27a
commit 2fbde9fbb1
21 changed files with 460 additions and 224 deletions

View File

@ -391,7 +391,8 @@ register(
max_episode_steps=600,
kwargs={
"rndm_goal": False,
"cup_goal_pos": [-0.3, -1.2]
"cup_goal_pos": [0.1, -2.0],
"learn_release_step": True
}
)
@ -403,7 +404,8 @@ register(
max_episode_steps=600,
kwargs={
"rndm_goal": True,
"cup_goal_pos": [-0.3, -1.2]
"cup_goal_pos": [-0.3, -1.2],
"learn_release_step": True
}
)
@ -546,32 +548,70 @@ for _v in _versions:
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id)
# ## Beerpong
# _versions = ["v0", "v1"]
# for _v in _versions:
# _env_id = f'BeerpongProMP-{_v}'
# register(
# id=_env_id,
# entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
# kwargs={
# "name": f"alr_envs:ALRBeerPong-{_v}",
# "wrappers": [mujoco.beerpong.MPWrapper],
# "mp_kwargs": {
# "num_dof": 7,
# "num_basis": 2,
# # "duration": 1,
# "duration": 0.5,
# # "post_traj_time": 2,
# "post_traj_time": 2.5,
# "policy_type": "motor",
# "weights_scale": 0.14,
# # "weights_scale": 1,
# "zero_start": True,
# "zero_goal": False,
# "policy_kwargs": {
# "p_gains": np.array([ 1.5, 5, 2.55, 3, 2., 2, 1.25]),
# "d_gains": np.array([0.02333333, 0.1, 0.0625, 0.08, 0.03, 0.03, 0.0125])
# }
# }
# }
# )
# ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id)
## Beerpong
_versions = ["v0", "v1"]
for _v in _versions:
_env_id = f'BeerpongProMP-{_v}'
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
entry_point='alr_envs.utils.make_env_helpers:make_mp_env_helper',
kwargs={
"name": f"alr_envs:ALRBeerPong-{_v}",
"wrappers": [mujoco.beerpong.MPWrapper],
"mp_kwargs": {
"num_dof": 7,
"num_basis": 2,
# "duration": 1,
"duration": 0.5,
# "post_traj_time": 2,
"post_traj_time": 2.5,
"policy_type": "motor",
"weights_scale": 0.14,
# "weights_scale": 1,
"zero_start": True,
"zero_goal": False,
"policy_kwargs": {
"wrappers": [mujoco.beerpong.NewMPWrapper],
"ep_wrapper_kwargs": {
"weight_scale": 1
},
"movement_primitives_kwargs": {
'movement_primitives_type': 'promp',
'num_dof': 7
},
"phase_generator_kwargs": {
'phase_generator_type': 'linear',
'delay': 0,
'tau': 0.8, # initial value
'learn_tau': True,
'learn_delay': False
},
"controller_kwargs": {
'controller_type': 'motor',
"p_gains": np.array([1.5, 5, 2.55, 3, 2., 2, 1.25]),
"d_gains": np.array([0.02333333, 0.1, 0.0625, 0.08, 0.03, 0.03, 0.0125])
}
"d_gains": np.array([0.02333333, 0.1, 0.0625, 0.08, 0.03, 0.03, 0.0125]),
},
"basis_generator_kwargs": {
'basis_generator_type': 'zero_rbf',
'num_basis': 2,
'num_basis_zero_start': 2
}
}
)
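For orientation, a minimal usage sketch of the environment registered above (not part of this commit; it assumes importing alr_envs triggers these register() calls so that gym.make can resolve the id):

import gym
import alr_envs  # assumed to execute the register() calls shown above

env = gym.make("BeerpongProMP-v0")
obs = env.reset()
# A single step consumes one full parameter vector: tau (learn_tau=True), the ProMP weights,
# and the ball release step appended because learn_release_step=True; here we just sample one.
params = env.action_space.sample()
obs, episode_return, done, info = env.step(params)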

View File

@ -1 +1,2 @@
from .mp_wrapper import MPWrapper
from .new_mp_wrapper import NewMPWrapper

View File

@ -3,6 +3,7 @@ import os
import numpy as np
from gym import utils
from gym import spaces
from gym.envs.mujoco import MujocoEnv
from alr_envs.alr.mujoco.beerpong.beerpong_reward_staged import BeerPongReward
@ -17,7 +18,7 @@ CUP_POS_MAX = np.array([0.32, -1.2])
class ALRBeerBongEnv(MujocoEnv, utils.EzPickle):
def __init__(self, frame_skip=1, apply_gravity_comp=True, noisy=False,
rndm_goal=False, cup_goal_pos=None):
rndm_goal=False, learn_release_step=True, cup_goal_pos=None):
cup_goal_pos = np.array(cup_goal_pos if cup_goal_pos is not None else [-0.3, -1.2, 0.840])
if cup_goal_pos.shape[0] == 2:
cup_goal_pos = np.insert(cup_goal_pos, 2, 0.840)
@ -51,10 +52,9 @@ class ALRBeerBongEnv(MujocoEnv, utils.EzPickle):
self.noise_std = 0.01
else:
self.noise_std = 0
self.learn_release_step = learn_release_step
reward_function = BeerPongReward
self.reward_function = reward_function()
self.n_table_bounces_first = 0
MujocoEnv.__init__(self, self.xml_path, frame_skip)
utils.EzPickle.__init__(self)
@ -63,6 +63,13 @@ class ALRBeerBongEnv(MujocoEnv, utils.EzPickle):
def start_pos(self):
return self._start_pos
def _set_action_space(self):
bounds = self.model.actuator_ctrlrange.copy().astype(np.float32)
bounds = np.concatenate((bounds, [[50, self.ep_length * 0.333]]), axis=0)
low, high = bounds.T
self.action_space = spaces.Box(low=low, high=high, dtype=np.float32)
return self.action_space
@property
def start_vel(self):
return self._start_vel
@ -76,8 +83,6 @@ class ALRBeerBongEnv(MujocoEnv, utils.EzPickle):
return self.sim.data.qvel[0:7].copy()
def reset(self):
print(not self.reward_function.ball_ground_contact_first)
self.n_table_bounces_first += int(not self.reward_function.ball_ground_contact_first)
self.reward_function.reset(self.add_noise)
return super().reset()
@ -104,14 +109,17 @@ class ALRBeerBongEnv(MujocoEnv, utils.EzPickle):
return self._get_obs()
def step(self, a):
self._release_step = a[-1] if self.learn_release_step else self._release_step
self._release_step = np.clip(self._release_step, self.action_space.low[-1], self.action_space.high[-1]) \
if self.learn_release_step else self._release_step
reward_dist = 0.0
angular_vel = 0.0
reward_ctrl = - np.square(a).sum()
applied_action = a[:a.shape[0]-int(self.learn_release_step)]
reward_ctrl = - np.square(applied_action).sum()
if self.apply_gravity_comp:
a = a + self.sim.data.qfrc_bias[:len(a)].copy() / self.model.actuator_gear[:, 0]
applied_action += self.sim.data.qfrc_bias[:len(applied_action)].copy() / self.model.actuator_gear[:, 0]
try:
self.do_simulation(a, self.frame_skip)
self.do_simulation(applied_action, self.frame_skip)
if self._steps < self._release_step:
self.sim.data.qpos[7::] = self.sim.data.site_xpos[self.ball_site_id, :].copy()
self.sim.data.qvel[7::] = self.sim.data.site_xvelp[self.ball_site_id, :].copy()
@ -125,7 +133,7 @@ class ALRBeerBongEnv(MujocoEnv, utils.EzPickle):
ob = self._get_obs()
if not crash:
reward, reward_infos = self.reward_function.compute_reward(self, a)
reward, reward_infos = self.reward_function.compute_reward(self, applied_action)
success = reward_infos['success']
is_collided = reward_infos['is_collided']
ball_pos = reward_infos['ball_pos']

View File

@ -162,6 +162,10 @@ class BeerPongReward:
min_dist_coeff, final_dist_coeff, rew_offset = 0, 1, 0
reward = rew_offset - min_dist_coeff * min_dist ** 2 - final_dist_coeff * final_dist ** 2 - \
1e-4 * np.mean(action_cost)
if env.learn_release_step and not self.ball_in_cup:
too_small = (env._release_step < 50) * (env._release_step - 50) ** 2
too_big = (env._release_step > 200) * 0.2 * (env._release_step - 200) ** 2
reward = reward - too_small - too_big
# 1e-7*np.mean(action_cost)
success = self.ball_in_cup
else:
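Worked numbers for the release-step shaping above (illustrative values only):

release_step = 30
too_small = (release_step < 50) * (release_step - 50) ** 2        # 1 * 400 = 400
too_big = (release_step > 200) * 0.2 * (release_step - 200) ** 2  # 0, no penalty
# The reward drops by 400 here; a release step of 250 would instead cost 0.2 * 50**2 = 500,
# so the shaping pushes the learned release step into the range [50, 200].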

View File

@ -1,13 +1,15 @@
from mp_wrapper import BaseMPWrapper
from alr_envs.mp.episodic_wrapper import EpisodicWrapper
from typing import Union, Tuple
import numpy as np
import gym
class MPWrapper(BaseMPWrapper):
class NewMPWrapper(EpisodicWrapper):
@property
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qpos[0:7].copy()
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[0:7].copy()
@ -18,3 +20,25 @@ class MPWrapper(BaseMPWrapper):
[True] * 2, # xy position of cup
[False] # env steps
])
def _step_callback(self, t: int, env_spec_params: Union[np.ndarray, None], step_action: np.ndarray) -> np.ndarray:
if self.env.learn_release_step:
return np.concatenate((step_action, np.atleast_1d(env_spec_params)))
else:
return step_action
def _episode_callback(self, action: np.ndarray) -> Tuple[np.ndarray, Union[np.ndarray, None]]:
if self.env.learn_release_step:
return action[:-1], action[-1] # mp_params, release step
else:
return action, None
def set_action_space(self):
if self.env.learn_release_step:
min_action_bounds, max_action_bounds = self.mp.get_param_bounds()
min_action_bounds = np.concatenate((min_action_bounds.numpy(), [self.env.action_space.low[-1]]))
max_action_bounds = np.concatenate((max_action_bounds.numpy(), [self.env.action_space.high[-1]]))
self.mp_action_space = gym.spaces.Box(low=min_action_bounds, high=max_action_bounds, dtype=np.float32)
return self.mp_action_space
else:
return super(NewMPWrapper, self).set_action_space()
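To make the callback contract of this wrapper concrete, a small sketch of how an action vector decomposes when learn_release_step is enabled (the parameter count is illustrative and depends on the ProMP configuration):

import numpy as np

action = np.concatenate(([0.8], np.zeros(14), [120.0]))  # [tau, promp weights..., release step]
mp_params, release_step = action[:-1], action[-1]         # what _episode_callback returns
step_action = np.zeros(7)                                  # torques produced by the controller at step t
env_action = np.concatenate((step_action, np.atleast_1d(release_step)))  # what _step_callback builds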

View File

@ -1,9 +1,9 @@
from mp_wrapper import BaseMPWrapper
from alr_envs.mp.episodic_wrapper import EpisodicWrapper
from typing import Union, Tuple
import numpy as np
class MPWrapper(BaseMPWrapper):
class MPWrapper(EpisodicWrapper):
@property
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:

View File

@ -39,7 +39,7 @@ actual_vel = np.zeros((len(pos), *base_shape))
act = np.zeros((len(pos), *base_shape))
for t, pos_vel in enumerate(zip(pos, vel)):
actions = env.policy.get_action(pos_vel[0], pos_vel[1])
actions = env.policy.get_action(pos_vel[0], pos_vel[1], env.current_pos, env.current_vel)
actions = np.clip(actions, env.full_action_space.low, env.full_action_space.high)
_, _, _, _ = env.env.step(actions)
act[t, :] = actions

alr_envs/mp/__init__.py (new, empty file)
View File

View File

@ -0,0 +1,17 @@
from mp_pytorch import PhaseGenerator, NormalizedRBFBasisGenerator, ZeroStartNormalizedRBFBasisGenerator
from mp_pytorch.basis_gn.rhytmic_basis import RhythmicBasisGenerator
ALL_TYPES = ["rbf", "zero_rbf", "rhythmic"]
def get_basis_generator(basis_generator_type: str, phase_generator: PhaseGenerator, **kwargs):
basis_generator_type = basis_generator_type.lower()
if basis_generator_type == "rbf":
return NormalizedRBFBasisGenerator(phase_generator, **kwargs)
elif basis_generator_type == "zero_rbf":
return ZeroStartNormalizedRBFBasisGenerator(phase_generator, **kwargs)
elif basis_generator_type == "rhythmic":
return RhythmicBasisGenerator(phase_generator, **kwargs)
else:
raise ValueError(f"Specified basis generator type {basis_generator_type} not supported, "
f"please choose one of {ALL_TYPES}.")

View File

@ -0,0 +1,4 @@
class BaseController:
def get_action(self, des_pos, des_vel, c_pos, c_vel):
raise NotImplementedError

View File

@ -0,0 +1,21 @@
from alr_envs.mp.controllers.meta_world_controller import MetaWorldController
from alr_envs.mp.controllers.pd_controller import PDController
from alr_envs.mp.controllers.vel_controller import VelController
from alr_envs.mp.controllers.pos_controller import PosController
ALL_TYPES = ["motor", "velocity", "position", "metaworld"]
def get_controller(controller_type: str, **kwargs):
controller_type = controller_type.lower()
if controller_type == "motor":
return PDController(**kwargs)
elif controller_type == "velocity":
return VelController()
elif controller_type == "position":
return PosController()
elif controller_type == "metaworld":
return MetaWorldController()
else:
raise ValueError(f"Specified controller type {controller_type} not supported, "
f"please choose one of {ALL_TYPES}.")

View File

@ -0,0 +1,25 @@
import numpy as np
from alr_envs.mp.controllers.base_controller import BaseController
class MetaWorldController(BaseController):
"""
A MetaWorld Controller. Using the current position passed to get_action,
the controller calculates a response based on the desired position.
Unlike the other controllers, this is a special controller for MetaWorld environments.
They use a position delta for the xyz coordinates and a raw position for the gripper opening.
"""
def get_action(self, des_pos, des_vel, c_pos, c_vel):
gripper_pos = des_pos[-1]
cur_pos = c_pos[:-1]
xyz_pos = des_pos[:-1]
assert xyz_pos.shape == cur_pos.shape, \
f"Mismatch in dimension between desired position {xyz_pos.shape} and current position {cur_pos.shape}"
trq = np.hstack([(xyz_pos - cur_pos), gripper_pos])
return trq
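A small numeric sketch of this controller (values are illustrative; the import path is the one added by this commit):

import numpy as np
from alr_envs.mp.controllers.meta_world_controller import MetaWorldController

ctrl = MetaWorldController()
des_pos = np.array([0.10, 0.00, 0.25, 0.60])  # xyz target + gripper opening
c_pos = np.array([0.08, 0.02, 0.20, 0.00])    # current xyz + gripper (gripper entry is ignored)
action = ctrl.get_action(des_pos, None, c_pos, None)
# -> approx. [0.02, -0.02, 0.05, 0.6]: position delta for xyz, raw position for the gripper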

View File

@ -0,0 +1,28 @@
from typing import Union, Tuple
from alr_envs.mp.controllers.base_controller import BaseController
class PDController(BaseController):
"""
A PD-Controller. Using the current position and velocity passed to get_action,
the controller calculates a response based on the desired position and velocity.
:param p_gains: Factors for the proportional gains
:param d_gains: Factors for the differential gains
"""
def __init__(self,
p_gains: Union[float, Tuple] = 1,
d_gains: Union[float, Tuple] = 0.5):
self.p_gains = p_gains
self.d_gains = d_gains
def get_action(self, des_pos, des_vel, c_pos, c_vel):
assert des_pos.shape == c_pos.shape, \
f"Mismatch in dimension between desired position {des_pos.shape} and current position {c_pos.shape}"
assert des_vel.shape == c_vel.shape, \
f"Mismatch in dimension between desired velocity {des_vel.shape} and current velocity {c_vel.shape}"
trq = self.p_gains * (des_pos - c_pos) + self.d_gains * (des_vel - c_vel)
return trq
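A comparable sketch for the PD controller, using the first two joint gains from the Beerpong registration above (values illustrative):

import numpy as np
from alr_envs.mp.controllers.pd_controller import PDController

pd = PDController(p_gains=np.array([1.5, 5.0]), d_gains=np.array([0.02333333, 0.1]))
des_pos, des_vel = np.array([0.5, -0.2]), np.zeros(2)
c_pos, c_vel = np.array([0.4, -0.1]), np.zeros(2)
trq = pd.get_action(des_pos, des_vel, c_pos, c_vel)
# trq = p_gains * (des_pos - c_pos) + d_gains * (des_vel - c_vel) ~= [0.15, -0.5]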

View File

@ -0,0 +1,9 @@
from alr_envs.mp.controllers.base_controller import BaseController
class PosController(BaseController):
"""
A Position Controller. The controller calculates a response only based on the desired position.
"""
def get_action(self, des_pos, des_vel, c_pos, c_vel):
return des_pos

View File

@ -0,0 +1,9 @@
from alr_envs.mp.controllers.base_controller import BaseController
class VelController(BaseController):
"""
A Velocity Controller. The controller calculates a response only based on the desired velocity.
"""
def get_action(self, des_pos, des_vel, c_pos, c_vel):
return des_vel

View File

@ -3,15 +3,13 @@ from typing import Union, Tuple
import gym
import numpy as np
from gym import spaces
from gym.envs.mujoco import MujocoEnv
from policies import get_policy_class, BaseController
from mp_pytorch.mp.mp_interfaces import MPInterface
from alr_envs.mp.controllers.base_controller import BaseController
class BaseMPWrapper(gym.Env, ABC):
class EpisodicWrapper(gym.Env, ABC):
"""
Base class for movement primitive based gym.Wrapper implementations.
@ -20,41 +18,34 @@ class BaseMPWrapper(gym.Env, ABC):
num_dof: Dimension of the action space of the wrapped env
num_basis: Number of basis functions per dof
duration: Length of the trajectory of the movement primitive in seconds
post_traj_time: Time for which the last position of the trajectory is fed to the environment to continue
simulation in seconds
policy_type: Type or object defining the policy that is used to generate action based on the trajectory
controller: Type or object defining the policy that is used to generate action based on the trajectory
weight_scale: Scaling factor applied to the motion primitive weights passed to this wrapper (tau and delay are not scaled)
render_mode: Equivalent to gym render mode
"""
def __init__(self,
env: MujocoEnv,
env: gym.Env,
mp: MPInterface,
controller: BaseController,
duration: float,
policy_type: Union[str, BaseController] = None,
render_mode: str = None,
verbose=2,
**mp_kwargs
):
verbose: int = 1,
weight_scale: float = 1):
super().__init__()
assert env.dt is not None
self.env = env
try:
self.dt = env.dt
except AttributeError:
raise AttributeError("step based environment needs to have a function 'dt' ")
self.duration = duration
self.traj_steps = int(duration / self.dt)
self.post_traj_steps = self.env.spec.max_episode_steps - self.traj_steps
# TODO: move to constructor, use policy factory instead what Fabian already coded
if isinstance(policy_type, str):
# pop policy kwargs here such that they are not passed to the initialize_mp method
self.policy = get_policy_class(policy_type, self, **mp_kwargs.pop('policy_kwargs', {}))
else:
self.policy = policy_type
self.controller = controller
self.mp = mp
self.env = env
self.verbose = verbose
self.weight_scale = weight_scale
# rendering
self.render_mode = render_mode
@ -62,19 +53,24 @@ class BaseMPWrapper(gym.Env, ABC):
# self.time_steps = np.linspace(0, self.duration, self.traj_steps + 1)
self.time_steps = np.linspace(0, self.duration, self.traj_steps)
self.mp.set_mp_times(self.time_steps)
# action_bounds = np.inf * np.ones((np.prod(self.mp.num_params)))
min_action_bounds, max_action_bounds = mp.get_param_bounds()
self.action_space = gym.spaces.Box(low=min_action_bounds.numpy(), high=max_action_bounds.numpy(),
self.mp_action_space = gym.spaces.Box(low=min_action_bounds.numpy(), high=max_action_bounds.numpy(),
dtype=np.float32)
self.action_space = self.set_action_space()
self.active_obs = self.set_active_obs()
self.observation_space = spaces.Box(low=self.env.observation_space.low[self.active_obs],
high=self.env.observation_space.high[self.active_obs],
dtype=self.env.observation_space.dtype)
def get_trajectory(self, action: np.ndarray) -> Tuple:
self.mp.set_params(action)
# TODO: this follows the implementation of the mp_pytorch library which includes the parameters tau and delay at
# the beginning of the array.
ignore_indices = int(self.mp.learn_tau) + int(self.mp.learn_delay)
scaled_mp_params = action.copy()
scaled_mp_params[ignore_indices:] *= self.weight_scale
self.mp.set_params(scaled_mp_params)
self.mp.set_boundary_conditions(bc_time=self.time_steps[:1], bc_pos=self.current_pos, bc_vel=self.current_vel)
traj_dict = self.mp.get_mp_trajs(get_pos = True, get_vel = True)
trajectory_tensor, velocity_tensor = traj_dict['pos'], traj_dict['vel']
@ -88,9 +84,54 @@ class BaseMPWrapper(gym.Env, ABC):
return trajectory, velocity
def set_action_space(self):
"""
This function can be used to modify the action space to include actions that are not learned via the motion
primitive, e.g. the ball release time for the beer pong task. By default, it is the parameter space of the
motion primitive.
Only needs to be overwritten if the action space needs to be modified.
"""
return self.mp_action_space
def _episode_callback(self, action: np.ndarray) -> Tuple[np.ndarray, Union[np.ndarray, None]]:
"""
Used to extract the parameters for the motion primitive and other parameters from an action array which might
include other actions like ball releasing time for the beer pong environment.
This only needs to be overwritten if the action space is modified.
Args:
action: a vector from the whole action space; it contains the mp parameters and, if specified, additional
parameters, otherwise only the mp parameters
Returns:
Tuple: mp_arguments and other arguments
"""
return action, None
def _step_callback(self, t: int, env_spec_params: Union[np.ndarray, None], step_action: np.ndarray) -> np.ndarray:
"""
This function can be used to modify the step_action with additional parameters, e.g. releasing the ball in the
Beerpong env. These parameters should not be part of the motion primitive parameters.
Returns step_action by default; it can be overridden in individual mp_wrappers.
Args:
t: the current time step of the episode
env_spec_params: the environment specific parameter, as defined in the function _episode_callback
(e.g. ball release time in Beer Pong)
step_action: the current step-based action
Returns:
modified step action
"""
return step_action
@abstractmethod
def set_active_obs(self):
pass
def set_active_obs(self) -> np.ndarray:
"""
This function defines the context, i.e. the subset of observations that characterizes the task instance.
Returns:
boolean array marking the indices of the active (context) observations
"""
return np.ones(self.env.observation_space.shape[0], dtype=bool)
@property
@abstractmethod
@ -116,22 +157,17 @@ class BaseMPWrapper(gym.Env, ABC):
"""
raise NotImplementedError()
def _step_callback(self, t, action):
pass
def step(self, action: np.ndarray):
""" This function generates a trajectory based on a MP and then does the usual loop over reset and step"""
# TODO: Think about sequencing
# TODO: Reward Function rather here?
# agent to learn when to release the ball
trajectory, velocity = self.get_trajectory(action)
mp_params, env_spec_params = self._episode_callback(action)
trajectory, velocity = self.get_trajectory(mp_params)
trajectory_length = len(trajectory)
if self.verbose >= 2:
actions = np.zeros(shape=(trajectory_length,) + self.env.action_space.shape)
if isinstance(self.env.observation_space, spaces.Dict): # For goal environments
observations = np.zeros(shape=(trajectory_length,) + self.env.observation_space["observation"].shape,
dtype=self.env.observation_space.dtype)
else:
observations = np.zeros(shape=(trajectory_length,) + self.env.observation_space.shape,
dtype=self.env.observation_space.dtype)
rewards = np.zeros(shape=(trajectory_length,))
@ -140,14 +176,15 @@ class BaseMPWrapper(gym.Env, ABC):
infos = dict()
for t, pos_vel in enumerate(zip(trajectory, velocity)):
ac = self.policy.get_action(pos_vel[0], pos_vel[1])
callback_action = self._step_callback(t, action)
if callback_action is not None:
ac = np.concatenate((callback_action, ac)) # include callback action at first pos of vector
actions[t, :] = np.clip(ac, self.env.action_space.low, self.env.action_space.high)
obs, rewards[t], done, info = self.env.step(actions[t, :])
observations[t, :] = obs["observation"] if isinstance(self.env.observation_space, spaces.Dict) else obs
trajectory_return += rewards[t]
step_action = self.controller.get_action(pos_vel[0], pos_vel[1], self.current_pos, self.current_vel)
step_action = self._step_callback(t, env_spec_params, step_action) # include possible callback info
c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high)
obs, c_reward, done, info = self.env.step(c_action)
if self.verbose >= 2:
actions[t, :] = c_action
rewards[t] = c_reward
observations[t, :] = obs
trajectory_return += c_reward
for k, v in info.items():
elems = infos.get(k, [None] * trajectory_length)
elems[t] = v
@ -158,14 +195,14 @@ class BaseMPWrapper(gym.Env, ABC):
if done:
break
infos.update({k: v[:t + 1] for k, v in infos.items()})
if self.verbose >= 2:
infos['trajectory'] = trajectory
if self.verbose == 2:
infos['step_actions'] = actions[:t + 1]
infos['step_observations'] = observations[:t + 1]
infos['step_rewards'] = rewards[:t + 1]
infos['trajectory_length'] = t + 1
done = True
return self.get_observation_from_step(observations[t]), trajectory_return, done, infos
return self.get_observation_from_step(obs), trajectory_return, done, infos
def reset(self):
return self.get_observation_from_step(self.env.reset())
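A short sketch of the parameter scaling performed in get_trajectory above, assuming learn_tau=True and learn_delay=False as in the Beerpong configuration (weight count and weight_scale value are illustrative):

import numpy as np

action = np.array([0.8, 0.1, -0.2, 0.3])   # [tau, w1, w2, w3]
ignore_indices = 1                          # int(learn_tau) + int(learn_delay)
scaled_mp_params = action.copy()
scaled_mp_params[ignore_indices:] *= 0.14   # weight_scale; tau is left untouched
# scaled_mp_params -> [0.8, 0.014, -0.028, 0.042]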

alr_envs/mp/mp_factory.py (new file, 22 lines)
View File

@ -0,0 +1,22 @@
from mp_pytorch.mp.dmp import DMP
from mp_pytorch.mp.promp import ProMP
from mp_pytorch.mp.idmp import IDMP
from mp_pytorch.basis_gn.basis_generator import BasisGenerator
ALL_TYPES = ["promp", "dmp", "idmp"]
def get_movement_primitive(
movement_primitives_type: str, action_dim: int, basis_generator: BasisGenerator, **kwargs
):
movement_primitives_type = movement_primitives_type.lower()
if movement_primitives_type == "promp":
return ProMP(basis_generator, action_dim, **kwargs)
elif movement_primitives_type == "dmp":
return DMP(basis_generator, action_dim, **kwargs)
elif movement_primitives_type == 'idmp':
return IDMP(basis_generator, action_dim, **kwargs)
else:
raise ValueError(f"Specified movement primitive type {movement_primitives_type} not supported, "
f"please choose one of {ALL_TYPES}.")

View File

@ -0,0 +1,20 @@
from mp_pytorch import LinearPhaseGenerator, ExpDecayPhaseGenerator
from mp_pytorch.phase_gn.rhythmic_phase_generator import RhythmicPhaseGenerator
from mp_pytorch.phase_gn.smooth_phase_generator import SmoothPhaseGenerator
ALL_TYPES = ["linear", "exp", "rhythmic", "smooth"]
def get_phase_generator(phase_generator_type, **kwargs):
phase_generator_type = phase_generator_type.lower()
if phase_generator_type == "linear":
return LinearPhaseGenerator(**kwargs)
elif phase_generator_type == "exp":
return ExpDecayPhaseGenerator(**kwargs)
elif phase_generator_type == "rhythmic":
return RhythmicPhaseGenerator(**kwargs)
elif phase_generator_type == "smooth":
return SmoothPhaseGenerator(**kwargs)
else:
raise ValueError(f"Specified phase generator type {phase_generator_type} not supported, "
f"please choose one of {ALL_TYPES}.")

View File

@ -1,13 +1,20 @@
import warnings
from typing import Iterable, Type, Union
from typing import Iterable, Type, Union, Mapping, MutableMapping
import gym
import numpy as np
from gym.envs.registration import EnvSpec
from mp_env_api import MPEnvWrapper
from mp_env_api.mp_wrappers.dmp_wrapper import DmpWrapper
from mp_env_api.mp_wrappers.promp_wrapper import ProMPWrapper
from mp_pytorch import MPInterface
from alr_envs.mp.basis_generator_factory import get_basis_generator
from alr_envs.mp.controllers.base_controller import BaseController
from alr_envs.mp.controllers.controller_factory import get_controller
from alr_envs.mp.mp_factory import get_movement_primitive
from alr_envs.mp.episodic_wrapper import EpisodicWrapper
from alr_envs.mp.phase_generator_factory import get_phase_generator
def make_rank(env_id: str, seed: int, rank: int = 0, return_callable=True, **kwargs):
@ -92,7 +99,8 @@ def make(env_id: str, seed, **kwargs):
return env
def _make_wrapped_env(env_id: str, wrappers: Iterable[Type[gym.Wrapper]], seed=1, **kwargs):
def _make_wrapped_env(env_id: str, wrappers: Iterable[Type[gym.Wrapper]], mp: MPInterface, controller: BaseController,
ep_wrapper_kwargs: Mapping, seed=1, **kwargs):
"""
Helper function for creating a wrapped gym environment using MPs.
It adds all provided wrappers to the specified environment and verifies at least one EpisodicWrapper is
@ -108,14 +116,102 @@ def _make_wrapped_env(env_id: str, wrappers: Iterable[Type[gym.Wrapper]], seed=1
"""
# _env = gym.make(env_id)
_env = make(env_id, seed, **kwargs)
assert any(issubclass(w, MPEnvWrapper) for w in wrappers), \
"At least one MPEnvWrapper is required in order to leverage motion primitive environments."
has_episodic_wrapper = False
for w in wrappers:
# only wrap the environment if not EpisodicWrapper, e.g. for vision
if not issubclass(w, EpisodicWrapper):
_env = w(_env)
else: # if EpisodicWrapper, use specific constructor
has_episodic_wrapper = True
_env = w(env=_env, mp=mp, controller=controller, **ep_wrapper_kwargs)
assert has_episodic_wrapper, \
"At least one EpisodicWrapper is required in order to leverage motion primitive environments."
return _env
def make_mp_from_kwargs(
env_id: str, wrappers: Iterable, ep_wrapper_kwargs: MutableMapping, mp_kwargs: MutableMapping,
controller_kwargs: MutableMapping, phase_kwargs: MutableMapping, basis_kwargs: MutableMapping, seed=1,
sequenced=False, **kwargs
):
"""
This can also be used standalone for manually building a custom MP environment.
Args:
ep_wrapper_kwargs:
basis_kwargs:
phase_kwargs:
controller_kwargs:
env_id: base_env_name,
wrappers: list of wrappers (at least an EpisodicWrapper),
seed: seed of environment
sequenced: when True, multiple ProMPs can be sequenced by specifying the duration of each sub-trajectory;
this behavior is much closer to step-based learning.
mp_kwargs: dict with the movement primitive configuration, e.g. {'movement_primitives_type': 'promp', 'num_dof': 7}
Returns: MP wrapped gym env
"""
_verify_time_limit(mp_kwargs.get("duration", None), kwargs.get("time_limit", None))
dummy_env = make(env_id, seed)
if ep_wrapper_kwargs.get('duration', None) is None:
ep_wrapper_kwargs['duration'] = dummy_env.spec.max_episode_steps * dummy_env.dt
if phase_kwargs.get('tau', None) is None:
phase_kwargs['tau'] = ep_wrapper_kwargs['duration']
action_dim = mp_kwargs.pop('num_dof', None)
action_dim = action_dim if action_dim is not None else np.prod(dummy_env.action_space.shape).item()
phase_gen = get_phase_generator(**phase_kwargs)
basis_gen = get_basis_generator(phase_generator=phase_gen, **basis_kwargs)
controller = get_controller(**controller_kwargs)
mp = get_movement_primitive(action_dim=action_dim, basis_generator=basis_gen, **mp_kwargs)
_env = _make_wrapped_env(env_id=env_id, wrappers=wrappers, mp=mp, controller=controller,
ep_wrapper_kwargs=ep_wrapper_kwargs, seed=seed, **kwargs)
return _env
def make_mp_env_helper(**kwargs):
"""
Helper function for registering MP gym environments.
Args:
**kwargs: expects at least the following:
{
"name": base environment name.
"wrappers": list of wrappers (at least an EpisodicWrapper is required),
"movement_primitives_kwargs": {
"movement_primitives_type": type_of_your_movement_primitive,
non default arguments for the movement primitive instance
...
}
"controller_kwargs": {
"controller_type": type_of_your_controller,
non default arguments for the controller instance
...
},
"basis_generator_kwargs": {
"basis_generator_type": type_of_your_basis_generator,
non default arguments for the basis generator instance
...
},
"phase_generator_kwargs": {
"phase_generator_type": type_of_your_phase_generator,
non default arguments for the phase generator instance
...
},
}
Returns: MP wrapped gym env
"""
seed = kwargs.pop("seed", None)
wrappers = kwargs.pop("wrappers")
mp_kwargs = kwargs.pop("movement_primitives_kwargs")
ep_wrapper_kwargs = kwargs.pop('ep_wrapper_kwargs')
contr_kwargs = kwargs.pop("controller_kwargs")
phase_kwargs = kwargs.pop("phase_generator_kwargs")
basis_kwargs = kwargs.pop("basis_generator_kwargs")
return make_mp_from_kwargs(env_id=kwargs.pop("name"), wrappers=wrappers, ep_wrapper_kwargs=ep_wrapper_kwargs,
mp_kwargs=mp_kwargs, controller_kwargs=contr_kwargs, phase_kwargs=phase_kwargs,
basis_kwargs=basis_kwargs, **kwargs, seed=seed)
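A hedged example of calling make_mp_env_helper directly with the kwargs structure documented above, filled in with the BeerpongProMP values from this commit (the NewMPWrapper import path is assumed from the beerpong package __init__ changed earlier in this diff):

import numpy as np
from alr_envs.alr.mujoco.beerpong import NewMPWrapper
from alr_envs.utils.make_env_helpers import make_mp_env_helper

env = make_mp_env_helper(
    name="alr_envs:ALRBeerPong-v0",
    wrappers=[NewMPWrapper],
    ep_wrapper_kwargs={"weight_scale": 1},
    movement_primitives_kwargs={"movement_primitives_type": "promp", "num_dof": 7},
    phase_generator_kwargs={"phase_generator_type": "linear", "delay": 0, "tau": 0.8,
                            "learn_tau": True, "learn_delay": False},
    controller_kwargs={"controller_type": "motor",
                       "p_gains": np.array([1.5, 5, 2.55, 3, 2., 2, 1.25]),
                       "d_gains": np.array([0.02333333, 0.1, 0.0625, 0.08, 0.03, 0.03, 0.0125])},
    basis_generator_kwargs={"basis_generator_type": "zero_rbf", "num_basis": 2,
                            "num_basis_zero_start": 2},
    seed=1,
)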
def make_dmp_env(env_id: str, wrappers: Iterable, seed=1, mp_kwargs={}, **kwargs):
"""

View File

@ -1,129 +0,0 @@
from typing import Tuple, Union
import numpy as np
class BaseController:
def __init__(self, env, **kwargs):
self.env = env
def get_action(self, des_pos, des_vel):
raise NotImplementedError
class PosController(BaseController):
"""
A Position Controller. The controller calculates a response only based on the desired position.
"""
def get_action(self, des_pos, des_vel):
return des_pos
class VelController(BaseController):
"""
A Velocity Controller. The controller calculates a response only based on the desired velocity.
"""
def get_action(self, des_pos, des_vel):
return des_vel
class PDController(BaseController):
"""
A PD-Controller. Using position and velocity information from a provided environment,
the controller calculates a response based on the desired position and velocity
:param env: A position environment
:param p_gains: Factors for the proportional gains
:param d_gains: Factors for the differential gains
"""
def __init__(self,
env,
p_gains: Union[float, Tuple],
d_gains: Union[float, Tuple]):
self.p_gains = p_gains
self.d_gains = d_gains
super(PDController, self).__init__(env)
def get_action(self, des_pos, des_vel):
cur_pos = self.env.current_pos
cur_vel = self.env.current_vel
assert des_pos.shape == cur_pos.shape, \
f"Mismatch in dimension between desired position {des_pos.shape} and current position {cur_pos.shape}"
assert des_vel.shape == cur_vel.shape, \
f"Mismatch in dimension between desired velocity {des_vel.shape} and current velocity {cur_vel.shape}"
trq = self.p_gains * (des_pos - cur_pos) + self.d_gains * (des_vel - cur_vel)
return trq
class MetaWorldController(BaseController):
"""
A Metaworld Controller. Using position and velocity information from a provided environment,
the controller calculates a response based on the desired position and velocity.
Unlike the other Controllers, this is a special controller for MetaWorld environments.
They use a position delta for the xyz coordinates and a raw position for the gripper opening.
:param env: A position environment
"""
def __init__(self,
env
):
super(MetaWorldController, self).__init__(env)
def get_action(self, des_pos, des_vel):
gripper_pos = des_pos[-1]
cur_pos = self.env.current_pos[:-1]
xyz_pos = des_pos[:-1]
assert xyz_pos.shape == cur_pos.shape, \
f"Mismatch in dimension between desired position {xyz_pos.shape} and current position {cur_pos.shape}"
trq = np.hstack([(xyz_pos - cur_pos), gripper_pos])
return trq
#TODO: Do we need this class?
class PDControllerExtend(BaseController):
"""
A PD-Controller. Using position and velocity information from a provided positional environment,
the controller calculates a response based on the desired position and velocity
:param env: A position environment
:param p_gains: Factors for the proportional gains
:param d_gains: Factors for the differential gains
"""
def __init__(self,
env,
p_gains: Union[float, Tuple],
d_gains: Union[float, Tuple]):
self.p_gains = p_gains
self.d_gains = d_gains
super(PDControllerExtend, self).__init__(env)
def get_action(self, des_pos, des_vel):
cur_pos = self.env.current_pos
cur_vel = self.env.current_vel
if len(des_pos) != len(cur_pos):
des_pos = self.env.extend_des_pos(des_pos)
if len(des_vel) != len(cur_vel):
des_vel = self.env.extend_des_vel(des_vel)
trq = self.p_gains * (des_pos - cur_pos) + self.d_gains * (des_vel - cur_vel)
return trq
def get_policy_class(policy_type, env, **kwargs):
if policy_type == "motor":
return PDController(env, **kwargs)
elif policy_type == "velocity":
return VelController(env)
elif policy_type == "position":
return PosController(env)
elif policy_type == "metaworld":
return MetaWorldController(env)
else:
raise ValueError(f"Invalid controller type {policy_type} provided. Only 'motor', 'velocity', 'position' "
f"and 'metaworld are currently supported controllers.")