diff --git a/alr_envs/__init__.py b/alr_envs/__init__.py index e43e3b1..30fa7b8 100644 --- a/alr_envs/__init__.py +++ b/alr_envs/__init__.py @@ -1,5 +1,5 @@ from alr_envs import dmc, meta, open_ai -from alr_envs.utils.make_env_helpers import make, make_detpmp_env, make_dmp_env, make_rank +from alr_envs.utils.make_env_helpers import make, make_detpmp_env, make_dmp_env, make_promp_env, make_rank from alr_envs.utils import make_dmc # Convenience function for all MP environments diff --git a/alr_envs/alr/__init__.py b/alr_envs/alr/__init__.py index d9843c0..7521ccf 100644 --- a/alr_envs/alr/__init__.py +++ b/alr_envs/alr/__init__.py @@ -10,7 +10,7 @@ from .mujoco.ball_in_a_cup.biac_pd import ALRBallInACupPDEnv from .mujoco.reacher.alr_reacher import ALRReacherEnv from .mujoco.reacher.balancing import BalancingEnv -ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "DetPMP": []} +ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "ProMP": [], "DetPMP": []} # Classic Control ## Simple Reacher @@ -110,7 +110,7 @@ register( "hole_width": 0.25, "hole_depth": 1, "hole_x": 2, - "collision_penalty": 100, + "collision_penalty": 1, } ) @@ -237,7 +237,7 @@ for _v in _versions: "mp_kwargs": { "num_dof": 2 if "long" not in _v.lower() else 5, "num_basis": 5, - "duration": 20, + "duration": 2, "alpha_phase": 2, "learn_goal": True, "policy_type": "velocity", @@ -247,6 +247,25 @@ for _v in _versions: ) ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append(_env_id) + _env_id = f'{_name[0]}ProMP-{_name[1]}' + register( + id=_env_id, + entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper', + kwargs={ + "name": f"alr_envs:{_v}", + "wrappers": [classic_control.simple_reacher.MPWrapper], + "mp_kwargs": { + "num_dof": 2 if "long" not in _v.lower() else 5, + "num_basis": 5, + "duration": 2, + "policy_type": "motor", + "weights_scale": 1, + "zero_start": True + } + } + ) + ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id) + _env_id = f'{_name[0]}DetPMP-{_name[1]}' register( id=_env_id, @@ -258,7 +277,7 @@ for _v in _versions: "mp_kwargs": { "num_dof": 2 if "long" not in _v.lower() else 5, "num_basis": 5, - "duration": 20, + "duration": 2, "width": 0.025, "policy_type": "velocity", "weights_scale": 0.2, @@ -289,6 +308,24 @@ register( ) ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append("ViaPointReacherDMP-v0") +register( + id="ViaPointReacherProMP-v0", + entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper', + kwargs={ + "name": f"alr_envs:ViaPointReacher-v0", + "wrappers": [classic_control.viapoint_reacher.MPWrapper], + "mp_kwargs": { + "num_dof": 5, + "num_basis": 5, + "duration": 2, + "policy_type": "motor", + "weights_scale": 1, + "zero_start": True + } + } +) +ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append("ViaPointReacherProMP-v0") + register( id='ViaPointReacherDetPMP-v0', entry_point='alr_envs.utils.make_env_helpers:make_detpmp_env_helper', @@ -325,7 +362,7 @@ for _v in _versions: "num_basis": 5, "duration": 2, "learn_goal": True, - "alpha_phase": 2, + "alpha_phase": 2.5, "bandwidth_factor": 2, "policy_type": "velocity", "weights_scale": 50, @@ -335,6 +372,25 @@ for _v in _versions: ) ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append(_env_id) + _env_id = f'HoleReacherProMP-{_v}' + register( + id=_env_id, + entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper', + kwargs={ + "name": f"alr_envs:HoleReacher-{_v}", + "wrappers": [classic_control.hole_reacher.MPWrapper], + "mp_kwargs": { + "num_dof": 5, + "num_basis": 5, + "duration": 2, + "policy_type": 
"velocity", + "weights_scale": 0.1, + "zero_start": True + } + } + ) + ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id) + _env_id = f'HoleReacherDetPMP-{_v}' register( id=_env_id, diff --git a/alr_envs/alr/classic_control/base_reacher/__init__.py b/alr_envs/alr/classic_control/base_reacher/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alr_envs/alr/classic_control/base_reacher/base_reacher.py b/alr_envs/alr/classic_control/base_reacher/base_reacher.py new file mode 100644 index 0000000..1b1ad19 --- /dev/null +++ b/alr_envs/alr/classic_control/base_reacher/base_reacher.py @@ -0,0 +1,146 @@ +from typing import Iterable, Union +from abc import ABC, abstractmethod +import gym +import matplotlib.pyplot as plt +import numpy as np +from gym import spaces +from gym.utils import seeding +from alr_envs.alr.classic_control.utils import intersect + + +class BaseReacherEnv(gym.Env, ABC): + """ + Base class for all reaching environments. + """ + + def __init__(self, n_links: int, random_start: bool = True, + allow_self_collision: bool = False): + super().__init__() + self.link_lengths = np.ones(n_links) + self.n_links = n_links + self._dt = 0.01 + + self.random_start = random_start + + self.allow_self_collision = allow_self_collision + + # state + self._joints = None + self._joint_angles = None + self._angle_velocity = None + self._acc = None + self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) + self._start_vel = np.zeros(self.n_links) + + # joint limits + self.j_min = -np.pi * np.ones(n_links) + self.j_max = np.pi * np.ones(n_links) + + self.steps_before_reward = 199 + + state_bound = np.hstack([ + [np.pi] * self.n_links, # cos + [np.pi] * self.n_links, # sin + [np.inf] * self.n_links, # velocity + [np.inf] * 2, # x-y coordinates of target distance + [np.inf] # env steps, because reward start after n steps TODO: Maybe + ]) + + self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) + + self.reward_function = None # Needs to be set in sub class + + # containers for plotting + self.metadata = {'render.modes': ["human"]} + self.fig = None + + self._steps = 0 + self.seed() + + @property + def dt(self) -> Union[float, int]: + return self._dt + + @property + def current_pos(self): + return self._joint_angles.copy() + + @property + def current_vel(self): + return self._angle_velocity.copy() + + def reset(self): + # Sample only orientation of first link, i.e. the arm is always straight. + if self.random_start: + first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) + self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) + self._start_pos = self._joint_angles.copy() + else: + self._joint_angles = self._start_pos + + self._angle_velocity = self._start_vel + self._joints = np.zeros((self.n_links + 1, 2)) + self._update_joints() + self._steps = 0 + + return self._get_obs().copy() + + @abstractmethod + def step(self, action: np.ndarray): + """ + A single step with action in angular velocity space + """ + raise NotImplementedError + + def _update_joints(self): + """ + update joints to get new end-effector position. The other links are only required for rendering. 
+ Returns: + + """ + angles = np.cumsum(self._joint_angles) + x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)]) + self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0) + + def _check_self_collision(self): + """Checks whether line segments intersect""" + + if self.allow_self_collision: + return False + + if np.any(self._joint_angles > self.j_max) or np.any(self._joint_angles < self.j_min): + return True + + link_lines = np.stack((self._joints[:-1, :], self._joints[1:, :]), axis=1) + for i, line1 in enumerate(link_lines): + for line2 in link_lines[i + 2:, :]: + if intersect(line1[0], line1[-1], line2[0], line2[-1]): + return True + return False + + @abstractmethod + def _get_reward(self, action: np.ndarray) -> (float, dict): + pass + + @abstractmethod + def _get_obs(self) -> np.ndarray: + pass + + @abstractmethod + def _check_collisions(self) -> bool: + pass + + @abstractmethod + def _terminate(self, info) -> bool: + return False + + def seed(self, seed=None): + self.np_random, seed = seeding.np_random(seed) + return [seed] + + def close(self): + del self.fig + + @property + def end_effector(self): + return self._joints[self.n_links].T diff --git a/alr_envs/alr/classic_control/base_reacher/base_reacher_direct.py b/alr_envs/alr/classic_control/base_reacher/base_reacher_direct.py new file mode 100644 index 0000000..dc79827 --- /dev/null +++ b/alr_envs/alr/classic_control/base_reacher/base_reacher_direct.py @@ -0,0 +1,37 @@ +from abc import ABC + +from gym import spaces +import numpy as np +from alr_envs.alr.classic_control.base_reacher.base_reacher import BaseReacherEnv + + +class BaseReacherDirectEnv(BaseReacherEnv, ABC): + """ + Base class for directly controlled reaching environments + """ + def __init__(self, n_links: int, random_start: bool = True, + allow_self_collision: bool = False): + super().__init__(n_links, random_start, allow_self_collision) + + self.max_vel = 2 * np.pi + action_bound = np.ones((self.n_links,)) * self.max_vel + self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) + + def step(self, action: np.ndarray): + """ + A single step with action in angular velocity space + """ + + self._acc = (action - self._angle_velocity) / self.dt + self._angle_velocity = action + self._joint_angles = self._joint_angles + self.dt * self._angle_velocity + self._update_joints() + + self._is_collided = self._check_collisions() + + reward, info = self._get_reward(action) + + self._steps += 1 + done = self._terminate(info) + + return self._get_obs().copy(), reward, done, info diff --git a/alr_envs/alr/classic_control/base_reacher/base_reacher_torque.py b/alr_envs/alr/classic_control/base_reacher/base_reacher_torque.py new file mode 100644 index 0000000..094f632 --- /dev/null +++ b/alr_envs/alr/classic_control/base_reacher/base_reacher_torque.py @@ -0,0 +1,36 @@ +from abc import ABC + +from gym import spaces +import numpy as np +from alr_envs.alr.classic_control.base_reacher.base_reacher import BaseReacherEnv + + +class BaseReacherTorqueEnv(BaseReacherEnv, ABC): + """ + Base class for torque controlled reaching environments + """ + def __init__(self, n_links: int, random_start: bool = True, + allow_self_collision: bool = False): + super().__init__(n_links, random_start, allow_self_collision) + + self.max_torque = 1000 + action_bound = np.ones((self.n_links,)) * self.max_torque + self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) + + def step(self, action: np.ndarray): + """ + A single 
step with action in torque space + """ + + self._angle_velocity = self._angle_velocity + self.dt * action + self._joint_angles = self._joint_angles + self.dt * self._angle_velocity + self._update_joints() + + self._is_collided = self._check_collisions() + + reward, info = self._get_reward(action) + + self._steps += 1 + done = False + + return self._get_obs().copy(), reward, done, info diff --git a/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py b/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py index ee1a997..208f005 100644 --- a/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py +++ b/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py @@ -3,22 +3,17 @@ from typing import Union import gym import matplotlib.pyplot as plt import numpy as np -from gym.utils import seeding from matplotlib import patches -from alr_envs.alr.classic_control.utils import check_self_collision +from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv -class HoleReacherEnv(gym.Env): - +class HoleReacherEnv(BaseReacherDirectEnv): def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None, hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False, - allow_wall_collision: bool = False, collision_penalty: float = 1000): + allow_wall_collision: bool = False, collision_penalty: float = 1000, rew_fct: str = "simple"): - self.n_links = n_links - self.link_lengths = np.ones((n_links, 1)) - - self.random_start = random_start + super().__init__(n_links, random_start, allow_self_collision) # provided initial parameters self.initial_x = hole_x # x-position of center of hole @@ -31,21 +26,7 @@ class HoleReacherEnv(gym.Env): self._tmp_depth = None self._goal = None # x-y coordinates for reaching the center at the bottom of the hole - # collision - self.allow_self_collision = allow_self_collision - self.allow_wall_collision = allow_wall_collision - self.collision_penalty = collision_penalty - - # state - self._joints = None - self._joint_angles = None - self._angle_velocity = None - self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) - self._start_vel = np.zeros(self.n_links) - - self._dt = 0.01 - - action_bound = np.pi * np.ones((self.n_links,)) + # action_bound = np.pi * np.ones((self.n_links,)) state_bound = np.hstack([ [np.pi] * self.n_links, # cos [np.pi] * self.n_links, # sin @@ -55,71 +36,30 @@ class HoleReacherEnv(gym.Env): [np.inf] * 2, # x-y coordinates of target distance [np.inf] # env steps, because reward start after n steps TODO: Maybe ]) - self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) + # self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) - # containers for plotting - self.metadata = {'render.modes': ["human", "partial"]} - self.fig = None - - self._steps = 0 - self.seed() - - @property - def dt(self) -> Union[float, int]: - return self._dt - - # @property - # def start_pos(self): - # return self._start_pos - - @property - def current_pos(self): - return self._joint_angles.copy() - - @property - def current_vel(self): - return self._angle_velocity.copy() - - def step(self, action: np.ndarray): - """ - A single step with an action in joint velocity space - """ - - acc = (action - self._angle_velocity) / self.dt - self._angle_velocity = action - 
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity # + 0.001 * np.random.randn(5) - self._update_joints() - - reward, info = self._get_reward(acc) - - info.update({"is_collided": self._is_collided}) - self.end_effector_traj.append(np.copy(self.end_effector)) - - self._steps += 1 - done = self._is_collided - - return self._get_obs().copy(), reward, done, info + if rew_fct == "simple": + from alr_envs.alr.classic_control.hole_reacher.hr_simple_reward import HolereacherReward + self.reward_function = HolereacherReward(allow_self_collision, allow_wall_collision, collision_penalty) + elif rew_fct == "vel_acc": + from alr_envs.alr.classic_control.hole_reacher.hr_dist_vel_acc_reward import HolereacherReward + self.reward_function = HolereacherReward(allow_self_collision, allow_wall_collision, collision_penalty) + else: + raise ValueError("Unknown reward function {}".format(rew_fct)) def reset(self): - if self.random_start: - # Maybe change more than first seed - first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) - self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) - self._start_pos = self._joint_angles.copy() - else: - self._joint_angles = self._start_pos - self._generate_hole() self._set_patches() + self.reward_function.reset() - self._angle_velocity = self._start_vel - self._joints = np.zeros((self.n_links + 1, 2)) - self._update_joints() - self._steps = 0 - self.end_effector_traj = [] + return super().reset() - return self._get_obs().copy() + def _get_reward(self, action: np.ndarray) -> (float, dict): + return self.reward_function.get_reward(self) + + def _terminate(self, info): + return info["is_collided"] def _generate_hole(self): if self.initial_width is None: @@ -144,45 +84,17 @@ class HoleReacherEnv(gym.Env): self._tmp_depth = depth self._goal = np.hstack([self._tmp_x, -self._tmp_depth]) - def _update_joints(self): - """ - update _joints to get new end effector position. The other links are only required for rendering. 
- Returns: + self._line_ground_left = np.array([-self.n_links, 0, x - width / 2, 0]) + self._line_ground_right = np.array([x + width / 2, 0, self.n_links, 0]) + self._line_ground_hole = np.array([x - width / 2, -depth, x + width / 2, -depth]) + self._line_hole_left = np.array([x - width / 2, -depth, x - width / 2, 0]) + self._line_hole_right = np.array([x + width / 2, -depth, x + width / 2, 0]) - """ - line_points_in_taskspace = self._get_forward_kinematics(num_points_per_link=20) - - self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0] - self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1] - - self_collision = False - wall_collision = False - - if not self.allow_self_collision: - self_collision = check_self_collision(line_points_in_taskspace) - if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision: - self_collision = True - - if not self.allow_wall_collision: - wall_collision = self._check_wall_collision(line_points_in_taskspace) - - self._is_collided = self_collision or wall_collision - - def _get_reward(self, acc: np.ndarray): - reward = 0 - # success = False - - if self._steps == 199 or self._is_collided: - # return reward only in last time step - # Episode also terminates when colliding, hence return reward - dist = np.linalg.norm(self.end_effector - self._goal) - # success = dist < 0.005 and not self._is_collided - reward = - dist ** 2 - self.collision_penalty * self._is_collided - - reward -= 5e-8 * np.sum(acc ** 2) - # info = {"is_success": success} - - return reward, {} # info + self.ground_lines = np.stack((self._line_ground_left, + self._line_ground_right, + self._line_ground_hole, + self._line_hole_left, + self._line_hole_right)) def _get_obs(self): theta = self._joint_angles @@ -194,17 +106,17 @@ class HoleReacherEnv(gym.Env): # self._tmp_hole_depth, self.end_effector - self._goal, self._steps - ]) + ]).astype(np.float32) - def _get_forward_kinematics(self, num_points_per_link=1): + def _get_line_points(self, num_points_per_link=1): theta = self._joint_angles[:, None] intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1 accumulated_theta = np.cumsum(theta, axis=0) end_effector = np.zeros(shape=(self.n_links, num_points_per_link, 2)) - x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points - y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points + x = np.cos(accumulated_theta) * self.link_lengths[:, None] * intermediate_points + y = np.sin(accumulated_theta) * self.link_lengths[:, None] * intermediate_points end_effector[0, :, 0] = x[0, :] end_effector[0, :, 1] = y[0, :] @@ -215,7 +127,12 @@ class HoleReacherEnv(gym.Env): return np.squeeze(end_effector + self._joints[0, :]) - def _check_wall_collision(self, line_points): + def _check_collisions(self) -> bool: + return self._check_self_collision() or self.check_wall_collision() + + def check_wall_collision(self): + line_points = self._get_line_points(num_points_per_link=100) + # all points that are before the hole in x r, c = np.where(line_points[:, :, 0] < (self._tmp_x - self._tmp_width / 2)) @@ -281,7 +198,7 @@ class HoleReacherEnv(gym.Env): def _set_patches(self): if self.fig is not None: - self.fig.gca().patches = [] + # self.fig.gca().patches = [] left_block = patches.Rectangle((-self.n_links, -self._tmp_depth), self.n_links + self._tmp_x - self._tmp_width / 2, self._tmp_depth, @@ -300,15 +217,15 @@ class HoleReacherEnv(gym.Env): self.fig.gca().add_patch(right_block) 
self.fig.gca().add_patch(hole_floor) - def seed(self, seed=None): - self.np_random, seed = seeding.np_random(seed) - return [seed] - @property - def end_effector(self): - return self._joints[self.n_links].T +if __name__ == "__main__": + import time + env = HoleReacherEnv(5) + env.reset() - def close(self): - super().close() - if self.fig is not None: - plt.close(self.fig) + for i in range(10000): + ac = env.action_space.sample() + obs, rew, done, info = env.step(ac) + env.render() + if done: + env.reset() diff --git a/alr_envs/alr/classic_control/hole_reacher/hr_dist_vel_acc_reward.py b/alr_envs/alr/classic_control/hole_reacher/hr_dist_vel_acc_reward.py new file mode 100644 index 0000000..1d75b66 --- /dev/null +++ b/alr_envs/alr/classic_control/hole_reacher/hr_dist_vel_acc_reward.py @@ -0,0 +1,60 @@ +import numpy as np + + +class HolereacherReward: + def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty): + self.collision_penalty = collision_penalty + + # collision + self.allow_self_collision = allow_self_collision + self.allow_wall_collision = allow_wall_collision + self.collision_penalty = collision_penalty + self._is_collided = False + + self.reward_factors = np.array((-1, -1e-4, -1e-6, -collision_penalty, 0)) + + def reset(self): + self._is_collided = False + self.collision_dist = 0 + + def get_reward(self, env): + dist_cost = 0 + collision_cost = 0 + time_cost = 0 + success = False + + self_collision = False + wall_collision = False + + if not self._is_collided: + if not self.allow_self_collision: + self_collision = env._check_self_collision() + + if not self.allow_wall_collision: + wall_collision = env.check_wall_collision() + + self._is_collided = self_collision or wall_collision + + self.collision_dist = np.linalg.norm(env.end_effector - env._goal) + + if env._steps == 199: # or self._is_collided: + # return reward only in last time step + # Episode also terminates when colliding, hence return reward + dist = np.linalg.norm(env.end_effector - env._goal) + + success = dist < 0.005 and not self._is_collided + dist_cost = dist ** 2 + collision_cost = self._is_collided * self.collision_dist ** 2 + time_cost = 199 - env._steps + + info = {"is_success": success, + "is_collided": self._is_collided, + "end_effector": np.copy(env.end_effector)} + + vel_cost = np.sum(env._angle_velocity ** 2) + acc_cost = np.sum(env._acc ** 2) + + reward_features = np.array((dist_cost, vel_cost, acc_cost, collision_cost, time_cost)) + reward = np.dot(reward_features, self.reward_factors) + + return reward, info diff --git a/alr_envs/alr/classic_control/hole_reacher/hr_simple_reward.py b/alr_envs/alr/classic_control/hole_reacher/hr_simple_reward.py new file mode 100644 index 0000000..5820ab1 --- /dev/null +++ b/alr_envs/alr/classic_control/hole_reacher/hr_simple_reward.py @@ -0,0 +1,53 @@ +import numpy as np + + +class HolereacherReward: + def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty): + self.collision_penalty = collision_penalty + + # collision + self.allow_self_collision = allow_self_collision + self.allow_wall_collision = allow_wall_collision + self.collision_penalty = collision_penalty + self._is_collided = False + + self.reward_factors = np.array((-1, -5e-8, -collision_penalty)) + + def reset(self): + self._is_collided = False + + def get_reward(self, env): + dist_cost = 0 + collision_cost = 0 + success = False + + self_collision = False + wall_collision = False + + if not self.allow_self_collision: + self_collision = 
env._check_self_collision() + + if not self.allow_wall_collision: + wall_collision = env.check_wall_collision() + + self._is_collided = self_collision or wall_collision + + if env._steps == 199 or self._is_collided: + # return reward only in last time step + # Episode also terminates when colliding, hence return reward + dist = np.linalg.norm(env.end_effector - env._goal) + dist_cost = dist ** 2 + collision_cost = int(self._is_collided) + + success = dist < 0.005 and not self._is_collided + + info = {"is_success": success, + "is_collided": self._is_collided, + "end_effector": np.copy(env.end_effector)} + + acc_cost = np.sum(env._acc ** 2) + + reward_features = np.array((dist_cost, acc_cost, collision_cost)) + reward = np.dot(reward_features, self.reward_factors) + + return reward, info diff --git a/alr_envs/alr/classic_control/hole_reacher/mp_wrapper.py b/alr_envs/alr/classic_control/hole_reacher/mp_wrapper.py index d951161..feb545f 100644 --- a/alr_envs/alr/classic_control/hole_reacher/mp_wrapper.py +++ b/alr_envs/alr/classic_control/hole_reacher/mp_wrapper.py @@ -18,6 +18,14 @@ class MPWrapper(MPEnvWrapper): [False] # env steps ]) + # @property + # def current_pos(self) -> Union[float, int, np.ndarray, Tuple]: + # return self._joint_angles.copy() + # + # @property + # def current_vel(self) -> Union[float, int, np.ndarray, Tuple]: + # return self._angle_velocity.copy() + @property def current_pos(self) -> Union[float, int, np.ndarray, Tuple]: return self.env.current_pos diff --git a/alr_envs/alr/classic_control/simple_reacher/simple_reacher.py b/alr_envs/alr/classic_control/simple_reacher/simple_reacher.py index 08c151f..dac06a3 100644 --- a/alr_envs/alr/classic_control/simple_reacher/simple_reacher.py +++ b/alr_envs/alr/classic_control/simple_reacher/simple_reacher.py @@ -1,26 +1,22 @@ from typing import Iterable, Union -import gym import matplotlib.pyplot as plt import numpy as np from gym import spaces -from gym.utils import seeding + +from alr_envs.alr.classic_control.base_reacher.base_reacher_torque import BaseReacherTorqueEnv -class SimpleReacherEnv(gym.Env): +class SimpleReacherEnv(BaseReacherTorqueEnv): """ Simple Reaching Task without any physics simulation. Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions towards the end of the trajectory. 
""" - def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True): - super().__init__() - self.link_lengths = np.ones(n_links) - self.n_links = n_links - self._dt = 0.1 - - self.random_start = random_start + def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True, + allow_self_collision: bool = False,): + super().__init__(n_links, random_start, allow_self_collision) # provided initial parameters self.inital_target = target @@ -28,16 +24,10 @@ class SimpleReacherEnv(gym.Env): # temp container for current env state self._goal = None - self._joints = None - self._joint_angles = None - self._angle_velocity = None self._start_pos = np.zeros(self.n_links) - self._start_vel = np.zeros(self.n_links) - self.max_torque = 1 self.steps_before_reward = 199 - action_bound = np.ones((self.n_links,)) * self.max_torque state_bound = np.hstack([ [np.pi] * self.n_links, # cos [np.pi] * self.n_links, # sin @@ -45,84 +35,24 @@ class SimpleReacherEnv(gym.Env): [np.inf] * 2, # x-y coordinates of target distance [np.inf] # env steps, because reward start after n steps TODO: Maybe ]) - self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) - # containers for plotting - self.metadata = {'render.modes': ["human"]} - self.fig = None - - self._steps = 0 - self.seed() - - @property - def dt(self) -> Union[float, int]: - return self._dt - # @property # def start_pos(self): # return self._start_pos - @property - def current_pos(self): - return self._joint_angles - - @property - def current_vel(self): - return self._angle_velocity - - def step(self, action: np.ndarray): - """ - A single step with action in torque space - """ - - # action = self._add_action_noise(action) - ac = np.clip(action, -self.max_torque, self.max_torque) - - self._angle_velocity = self._angle_velocity + self.dt * ac - self._joint_angles = self._joint_angles + self.dt * self._angle_velocity - self._update_joints() - - reward, info = self._get_reward(action) - - self._steps += 1 - done = False - - return self._get_obs().copy(), reward, done, info - def reset(self): - - # TODO: maybe do initialisation more random? - # Sample only orientation of first link, i.e. the arm is always straight. - if self.random_start: - self._joint_angles = np.hstack([[self.np_random.uniform(-np.pi, np.pi)], np.zeros(self.n_links - 1)]) - self._start_pos = self._joint_angles.copy() - else: - self._joint_angles = self._start_pos - self._generate_goal() - self._angle_velocity = self._start_vel - self._joints = np.zeros((self.n_links + 1, 2)) - self._update_joints() - self._steps = 0 - - return self._get_obs().copy() - - def _update_joints(self): - """ - update joints to get new end-effector position. The other links are only required for rendering. 
- Returns: - - """ - angles = np.cumsum(self._joint_angles) - x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)]) - self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0) + return super().reset() def _get_reward(self, action: np.ndarray): diff = self.end_effector - self._goal reward_dist = 0 + if not self.allow_self_collision: + self._is_collided = self._check_self_collision() + if self._steps >= self.steps_before_reward: reward_dist -= np.linalg.norm(diff) # reward_dist = np.exp(-0.1 * diff ** 2).mean() @@ -132,6 +62,9 @@ class SimpleReacherEnv(gym.Env): reward = reward_dist - reward_ctrl return reward, dict(reward_dist=reward_dist, reward_ctrl=reward_ctrl) + def _terminate(self, info): + return False + def _get_obs(self): theta = self._joint_angles return np.hstack([ @@ -140,7 +73,7 @@ class SimpleReacherEnv(gym.Env): self._angle_velocity, self.end_effector - self._goal, self._steps - ]) + ]).astype(np.float32) def _generate_goal(self): @@ -155,6 +88,9 @@ class SimpleReacherEnv(gym.Env): self._goal = goal + def _check_collisions(self) -> bool: + return self._check_self_collision() + def render(self, mode='human'): # pragma: no cover if self.fig is None: # Create base figure once on the beginning. Afterwards only update @@ -190,13 +126,14 @@ class SimpleReacherEnv(gym.Env): self.fig.canvas.draw() self.fig.canvas.flush_events() - def seed(self, seed=None): - self.np_random, seed = seeding.np_random(seed) - return [seed] - def close(self): - del self.fig +if __name__ == "__main__": + env = SimpleReacherEnv(5) + env.reset() + for i in range(200): + ac = env.action_space.sample() + obs, rew, done, info = env.step(ac) - @property - def end_effector(self): - return self._joints[self.n_links].T + env.render() + if done: + break diff --git a/alr_envs/alr/classic_control/utils.py b/alr_envs/alr/classic_control/utils.py index fa8176a..b557a0a 100644 --- a/alr_envs/alr/classic_control/utils.py +++ b/alr_envs/alr/classic_control/utils.py @@ -1,3 +1,6 @@ +import numpy as np + + def ccw(A, B, C): return (C[1] - A[1]) * (B[0] - A[0]) - (B[1] - A[1]) * (C[0] - A[0]) > 1e-12 @@ -10,7 +13,7 @@ def intersect(A, B, C, D): def check_self_collision(line_points): - """Checks whether line segments and intersect""" + """Checks whether line segments intersect""" for i, line1 in enumerate(line_points): for line2 in line_points[i + 2:, :, :]: if intersect(line1[0], line1[-1], line2[0], line2[-1]): diff --git a/alr_envs/alr/classic_control/viapoint_reacher/viapoint_reacher.py b/alr_envs/alr/classic_control/viapoint_reacher/viapoint_reacher.py index 95d1e7f..b44647e 100644 --- a/alr_envs/alr/classic_control/viapoint_reacher/viapoint_reacher.py +++ b/alr_envs/alr/classic_control/viapoint_reacher/viapoint_reacher.py @@ -6,17 +6,15 @@ import numpy as np from gym.utils import seeding from alr_envs.alr.classic_control.utils import check_self_collision +from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv -class ViaPointReacherEnv(gym.Env): +class ViaPointReacherEnv(BaseReacherDirectEnv): def __init__(self, n_links, random_start: bool = False, via_target: Union[None, Iterable] = None, target: Union[None, Iterable] = None, allow_self_collision=False, collision_penalty=1000): - self.n_links = n_links - self.link_lengths = np.ones((n_links, 1)) - - self.random_start = random_start + super().__init__(n_links, random_start, allow_self_collision) # provided initial parameters self.intitial_target = target # provided target value @@ -27,20 +25,8 @@ class 
ViaPointReacherEnv(gym.Env): self._goal = np.array((n_links, 0)) # collision - self.allow_self_collision = allow_self_collision self.collision_penalty = collision_penalty - # state - self._joints = None - self._joint_angles = None - self._angle_velocity = None - self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) - self._start_vel = np.zeros(self.n_links) - self.weight_matrix_scale = 1 - - self._dt = 0.01 - - action_bound = np.pi * np.ones((self.n_links,)) state_bound = np.hstack([ [np.pi] * self.n_links, # cos [np.pi] * self.n_links, # sin @@ -49,69 +35,15 @@ class ViaPointReacherEnv(gym.Env): [np.inf] * 2, # x-y coordinates of target distance [np.inf] # env steps, because reward start after n steps ]) - self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) - # containers for plotting - self.metadata = {'render.modes': ["human", "partial"]} - self.fig = None - - self._steps = 0 - self.seed() - - @property - def dt(self): - return self._dt - # @property # def start_pos(self): # return self._start_pos - @property - def current_pos(self): - return self._joint_angles.copy() - - @property - def current_vel(self): - return self._angle_velocity.copy() - - def step(self, action: np.ndarray): - """ - a single step with an action in joint velocity space - """ - vel = action - self._angle_velocity = vel - self._joint_angles = self._joint_angles + self.dt * self._angle_velocity - self._update_joints() - - acc = (vel - self._angle_velocity) / self.dt - reward, info = self._get_reward(acc) - - info.update({"is_collided": self._is_collided}) - - self._steps += 1 - done = self._is_collided - - return self._get_obs().copy(), reward, done, info - def reset(self): - - if self.random_start: - # Maybe change more than dirst seed - first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) - self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) - self._start_pos = self._joint_angles.copy() - else: - self._joint_angles = self._start_pos - self._generate_goal() - - self._angle_velocity = self._start_vel - self._joints = np.zeros((self.n_links + 1, 2)) - self._update_joints() - self._steps = 0 - - return self._get_obs().copy() + return super().reset() def _generate_goal(self): # TODO: Maybe improve this later, this can yield quite a lot of invalid settings @@ -137,29 +69,13 @@ class ViaPointReacherEnv(gym.Env): self._via_point = via_target self._goal = goal - def _update_joints(self): - """ - update _joints to get new end effector position. The other links are only required for rendering. 
- Returns: - - """ - line_points_in_taskspace = self.get_forward_kinematics(num_points_per_link=20) - - self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0] - self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1] - - self_collision = False - - if not self.allow_self_collision: - self_collision = check_self_collision(line_points_in_taskspace) - if np.any(np.abs(self._joint_angles) > np.pi): - self_collision = True - - self._is_collided = self_collision - def _get_reward(self, acc): success = False reward = -np.inf + + if not self.allow_self_collision: + self._is_collided = self._check_self_collision() + if not self._is_collided: dist = np.inf # return intermediate reward for via point @@ -177,10 +93,15 @@ class ViaPointReacherEnv(gym.Env): reward -= dist ** 2 reward -= 5e-8 * np.sum(acc ** 2) - info = {"is_success": success} + info = {"is_success": success, + "is_collided": self._is_collided, + "end_effector": np.copy(self.end_effector)} return reward, info + def _terminate(self, info): + return info["is_collided"] + def _get_obs(self): theta = self._joint_angles return np.hstack([ @@ -190,28 +111,10 @@ class ViaPointReacherEnv(gym.Env): self.end_effector - self._via_point, self.end_effector - self._goal, self._steps - ]) + ]).astype(np.float32) - def get_forward_kinematics(self, num_points_per_link=1): - theta = self._joint_angles[:, None] - - intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1 - - accumulated_theta = np.cumsum(theta, axis=0) - - endeffector = np.zeros(shape=(self.n_links, num_points_per_link, 2)) - - x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points - y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points - - endeffector[0, :, 0] = x[0, :] - endeffector[0, :, 1] = y[0, :] - - for i in range(1, self.n_links): - endeffector[i, :, 0] = x[i, :] + endeffector[i - 1, -1, 0] - endeffector[i, :, 1] = y[i, :] + endeffector[i - 1, -1, 1] - - return np.squeeze(endeffector + self._joints[0, :]) + def _check_collisions(self) -> bool: + return self._check_self_collision() def render(self, mode='human'): goal_pos = self._goal.T @@ -281,14 +184,14 @@ class ViaPointReacherEnv(gym.Env): plt.pause(0.01) - def seed(self, seed=None): - self.np_random, seed = seeding.np_random(seed) - return [seed] +if __name__ == "__main__": + import time + env = ViaPointReacherEnv(5) + env.reset() - @property - def end_effector(self): - return self._joints[self.n_links].T - - def close(self): - if self.fig is not None: - plt.close(self.fig) + for i in range(10000): + ac = env.action_space.sample() + obs, rew, done, info = env.step(ac) + env.render() + if done: + env.reset() diff --git a/alr_envs/alr/mujoco/reacher/alr_reacher.py b/alr_envs/alr/mujoco/reacher/alr_reacher.py index 6ca66d1..2d122d2 100644 --- a/alr_envs/alr/mujoco/reacher/alr_reacher.py +++ b/alr_envs/alr/mujoco/reacher/alr_reacher.py @@ -87,6 +87,7 @@ class ALRReacherEnv(MujocoEnv, utils.EzPickle): [self._steps], ]) + if __name__ == '__main__': nl = 5 render_mode = "human" # "human" or "partial" or "final" diff --git a/alr_envs/dmc/__init__.py b/alr_envs/dmc/__init__.py index 17d1f7f..ca6469e 100644 --- a/alr_envs/dmc/__init__.py +++ b/alr_envs/dmc/__init__.py @@ -1,6 +1,6 @@ from . 
import manipulation, suite -ALL_DEEPMIND_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "DetPMP": []} +ALL_DEEPMIND_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "ProMP": [], "DetPMP": []} from gym.envs.registration import register diff --git a/alr_envs/dmc/dmc_wrapper.py b/alr_envs/dmc/dmc_wrapper.py index 10f1af9..aa6c7aa 100644 --- a/alr_envs/dmc/dmc_wrapper.py +++ b/alr_envs/dmc/dmc_wrapper.py @@ -15,10 +15,10 @@ def _spec_to_box(spec): assert s.dtype == np.float64 or s.dtype == np.float32, f"Only float64 and float32 types are allowed, instead {s.dtype} was found" dim = int(np.prod(s.shape)) if type(s) == specs.Array: - bound = np.inf * np.ones(dim, dtype=np.float32) + bound = np.inf * np.ones(dim, dtype=s.dtype) return -bound, bound elif type(s) == specs.BoundedArray: - zeros = np.zeros(dim, dtype=np.float32) + zeros = np.zeros(dim, dtype=s.dtype) return s.minimum + zeros, s.maximum + zeros mins, maxs = [], [] @@ -29,7 +29,7 @@ def _spec_to_box(spec): low = np.concatenate(mins, axis=0) high = np.concatenate(maxs, axis=0) assert low.shape == high.shape - return spaces.Box(low, high, dtype=np.float32) + return spaces.Box(low, high, dtype=s.dtype) def _flatten_obs(obs: collections.MutableMapping): @@ -113,7 +113,7 @@ class DMCWrapper(core.Env): if self._channels_first: obs = obs.transpose(2, 0, 1).copy() else: - obs = _flatten_obs(time_step.observation) + obs = _flatten_obs(time_step.observation).astype(self.observation_space.dtype) return obs @property diff --git a/alr_envs/examples/examples_motion_primitives.py b/alr_envs/examples/examples_motion_primitives.py index 6decdb1..0df05c1 100644 --- a/alr_envs/examples/examples_motion_primitives.py +++ b/alr_envs/examples/examples_motion_primitives.py @@ -112,7 +112,7 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True): # Replace this wrapper with the custom wrapper for your environment by inheriting from the MPEnvWrapper. # You can also add other gym.Wrappers in case they are needed. - wrappers = [alr_envs.classic_control.hole_reacher.MPWrapper] + wrappers = [alr_envs.alr.classic_control.hole_reacher.MPWrapper] mp_kwargs = { "num_dof": 5, "num_basis": 5, @@ -147,10 +147,13 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True): if __name__ == '__main__': - render = False + render = True # DMP example_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render) + # ProMP + example_mp("alr_envs:HoleReacherProMP-v1", seed=10, iterations=1, render=render) + # DetProMP example_mp("alr_envs:HoleReacherDetPMP-v1", seed=10, iterations=1, render=render) diff --git a/alr_envs/meta/__init__.py b/alr_envs/meta/__init__.py index fa63c94..9db0689 100644 --- a/alr_envs/meta/__init__.py +++ b/alr_envs/meta/__init__.py @@ -3,7 +3,7 @@ from gym import register from . import goal_object_change_mp_wrapper, goal_change_mp_wrapper, goal_endeffector_change_mp_wrapper, \ object_change_mp_wrapper -ALL_METAWORLD_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "DetPMP": []} +ALL_METAWORLD_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "ProMP": [], "DetPMP": []} # MetaWorld diff --git a/alr_envs/open_ai/__init__.py b/alr_envs/open_ai/__init__.py index 51dd712..63083ca 100644 --- a/alr_envs/open_ai/__init__.py +++ b/alr_envs/open_ai/__init__.py @@ -3,7 +3,7 @@ from gym.wrappers import FlattenObservation from . 
import classic_control, mujoco, robotics -ALL_GYM_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "DetPMP": []} +ALL_GYM_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "ProMP": [], "DetPMP": []} # Short Continuous Mountain Car register( diff --git a/alr_envs/utils/make_env_helpers.py b/alr_envs/utils/make_env_helpers.py index fc73b05..19f54d6 100644 --- a/alr_envs/utils/make_env_helpers.py +++ b/alr_envs/utils/make_env_helpers.py @@ -7,6 +7,7 @@ from gym.envs.registration import EnvSpec from mp_env_api import MPEnvWrapper from mp_env_api.mp_wrappers.detpmp_wrapper import DetPMPWrapper from mp_env_api.mp_wrappers.dmp_wrapper import DmpWrapper +from mp_env_api.mp_wrappers.promp_wrapper import ProMPWrapper def make_rank(env_id: str, seed: int, rank: int = 0, return_callable=True, **kwargs): @@ -132,6 +133,26 @@ def make_dmp_env(env_id: str, wrappers: Iterable, seed=1, mp_kwargs={}, **kwargs return DmpWrapper(_env, **mp_kwargs) +def make_promp_env(env_id: str, wrappers: Iterable, seed=1, mp_kwargs={}, **kwargs): + """ + This can also be used standalone for manually building a custom ProMP environment. + Args: + env_id: base_env_name, + wrappers: list of wrappers (at least an MPEnvWrapper), + mp_kwargs: dict of at least {num_dof: int, num_basis: int, width: int} + + Returns: ProMP wrapped gym env + + """ + _verify_time_limit(mp_kwargs.get("duration", None), kwargs.get("time_limit", None)) + + _env = _make_wrapped_env(env_id=env_id, wrappers=wrappers, seed=seed, **kwargs) + + _verify_dof(_env, mp_kwargs.get("num_dof")) + + return ProMPWrapper(_env, **mp_kwargs) + + def make_detpmp_env(env_id: str, wrappers: Iterable, seed=1, mp_kwargs={}, **kwargs): """ This can also be used standalone for manually building a custom Det ProMP environment. @@ -140,7 +161,7 @@ def make_detpmp_env(env_id: str, wrappers: Iterable, seed=1, mp_kwargs={}, **kwa wrappers: list of wrappers (at least an MPEnvWrapper), mp_kwargs: dict of at least {num_dof: int, num_basis: int, width: int} - Returns: DMP wrapped gym env + Returns: Det ProMP wrapped gym env """ _verify_time_limit(mp_kwargs.get("duration", None), kwargs.get("time_limit", None)) @@ -171,6 +192,26 @@ def make_dmp_env_helper(**kwargs): mp_kwargs=kwargs.pop("mp_kwargs"), **kwargs) +def make_promp_env_helper(**kwargs): + """ + Helper function for registering ProMP gym environments. + This can also be used standalone for manually building a custom ProMP environment. + Args: + **kwargs: expects at least the following: + { + "name": base_env_name, + "wrappers": list of wrappers (at least an MPEnvWrapper), + "mp_kwargs": dict of at least {num_dof: int, num_basis: int, width: int} + } + + Returns: ProMP wrapped gym env + + """ + seed = kwargs.pop("seed", None) + return make_promp_env(env_id=kwargs.pop("name"), wrappers=kwargs.pop("wrappers"), seed=seed, + mp_kwargs=kwargs.pop("mp_kwargs"), **kwargs) + + def make_detpmp_env_helper(**kwargs): """ Helper function for registering ProMP gym environments.
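
For orientation, a minimal usage sketch of the new ProMP support introduced in this diff: it builds a ProMP-wrapped HoleReacher by hand with make_promp_env, reusing the exact mp_kwargs registered for HoleReacherProMP-v1 above. The base env id "alr_envs:HoleReacher-v1" is taken from the registration loop, and the one-rollout-per-step semantics are assumed to match the existing DMP/DetPMP wrappers from mp_env_api.

from alr_envs.alr.classic_control.hole_reacher import MPWrapper
from alr_envs.utils.make_env_helpers import make_promp_env

mp_kwargs = {
    "num_dof": 5,
    "num_basis": 5,
    "duration": 2,
    "policy_type": "velocity",
    "weights_scale": 0.1,
    "zero_start": True,
}

env = make_promp_env("alr_envs:HoleReacher-v1", wrappers=[MPWrapper], seed=1, mp_kwargs=mp_kwargs)

obs = env.reset()
params = env.action_space.sample()          # ProMP weights, not per-step actions
obs, reward, done, info = env.step(params)  # assumed: one step executes the whole trajectory

The refactor in this diff also moves shared reacher logic into BaseReacherEnv and its direct- and torque-controlled subclasses, with rewards factored out into exchangeable classes (see the rew_fct switch in HoleReacherEnv). A new reacher therefore only implements the abstract hooks; the sketch below is hypothetical and uses a made-up fixed target purely for illustration.

import numpy as np
from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv


class MyReacherEnv(BaseReacherDirectEnv):
    """Illustrative subclass: velocity-controlled reacher with a fixed (made-up) target."""

    def _get_reward(self, action):
        # squared distance to an assumed target at (n_links, 0) plus a small action cost
        dist = np.linalg.norm(self.end_effector - np.array([self.n_links, 0.0]))
        reward = -dist ** 2 - 1e-6 * np.sum(action ** 2)
        return reward, {"is_collided": self._is_collided}

    def _get_obs(self):
        theta = self._joint_angles
        return np.hstack([np.cos(theta), np.sin(theta), self._angle_velocity,
                          self.end_effector, self._steps]).astype(np.float32)

    def _check_collisions(self):
        return self._check_self_collision()

    def _terminate(self, info):
        return info["is_collided"]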