diff --git a/alr_envs/alr/__init__.py b/alr_envs/alr/__init__.py
index ac885fa..3182b72 100644
--- a/alr_envs/alr/__init__.py
+++ b/alr_envs/alr/__init__.py
@@ -825,3 +825,10 @@ for _v in _versions:
         }
     )
     ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id)
+
+
+# --------------------- Testing new mp wrapper -----------------------------------------------------
+
+# register(
+#     id='ALRReacherProMP-v0'
+# )
\ No newline at end of file
diff --git a/alr_envs/alr/mujoco/beerpong/new_mp_wrapper.py b/alr_envs/alr/mujoco/beerpong/new_mp_wrapper.py
new file mode 100644
index 0000000..ca2e17e
--- /dev/null
+++ b/alr_envs/alr/mujoco/beerpong/new_mp_wrapper.py
@@ -0,0 +1,20 @@
+from mp_wrapper import BaseMPWrapper
+from typing import Union, Tuple
+import numpy as np
+
+
+class MPWrapper(BaseMPWrapper):
+
+    @property
+    def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
+        return self.env.sim.data.qpos[0:7].copy()
+
+    @property
+    def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
+        return self.env.sim.data.qvel[0:7].copy()
+
+    def set_active_obs(self):
+        return np.hstack([
+            [False] * 7,  # cos
+            [False] * 7,  # sin
+            [True] * 2,  # xy position of cup
+            [False]  # env steps
+        ])
diff --git a/alr_envs/alr/mujoco/reacher/new_mp_wrapper.py b/alr_envs/alr/mujoco/reacher/new_mp_wrapper.py
new file mode 100644
index 0000000..43c8a51
--- /dev/null
+++ b/alr_envs/alr/mujoco/reacher/new_mp_wrapper.py
@@ -0,0 +1,24 @@
+from mp_wrapper import BaseMPWrapper
+from typing import Union, Tuple
+import numpy as np
+
+
+class MPWrapper(BaseMPWrapper):
+
+    @property
+    def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
+        return self.env.sim.data.qpos.flat[:self.env.n_links]
+
+    @property
+    def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
+        return self.env.sim.data.qvel.flat[:self.env.n_links]
+
+    def set_active_obs(self):
+        return np.concatenate([
+            [False] * self.env.n_links,  # cos
+            [False] * self.env.n_links,  # sin
+            [True] * 2,  # goal position
+            [False] * self.env.n_links,  # angular velocity
+            [False] * 3,  # goal distance
+            # self.get_body_com("target"),  # only return target to make problem harder
+            [False],  # step
+        ])
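For context (illustration only, not part of the patch): the boolean mask returned by set_active_obs is used by the base wrapper (see get_observation_from_step in mp_wrapper.py below) to select which entries of the wrapped environment's observation are exposed. A small numpy sketch with made-up numbers for a hypothetical 2-link reacher, mirroring the mask layout above:

import numpy as np

# Hypothetical observation of a 2-link reacher: cos(2), sin(2), goal xy(2), qvel(2), goal distance(3), step(1).
obs = np.array([1.0, 0.5, 0.0, 0.9, 0.1, -0.1, 0.3, -0.2, 0.05, 0.02, 0.0, 7.0])
active_obs = np.concatenate([
    [False] * 2,  # cos
    [False] * 2,  # sin
    [True] * 2,   # goal position
    [False] * 2,  # angular velocity
    [False] * 3,  # goal distance
    [False],      # step
])
filtered = obs[active_obs]  # only the goal position remains -> array([ 0.1, -0.1])
print(filtered)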
diff --git a/mp_wrapper.py b/mp_wrapper.py
new file mode 100644
index 0000000..202082a
--- /dev/null
+++ b/mp_wrapper.py
@@ -0,0 +1,170 @@
+from abc import ABC, abstractmethod
+from typing import Union, Tuple
+
+import gym
+import numpy as np
+
+from gym import spaces
+from gym.envs.mujoco import MujocoEnv
+from policies import get_policy_class, BaseController
+from mp_pytorch.mp.mp_interfaces import MPInterface
+
+
+class BaseMPWrapper(gym.Env, ABC):
+    """
+    Base class for movement primitive (MP) based wrappers of gym environments.
+
+    Args:
+        env: The (wrapped) environment this wrapper is applied on
+        mp: The movement primitive (MPInterface) that generates the desired trajectory
+        duration: Length of the trajectory of the movement primitive in seconds
+        policy_type: Type or object defining the controller that is used to generate actions based on the
+                     desired trajectory
+        render_mode: Equivalent to gym render mode
+        **mp_kwargs: Additional keyword arguments; 'policy_kwargs' are popped and forwarded to the controller
+
+    """
+
+    def __init__(self,
+                 env: MujocoEnv,
+                 mp: MPInterface,
+                 duration: float,
+                 policy_type: Union[str, BaseController] = None,
+                 render_mode: str = None,
+                 **mp_kwargs
+                 ):
+        super().__init__()
+
+        assert env.dt is not None
+        self.env = env
+        self.dt = env.dt
+        self.duration = duration
+        self.traj_steps = int(duration / self.dt)
+        self.post_traj_steps = self.env.spec.max_episode_steps - self.traj_steps
+
+        if isinstance(policy_type, str):
+            # pop policy kwargs here such that they are not passed to the initialize_mp method
+            self.policy = get_policy_class(policy_type, self, **mp_kwargs.pop('policy_kwargs', {}))
+        else:
+            self.policy = policy_type
+
+        self.mp = mp
+        self.env = env
+
+        # rendering
+        self.render_mode = render_mode
+        self.render_kwargs = {}
+        self.time_steps = np.linspace(0, self.duration, self.traj_steps + 1)
+        self.mp.set_mp_times(self.time_steps)
+
+        # TODO: put action bounds in mp wrapper (e.g. time bound for traj. length ...), otherwise learning the durations
+        # might not work
+        # action_bounds = np.inf * np.ones((np.prod(self.mp.num_params)))
+        min_action_bounds, max_action_bounds = mp.get_param_bounds()
+        self.action_space = gym.spaces.Box(low=min_action_bounds.numpy(), high=max_action_bounds.numpy(),
+                                           dtype=np.float32)
+
+        self.active_obs = self.set_active_obs()
+        self.observation_space = spaces.Box(low=self.env.observation_space.low[self.active_obs],
+                                            high=self.env.observation_space.high[self.active_obs],
+                                            dtype=self.env.observation_space.dtype)
+
+    def get_trajectory(self, action: np.ndarray) -> Tuple:
+        self.mp.set_params(action)
+        traj_dict = self.mp.get_mp_trajs(get_pos=True, get_vel=True)
+        trajectory_tensor, velocity_tensor = traj_dict['pos'], traj_dict['vel']
+
+        trajectory = trajectory_tensor.numpy()
+        velocity = velocity_tensor.numpy()
+        if self.post_traj_steps > 0:
+            trajectory = np.vstack([trajectory, np.tile(trajectory[-1, :], [self.post_traj_steps, 1])])
+            velocity = np.vstack([velocity, np.zeros(shape=(self.post_traj_steps, self.mp.num_dof))])
+
+        return trajectory, velocity
+
+    @abstractmethod
+    def set_active_obs(self):
+        """Returns a boolean mask selecting the entries of the wrapped env's observation that are exposed."""
+        pass
+
+    @property
+    @abstractmethod
+    def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
+        """
+        Returns the current position of the action/control dimension.
+        The dimensionality has to match the action/control dimension.
+        This is not required when exclusively using velocity control;
+        it should, however, be implemented regardless.
+        E.g. the joint positions that are directly or indirectly controlled by the action.
+        """
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
+        """
+        Returns the current velocity of the action/control dimension.
+        The dimensionality has to match the action/control dimension.
+        This is not required when exclusively using position control;
+        it should, however, be implemented regardless.
+        E.g. the joint velocities that are directly or indirectly controlled by the action.
+        """
+        raise NotImplementedError()
+
+    def step(self, action: np.ndarray):
+        """Generates a full trajectory from the MP and steps the wrapped environment along it, one action per step."""
+
+        trajectory, velocity = self.get_trajectory(action)
+
+        trajectory_length = len(trajectory)
+        actions = np.zeros(shape=(trajectory_length,) + self.env.action_space.shape)
+        if isinstance(self.env.observation_space, spaces.Dict):  # For goal environments
+            observations = np.zeros(shape=(trajectory_length,) + self.env.observation_space["observation"].shape,
+                                    dtype=self.env.observation_space.dtype)
+        else:
+            observations = np.zeros(shape=(trajectory_length,) + self.env.observation_space.shape,
+                                    dtype=self.env.observation_space.dtype)
+        rewards = np.zeros(shape=(trajectory_length,))
+        trajectory_return = 0
+
+        infos = dict()
+
+        for t, pos_vel in enumerate(zip(trajectory, velocity)):
+            ac = self.policy.get_action(pos_vel[0], pos_vel[1])
+            actions[t, :] = np.clip(ac, self.env.action_space.low, self.env.action_space.high)
+            obs, rewards[t], done, info = self.env.step(actions[t, :])
+            observations[t, :] = obs["observation"] if isinstance(self.env.observation_space, spaces.Dict) else obs
+            trajectory_return += rewards[t]
+            for k, v in info.items():
+                elems = infos.get(k, [None] * trajectory_length)
+                elems[t] = v
+                infos[k] = elems
+            # infos['step_infos'].append(info)
+            if self.render_mode:
+                self.render(mode=self.render_mode, **self.render_kwargs)
+            if done:
+                break
+        infos.update({k: v[:t + 1] for k, v in infos.items()})
+        infos['trajectory'] = trajectory
+        infos['step_actions'] = actions[:t + 1]
+        infos['step_observations'] = observations[:t + 1]
+        infos['step_rewards'] = rewards[:t + 1]
+        infos['trajectory_length'] = t + 1
+        done = True
+        return self.get_observation_from_step(observations[t]), trajectory_return, done, infos
+
+    def reset(self):
+        return self.get_observation_from_step(self.env.reset())
+
+    def render(self, mode='human', **kwargs):
+        """Only set render options here, such that they can be used during the rollout.
+        This only needs to be called once."""
+        self.render_mode = mode
+        self.render_kwargs = kwargs
+        # self.env.render(mode=self.render_mode, **self.render_kwargs)
+        self.env.render(mode=self.render_mode)
+
+    def get_observation_from_step(self, observation: np.ndarray) -> np.ndarray:
+        return observation[self.active_obs]
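To make the padding in get_trajectory above concrete (illustration only, not part of the patch): once the MP trajectory ends, the last position is held and the commanded velocity is zero for the remaining post_traj_steps environment steps. A self-contained numpy sketch with toy numbers:

import numpy as np

# Toy trajectory: 3 time steps, 2 DoF (arbitrary illustration data).
trajectory = np.array([[0.0, 0.0],
                       [0.5, 0.2],
                       [1.0, 0.4]])
velocity = np.array([[0.5, 0.2],
                     [0.5, 0.2],
                     [0.5, 0.2]])
post_traj_steps = 2  # remaining env steps after the MP trajectory ends

# Same padding scheme as BaseMPWrapper.get_trajectory: hold the last position, command zero velocity.
trajectory = np.vstack([trajectory, np.tile(trajectory[-1, :], [post_traj_steps, 1])])
velocity = np.vstack([velocity, np.zeros(shape=(post_traj_steps, trajectory.shape[1]))])

print(trajectory)  # last two rows both repeat [1.0, 0.4]
print(velocity)    # last two rows are [0.0, 0.0]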
diff --git a/policies.py b/policies.py
new file mode 100644
index 0000000..fd89f9e
--- /dev/null
+++ b/policies.py
@@ -0,0 +1,129 @@
+from typing import Tuple, Union
+
+import numpy as np
+
+
+class BaseController:
+    def __init__(self, env, **kwargs):
+        self.env = env
+
+    def get_action(self, des_pos, des_vel):
+        raise NotImplementedError
+
+
+class PosController(BaseController):
+    """
+    A Position Controller. The controller calculates a response only based on the desired position.
+    """
+    def get_action(self, des_pos, des_vel):
+        return des_pos
+
+
+class VelController(BaseController):
+    """
+    A Velocity Controller. The controller calculates a response only based on the desired velocity.
+    """
+    def get_action(self, des_pos, des_vel):
+        return des_vel
+
+
+class PDController(BaseController):
+    """
+    A PD-Controller. Using position and velocity information from a provided environment,
+    the controller calculates a response based on the desired position and velocity.
+
+    :param env: A position environment
+    :param p_gains: Factors for the proportional gains
+    :param d_gains: Factors for the derivative gains
+    """
+
+    def __init__(self,
+                 env,
+                 p_gains: Union[float, Tuple],
+                 d_gains: Union[float, Tuple]):
+        self.p_gains = p_gains
+        self.d_gains = d_gains
+        super(PDController, self).__init__(env)
+
+    def get_action(self, des_pos, des_vel):
+        cur_pos = self.env.current_pos
+        cur_vel = self.env.current_vel
+        assert des_pos.shape == cur_pos.shape, \
+            f"Mismatch in dimension between desired position {des_pos.shape} and current position {cur_pos.shape}"
+        assert des_vel.shape == cur_vel.shape, \
+            f"Mismatch in dimension between desired velocity {des_vel.shape} and current velocity {cur_vel.shape}"
+        trq = self.p_gains * (des_pos - cur_pos) + self.d_gains * (des_vel - cur_vel)
+        return trq
+
+
+class MetaWorldController(BaseController):
+    """
+    A MetaWorld Controller. Using position and velocity information from a provided environment,
+    the controller calculates a response based on the desired position and velocity.
+    Unlike the other controllers, this is a special controller for MetaWorld environments.
+    They use a position delta for the xyz coordinates and a raw position for the gripper opening.
+
+    :param env: A position environment
+    """
+
+    def __init__(self,
+                 env
+                 ):
+        super(MetaWorldController, self).__init__(env)
+
+    def get_action(self, des_pos, des_vel):
+        gripper_pos = des_pos[-1]
+
+        cur_pos = self.env.current_pos[:-1]
+        xyz_pos = des_pos[:-1]
+
+        assert xyz_pos.shape == cur_pos.shape, \
+            f"Mismatch in dimension between desired position {xyz_pos.shape} and current position {cur_pos.shape}"
+        trq = np.hstack([(xyz_pos - cur_pos), gripper_pos])
+        return trq
+
+
+# TODO: Do we need this class?
+class PDControllerExtend(BaseController):
+    """
+    A PD-Controller. Using position and velocity information from a provided positional environment,
+    the controller calculates a response based on the desired position and velocity.
+    If the desired position/velocity has fewer dimensions than the current one, it is extended by the
+    environment via extend_des_pos/extend_des_vel before the PD terms are computed.
+
+    :param env: A position environment
+    :param p_gains: Factors for the proportional gains
+    :param d_gains: Factors for the derivative gains
+    """
+
+    def __init__(self,
+                 env,
+                 p_gains: Union[float, Tuple],
+                 d_gains: Union[float, Tuple]):
+
+        self.p_gains = p_gains
+        self.d_gains = d_gains
+        super(PDControllerExtend, self).__init__(env)
+
+    def get_action(self, des_pos, des_vel):
+        cur_pos = self.env.current_pos
+        cur_vel = self.env.current_vel
+        if len(des_pos) != len(cur_pos):
+            des_pos = self.env.extend_des_pos(des_pos)
+        if len(des_vel) != len(cur_vel):
+            des_vel = self.env.extend_des_vel(des_vel)
+        trq = self.p_gains * (des_pos - cur_pos) + self.d_gains * (des_vel - cur_vel)
+        return trq
+
+
+def get_policy_class(policy_type, env, **kwargs):
+    if policy_type == "motor":
+        return PDController(env, **kwargs)
+    elif policy_type == "velocity":
+        return VelController(env)
+    elif policy_type == "position":
+        return PosController(env)
+    elif policy_type == "metaworld":
+        return MetaWorldController(env)
+    else:
+        raise ValueError(f"Invalid controller type {policy_type} provided. Only 'motor', 'velocity', 'position' "
+                         f"and 'metaworld' are currently supported controllers.")
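To illustrate the PD control law used by PDController (example values only, not part of the patch): the response is p_gains * (des_pos - cur_pos) + d_gains * (des_vel - cur_vel). A minimal standalone sketch, assuming policies.py (added above) is importable and using a hypothetical stub in place of the MP wrapper that would normally expose current_pos/current_vel:

import numpy as np

from policies import get_policy_class


class DummyEnv:
    """Hypothetical stand-in for an MP wrapper exposing the current state (illustration only)."""
    current_pos = np.array([0.0, 0.5])
    current_vel = np.array([0.1, 0.0])


# 'motor' maps to PDController; p_gains/d_gains are forwarded via **kwargs.
controller = get_policy_class("motor", DummyEnv(), p_gains=1.0, d_gains=0.1)

des_pos = np.array([0.2, 0.4])
des_vel = np.array([0.0, 0.0])

trq = controller.get_action(des_pos, des_vel)
# 1.0 * ([0.2, 0.4] - [0.0, 0.5]) + 0.1 * ([0.0, 0.0] - [0.1, 0.0]) = [0.19, -0.1]
print(trq)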