From 2543bcd7ece7c35fba1d57a895b081c167bfb731 Mon Sep 17 00:00:00 2001 From: Maximilian Huettenrauch Date: Fri, 26 Nov 2021 16:31:46 +0100 Subject: [PATCH] reacher envs cleanup - base classes for direct and torque control - added promp wrapper support --- alr_envs/alr/__init__.py | 41 +- .../base_reacher/base_reacher.py | 141 +++++++ .../base_reacher/base_reacher_direct.py | 138 +------ .../base_reacher/base_reacher_torque.py | 119 +----- .../hole_reacher/hole_reacher.py | 362 +----------------- .../hole_reacher/hole_reacher_old.py | 315 +++++++++++++++ .../hole_reacher/hr_dist_vel_acc_reward.py | 56 +++ .../{simple_reward.py => hr_simple_reward.py} | 29 +- .../simple_reacher/simple_reacher.py | 110 ++---- .../viapoint_reacher/viapoint_reacher.py | 147 ++----- .../examples/examples_motion_primitives.py | 8 +- 11 files changed, 645 insertions(+), 821 deletions(-) create mode 100644 alr_envs/alr/classic_control/base_reacher/base_reacher.py create mode 100644 alr_envs/alr/classic_control/hole_reacher/hole_reacher_old.py create mode 100644 alr_envs/alr/classic_control/hole_reacher/hr_dist_vel_acc_reward.py rename alr_envs/alr/classic_control/hole_reacher/{simple_reward.py => hr_simple_reward.py} (64%) diff --git a/alr_envs/alr/__init__.py b/alr_envs/alr/__init__.py index 70902b4..a364b96 100644 --- a/alr_envs/alr/__init__.py +++ b/alr_envs/alr/__init__.py @@ -83,7 +83,7 @@ register( register( id='HoleReacher-v1', - entry_point='alr_envs.alr.classic_control:HoleReacherEnvOld', + entry_point='alr_envs.alr.classic_control:HoleReacherEnv', max_episode_steps=200, kwargs={ "n_links": 5, @@ -109,7 +109,7 @@ register( "hole_width": 0.25, "hole_depth": 1, "hole_x": 2, - "collision_penalty": 100, + "collision_penalty": 1, } ) @@ -220,6 +220,25 @@ for _v in _versions: ) ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append(_env_id) + _env_id = f'{_name[0]}ProMP-{_name[1]}' + register( + id=_env_id, + entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper', + kwargs={ + "name": f"alr_envs:{_v}", + "wrappers": [classic_control.simple_reacher.MPWrapper], + "mp_kwargs": { + "num_dof": 2 if "long" not in _v.lower() else 5, + "num_basis": 5, + "duration": 2, + "policy_type": "motor", + "weights_scale": 1, + "zero_start": True + } + } + ) + ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id) + _env_id = f'{_name[0]}DetPMP-{_name[1]}' register( id=_env_id, @@ -262,6 +281,24 @@ register( ) ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append("ViaPointReacherDMP-v0") +register( + id="ViaPointReacherProMP-v0", + entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper', + kwargs={ + "name": f"alr_envs:ViaPointReacher-v0", + "wrappers": [classic_control.viapoint_reacher.MPWrapper], + "mp_kwargs": { + "num_dof": 5, + "num_basis": 5, + "duration": 2, + "policy_type": "motor", + "weights_scale": 1, + "zero_start": True + } + } +) +ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append("ViaPointReacherProMP-v0") + register( id='ViaPointReacherDetPMP-v0', entry_point='alr_envs.utils.make_env_helpers:make_detpmp_env_helper', diff --git a/alr_envs/alr/classic_control/base_reacher/base_reacher.py b/alr_envs/alr/classic_control/base_reacher/base_reacher.py new file mode 100644 index 0000000..66b234f --- /dev/null +++ b/alr_envs/alr/classic_control/base_reacher/base_reacher.py @@ -0,0 +1,141 @@ +from typing import Iterable, Union +from abc import ABCMeta, abstractmethod +import gym +import matplotlib.pyplot as plt +import numpy as np +from gym import spaces +from gym.utils import seeding +from alr_envs.alr.classic_control.utils import intersect + + +class BaseReacherEnv(gym.Env): + """ + Base class for all reaching environments. + """ + + def __init__(self, n_links: int, random_start: bool = True, + allow_self_collision: bool = False): + super().__init__() + self.link_lengths = np.ones(n_links) + self.n_links = n_links + self._dt = 0.01 + + self.random_start = random_start + + # state + self._joints = None + self._joint_angles = None + self._angle_velocity = None + self._acc = None + self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) + self._start_vel = np.zeros(self.n_links) + + # joint limits + self.j_min = -np.pi * np.ones(n_links) + self.j_max = np.pi * np.ones(n_links) + + self.steps_before_reward = 199 + + state_bound = np.hstack([ + [np.pi] * self.n_links, # cos + [np.pi] * self.n_links, # sin + [np.inf] * self.n_links, # velocity + [np.inf] * 2, # x-y coordinates of target distance + [np.inf] # env steps, because reward start after n steps TODO: Maybe + ]) + + self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) + + self.reward_function = None # Needs to be set in sub class + + # containers for plotting + self.metadata = {'render.modes': ["human"]} + self.fig = None + + self._steps = 0 + self.seed() + + @property + def dt(self) -> Union[float, int]: + return self._dt + + @property + def current_pos(self): + return self._joint_angles.copy() + + @property + def current_vel(self): + return self._angle_velocity.copy() + + def reset(self): + # Sample only orientation of first link, i.e. the arm is always straight. + if self.random_start: + first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) + self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) + self._start_pos = self._joint_angles.copy() + else: + self._joint_angles = self._start_pos + + self._angle_velocity = self._start_vel + self._joints = np.zeros((self.n_links + 1, 2)) + self._update_joints() + self._steps = 0 + + return self._get_obs().copy() + + @abstractmethod + def step(self, action: np.ndarray): + """ + A single step with action in angular velocity space + """ + raise NotImplementedError + + def _update_joints(self): + """ + update joints to get new end-effector position. The other links are only required for rendering. + Returns: + + """ + angles = np.cumsum(self._joint_angles) + x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)]) + self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0) + + def _check_self_collision(self): + """Checks whether line segments intersect""" + + if np.any(self._joint_angles > self.j_max) or np.any(self._joint_angles < self.j_min): + return True + + link_lines = np.stack((self._joints[:-1, :], self._joints[1:, :]), axis=1) + for i, line1 in enumerate(link_lines): + for line2 in link_lines[i + 2:, :]: + if intersect(line1[0], line1[-1], line2[0], line2[-1]): + return True + return False + + @abstractmethod + def _get_reward(self, action: np.ndarray) -> (float, dict): + pass + + @abstractmethod + def _get_obs(self) -> np.ndarray: + pass + + @abstractmethod + def _check_collisions(self) -> bool: + pass + + @abstractmethod + def _terminate(self, info) -> bool: + return False + + def seed(self, seed=None): + self.np_random, seed = seeding.np_random(seed) + return [seed] + + def close(self): + del self.fig + + @property + def end_effector(self): + return self._joints[self.n_links].T diff --git a/alr_envs/alr/classic_control/base_reacher/base_reacher_direct.py b/alr_envs/alr/classic_control/base_reacher/base_reacher_direct.py index c4afaed..4f7b6f1 100644 --- a/alr_envs/alr/classic_control/base_reacher/base_reacher_direct.py +++ b/alr_envs/alr/classic_control/base_reacher/base_reacher_direct.py @@ -1,157 +1,35 @@ -from typing import Iterable, Union -from abc import ABCMeta, abstractmethod -import gym -import matplotlib.pyplot as plt -import numpy as np from gym import spaces -from gym.utils import seeding -from alr_envs.alr.classic_control.utils import intersect +import numpy as np +from alr_envs.alr.classic_control.base_reacher.base_reacher import BaseReacherEnv -class BaseReacherEnv(gym.Env): +class BaseReacherDirectEnv(BaseReacherEnv): """ - Simple Reaching Task without any physics simulation. - Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions - towards the end of the trajectory. + Base class for directly controlled reaching environments """ - def __init__(self, n_links: int, random_start: bool = True, allow_self_collision: bool = False): - super().__init__() - self.link_lengths = np.ones(n_links) - self.n_links = n_links - self._dt = 0.01 - - self.random_start = random_start - - # state - self._joints = None - self._joint_angles = None - self._angle_velocity = None - self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) - self._start_vel = np.zeros(self.n_links) - - # joint limits - self.j_min = -np.pi * np.ones(n_links) - self.j_max = np.pi * np.ones(n_links) - - self.max_vel = 1 - self.steps_before_reward = 199 + super().__init__(n_links, random_start, allow_self_collision) + self.max_vel = 10 * np.pi action_bound = np.ones((self.n_links,)) * self.max_vel - state_bound = np.hstack([ - [np.pi] * self.n_links, # cos - [np.pi] * self.n_links, # sin - [np.inf] * self.n_links, # velocity - [np.inf] * 2, # x-y coordinates of target distance - [np.inf] # env steps, because reward start after n steps TODO: Maybe - ]) self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) - self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) - - self.reward_function = None # Needs to be set in sub class - - # containers for plotting - self.metadata = {'render.modes': ["human"]} - self.fig = None - - self._steps = 0 - self.seed() - - @property - def dt(self) -> Union[float, int]: - return self._dt - - @property - def current_pos(self): - return self._joint_angles.copy() - - @property - def current_vel(self): - return self._angle_velocity.copy() - - def reset(self): - # Sample only orientation of first link, i.e. the arm is always straight. - if self.random_start: - first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) - self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) - self._start_pos = self._joint_angles.copy() - else: - self._joint_angles = self._start_pos - - self._angle_velocity = self._start_vel - self._joints = np.zeros((self.n_links + 1, 2)) - self._update_joints() - self._steps = 0 - - return self._get_obs().copy() def step(self, action: np.ndarray): """ A single step with action in angular velocity space """ - acc = (action - self._angle_velocity) / self.dt + self._acc = (action - self._angle_velocity) / self.dt self._angle_velocity = action self._joint_angles = self._joint_angles + self.dt * self._angle_velocity self._update_joints() self._is_collided = self._check_collisions() - # reward, info = self._get_reward(action) - reward, info = self.reward_function.get_reward(self, acc) + reward, info = self._get_reward(action) self._steps += 1 done = self._terminate(info) return self._get_obs().copy(), reward, done, info - - def _update_joints(self): - """ - update joints to get new end-effector position. The other links are only required for rendering. - Returns: - - """ - angles = np.cumsum(self._joint_angles) - x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)]) - self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0) - - def _check_self_collision(self): - """Checks whether line segments intersect""" - - if np.any(self._joint_angles > self.j_max) or np.any(self._joint_angles < self.j_min): - return True - - link_lines = np.stack((self._joints[:-1, :], self._joints[1:, :]), axis=1) - for i, line1 in enumerate(link_lines): - for line2 in link_lines[i + 2:, :]: - if intersect(line1[0], line1[-1], line2[0], line2[-1]): - return True - return False - - @abstractmethod - def _get_reward(self, action: np.ndarray) -> (float, dict): - pass - - @abstractmethod - def _get_obs(self) -> np.ndarray: - pass - - @abstractmethod - def _check_collisions(self) -> bool: - pass - - @abstractmethod - def _terminate(self, info) -> bool: - return False - - def seed(self, seed=None): - self.np_random, seed = seeding.np_random(seed) - return [seed] - - def close(self): - del self.fig - - @property - def end_effector(self): - return self._joints[self.n_links].T diff --git a/alr_envs/alr/classic_control/base_reacher/base_reacher_torque.py b/alr_envs/alr/classic_control/base_reacher/base_reacher_torque.py index 12e5830..7789965 100644 --- a/alr_envs/alr/classic_control/base_reacher/base_reacher_torque.py +++ b/alr_envs/alr/classic_control/base_reacher/base_reacher_torque.py @@ -1,86 +1,19 @@ -from typing import Iterable, Union -from abc import ABCMeta, abstractmethod -import gym -import matplotlib.pyplot as plt -import numpy as np from gym import spaces -from gym.utils import seeding -from alr_envs.classic_control.utils import check_self_collision +import numpy as np +from alr_envs.alr.classic_control.base_reacher.base_reacher import BaseReacherEnv -class BaseReacherEnv(gym.Env): +class BaseReacherTorqueEnv(BaseReacherEnv): """ - Simple Reaching Task without any physics simulation. - Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions - towards the end of the trajectory. + Base class for torque controlled reaching environments """ - def __init__(self, n_links: int, random_start: bool = True, - allow_self_collision: bool = False, collision_penalty: float = 1000): - super().__init__() - self.link_lengths = np.ones(n_links) - self.n_links = n_links - self._dt = 0.01 - - self.random_start = random_start - - self._joints = None - self._joint_angles = None - self._angle_velocity = None - self._is_collided = False - self.allow_self_collision = allow_self_collision - self.collision_penalty = collision_penalty - self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) - self._start_vel = np.zeros(self.n_links) - - self.max_torque = 1 - self.steps_before_reward = 199 + allow_self_collision: bool = False): + super().__init__(n_links, random_start, allow_self_collision) + self.max_torque = 1000 action_bound = np.ones((self.n_links,)) * self.max_torque - state_bound = np.hstack([ - [np.pi] * self.n_links, # cos - [np.pi] * self.n_links, # sin - [np.inf] * self.n_links, # velocity - [np.inf] * 2, # x-y coordinates of target distance - [np.inf] # env steps, because reward start after n steps TODO: Maybe - ]) self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) - self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) - - # containers for plotting - self.metadata = {'render.modes': ["human"]} - self.fig = None - - self._steps = 0 - self.seed() - - @property - def dt(self) -> Union[float, int]: - return self._dt - - @property - def current_pos(self): - return self._joint_angles.copy() - - @property - def current_vel(self): - return self._angle_velocity.copy() - - def reset(self): - # Sample only orientation of first link, i.e. the arm is always straight. - if self.random_start: - first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) - self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) - self._start_pos = self._joint_angles.copy() - else: - self._joint_angles = self._start_pos - - self._angle_velocity = self._start_vel - self._joints = np.zeros((self.n_links + 1, 2)) - self._update_joints() - self._steps = 0 - - return self._get_obs().copy() def step(self, action: np.ndarray): """ @@ -94,11 +27,6 @@ class BaseReacherEnv(gym.Env): self._joint_angles = self._joint_angles + self.dt * self._angle_velocity self._update_joints() - if not self.allow_self_collision: - self_collision = check_self_collision(line_points_in_taskspace) - if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision: - self_collision = True - self._is_collided = self._check_collisions() reward, info = self._get_reward(action) @@ -107,36 +35,3 @@ class BaseReacherEnv(gym.Env): done = False return self._get_obs().copy(), reward, done, info - - def _update_joints(self): - """ - update joints to get new end-effector position. The other links are only required for rendering. - Returns: - - """ - angles = np.cumsum(self._joint_angles) - x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)]) - self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0) - - @abstractmethod - def _get_reward(self, action: np.ndarray) -> (float, dict): - pass - - @abstractmethod - def _get_obs(self) -> np.ndarray: - pass - - @abstractmethod - def _check_collisions(self) -> bool: - pass - - def seed(self, seed=None): - self.np_random, seed = seeding.np_random(seed) - return [seed] - - def close(self): - del self.fig - - @property - def end_effector(self): - return self._joints[self.n_links].T diff --git a/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py b/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py index 55aac9e..2cb1e08 100644 --- a/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py +++ b/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py @@ -3,14 +3,12 @@ from typing import Union import gym import matplotlib.pyplot as plt import numpy as np -from gym.utils import seeding from matplotlib import patches -from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherEnv -from alr_envs.alr.classic_control.utils import check_self_collision +from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv -class HoleReacherEnv(BaseReacherEnv): +class HoleReacherEnv(BaseReacherDirectEnv): def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None, hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False, allow_wall_collision: bool = False, collision_penalty: float = 1000, rew_fct: str = "simple"): @@ -28,7 +26,7 @@ class HoleReacherEnv(BaseReacherEnv): self._tmp_depth = None self._goal = None # x-y coordinates for reaching the center at the bottom of the hole - action_bound = np.pi * np.ones((self.n_links,)) + # action_bound = np.pi * np.ones((self.n_links,)) state_bound = np.hstack([ [np.pi] * self.n_links, # cos [np.pi] * self.n_links, # sin @@ -38,19 +36,25 @@ class HoleReacherEnv(BaseReacherEnv): [np.inf] * 2, # x-y coordinates of target distance [np.inf] # env steps, because reward start after n steps TODO: Maybe ]) - self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) + # self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) if rew_fct == "simple": - from alr_envs.alr.classic_control.hole_reacher.simple_reward import HolereacherSimpleReward - self.reward_function = HolereacherSimpleReward(allow_self_collision, allow_wall_collision, collision_penalty) + from alr_envs.alr.classic_control.hole_reacher.hr_simple_reward import HolereacherReward + self.reward_function = HolereacherReward(allow_self_collision, allow_wall_collision, collision_penalty) + else: + raise ValueError("Unknown reward function {}".format(rew_fct)) def reset(self): self._generate_hole() self._set_patches() + self.reward_function.reset() return super().reset() + def _get_reward(self, action: np.ndarray) -> (float, dict): + return self.reward_function.get_reward(self) + def _terminate(self, info): return info["is_collided"] @@ -118,10 +122,6 @@ class HoleReacherEnv(BaseReacherEnv): end_effector[i, :, 0] = x[i, :] + end_effector[i - 1, -1, 0] end_effector[i, :, 1] = y[i, :] + end_effector[i - 1, -1, 1] - # xy = np.stack((x, y), axis=2) - # - # self._joints[0] + np.cumsum(xy, axis=0) - return np.squeeze(end_effector + self._joints[0, :]) def check_wall_collision(self): @@ -157,36 +157,6 @@ class HoleReacherEnv(BaseReacherEnv): return False - # def check_wall_collision(self, ): - # """find the intersection of line segments A=(x1,y1)/(x2,y2) and - # B=(x3,y3)/(x4,y4). """ - # - # link_lines = np.hstack((self._joints[:-1, :], self._joints[1:, :])) - # - # all_points_product = np.hstack( - # [np.repeat(link_lines, len(self.ground_lines), axis=0), - # np.tile(self.ground_lines, (len(link_lines), 1))]) - # - # x1, y1, x2, y2, x3, y3, x4, y4 = all_points_product.T - # - # denom = ((x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)) - # # if denom == 0: - # # return False - # px = ((x1 * y2 - y1 * x2) * (x3 - x4) - (x1 - x2) * (x3 * y4 - y3 * x4)) / denom - # py = ((x1 * y2 - y1 * x2) * (y3 - y4) - (y1 - y2) * (x3 * y4 - y3 * x4)) / denom - # # if (px - x1) * (px - x2) < 0 and (py - y1) * (py - y2) < 0 \ - # # and (px - x3) * (px - x4) < 0 and (py - y3) * (py - y4) < 0: - # # return True # [px, py] - # test = ((px - x1) * (px - x2) <= 0) & ((py - y1) * (py - y2) <= 0) & ((px - x3) * (px - x4) <= 0) & ( - # (py - y3) * (py - y4) <= 0) - # if np.any(test): - # possible_collisions = np.stack((px, py)).T[test] - # for row in possible_collisions: - # if not np.any([np.allclose(row, x) for x in self._joints]): - # return True, row - # - # return False, None - def render(self, mode='human'): if self.fig is None: # Create base figure once on the beginning. Afterwards only update @@ -242,322 +212,14 @@ class HoleReacherEnv(BaseReacherEnv): self.fig.gca().add_patch(hole_floor) -class HoleReacherEnvOld(gym.Env): - - def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None, - hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False, - allow_wall_collision: bool = False, collision_penalty: float = 1000): - - self.n_links = n_links - self.link_lengths = np.ones((n_links, 1)) - - self.random_start = random_start - - # provided initial parameters - self.initial_x = hole_x # x-position of center of hole - self.initial_width = hole_width # width of hole - self.initial_depth = hole_depth # depth of hole - - # temp container for current env state - self._tmp_x = None - self._tmp_width = None - self._tmp_depth = None - self._goal = None # x-y coordinates for reaching the center at the bottom of the hole - - # collision - self.allow_self_collision = allow_self_collision - self.allow_wall_collision = allow_wall_collision - self.collision_penalty = collision_penalty - - # state - self._joints = None - self._joint_angles = None - self._angle_velocity = None - self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) - self._start_vel = np.zeros(self.n_links) - - self._dt = 0.01 - - action_bound = np.pi * np.ones((self.n_links,)) - state_bound = np.hstack([ - [np.pi] * self.n_links, # cos - [np.pi] * self.n_links, # sin - [np.inf] * self.n_links, # velocity - [np.inf], # hole width - # [np.inf], # hole depth - [np.inf] * 2, # x-y coordinates of target distance - [np.inf] # env steps, because reward start after n steps TODO: Maybe - ]) - self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) - self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) - - # containers for plotting - self.metadata = {'render.modes': ["human", "partial"]} - self.fig = None - - self._steps = 0 - self.seed() - - @property - def dt(self) -> Union[float, int]: - return self._dt - - # @property - # def start_pos(self): - # return self._start_pos - - @property - def current_pos(self): - return self._joint_angles.copy() - - @property - def current_vel(self): - return self._angle_velocity.copy() - - def step(self, action: np.ndarray): - """ - A single step with an action in joint velocity space - """ - - acc = (action - self._angle_velocity) / self.dt - self._angle_velocity = action - self._joint_angles = self._joint_angles + self.dt * self._angle_velocity # + 0.001 * np.random.randn(5) - self._update_joints() - - reward, info = self._get_reward(acc) - - info.update({"is_collided": self._is_collided}) - self.end_effector_traj.append(np.copy(self.end_effector)) - - self._steps += 1 - done = self._is_collided - - return self._get_obs().copy(), reward, done, info - - def reset(self): - if self.random_start: - # Maybe change more than first seed - first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) - self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) - self._start_pos = self._joint_angles.copy() - else: - self._joint_angles = self._start_pos - - self._generate_hole() - self._set_patches() - - self._angle_velocity = self._start_vel - self._joints = np.zeros((self.n_links + 1, 2)) - self._update_joints() - self._steps = 0 - self.end_effector_traj = [] - - return self._get_obs().copy() - - def _generate_hole(self): - if self.initial_width is None: - width = self.np_random.uniform(0.15, 0.5) - else: - width = np.copy(self.initial_width) - if self.initial_x is None: - # sample whole on left or right side - direction = self.np_random.choice([-1, 1]) - # Hole center needs to be half the width away from the arm to give a valid setting. - x = direction * self.np_random.uniform(width / 2, 3.5) - else: - x = np.copy(self.initial_x) - if self.initial_depth is None: - # TODO we do not want this right now. - depth = self.np_random.uniform(1, 1) - else: - depth = np.copy(self.initial_depth) - - self._tmp_width = width - self._tmp_x = x - self._tmp_depth = depth - self._goal = np.hstack([self._tmp_x, -self._tmp_depth]) - - def _update_joints(self): - """ - update _joints to get new end effector position. The other links are only required for rendering. - Returns: - - """ - line_points_in_taskspace = self._get_forward_kinematics(num_points_per_link=20) - - self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0] - self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1] - - self_collision = False - wall_collision = False - - if not self.allow_self_collision: - self_collision = check_self_collision(line_points_in_taskspace) - if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision: - self_collision = True - - if not self.allow_wall_collision: - wall_collision = self._check_wall_collision(line_points_in_taskspace) - - self._is_collided = self_collision or wall_collision - - def _get_reward(self, acc: np.ndarray): - reward = 0 - # success = False - - if self._steps == 199 or self._is_collided: - # return reward only in last time step - # Episode also terminates when colliding, hence return reward - dist = np.linalg.norm(self.end_effector - self._goal) - # success = dist < 0.005 and not self._is_collided - reward = - dist ** 2 - self.collision_penalty * self._is_collided - - reward -= 5e-8 * np.sum(acc ** 2) - # info = {"is_success": success} - - return reward, {} # info - - def _get_obs(self): - theta = self._joint_angles - return np.hstack([ - np.cos(theta), - np.sin(theta), - self._angle_velocity, - self._tmp_width, - # self._tmp_hole_depth, - self.end_effector - self._goal, - self._steps - ]) - - def _get_forward_kinematics(self, num_points_per_link=1): - theta = self._joint_angles[:, None] - - intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1 - accumulated_theta = np.cumsum(theta, axis=0) - end_effector = np.zeros(shape=(self.n_links, num_points_per_link, 2)) - - x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points - y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points - - end_effector[0, :, 0] = x[0, :] - end_effector[0, :, 1] = y[0, :] - - for i in range(1, self.n_links): - end_effector[i, :, 0] = x[i, :] + end_effector[i - 1, -1, 0] - end_effector[i, :, 1] = y[i, :] + end_effector[i - 1, -1, 1] - - return np.squeeze(end_effector + self._joints[0, :]) - - def _check_wall_collision(self, line_points): - # all points that are before the hole in x - r, c = np.where(line_points[:, :, 0] < (self._tmp_x - self._tmp_width / 2)) - - # check if any of those points are below surface - nr_line_points_below_surface_before_hole = np.sum(line_points[r, c, 1] < 0) - - if nr_line_points_below_surface_before_hole > 0: - return True - - # all points that are after the hole in x - r, c = np.where(line_points[:, :, 0] > (self._tmp_x + self._tmp_width / 2)) - - # check if any of those points are below surface - nr_line_points_below_surface_after_hole = np.sum(line_points[r, c, 1] < 0) - - if nr_line_points_below_surface_after_hole > 0: - return True - - # all points that are above the hole - r, c = np.where((line_points[:, :, 0] > (self._tmp_x - self._tmp_width / 2)) & ( - line_points[:, :, 0] < (self._tmp_x + self._tmp_width / 2))) - - # check if any of those points are below surface - nr_line_points_below_surface_in_hole = np.sum(line_points[r, c, 1] < -self._tmp_depth) - - if nr_line_points_below_surface_in_hole > 0: - return True - - return False - - def render(self, mode='human'): - if self.fig is None: - # Create base figure once on the beginning. Afterwards only update - plt.ion() - self.fig = plt.figure() - ax = self.fig.add_subplot(1, 1, 1) - - # limits - lim = np.sum(self.link_lengths) + 0.5 - ax.set_xlim([-lim, lim]) - ax.set_ylim([-1.1, lim]) - - self.line, = ax.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k') - self._set_patches() - self.fig.show() - - self.fig.gca().set_title( - f"Iteration: {self._steps}, distance: {np.linalg.norm(self.end_effector - self._goal) ** 2}") - - if mode == "human": - - # arm - self.line.set_data(self._joints[:, 0], self._joints[:, 1]) - - self.fig.canvas.draw() - self.fig.canvas.flush_events() - - elif mode == "partial": - if self._steps % 20 == 0 or self._steps in [1, 199] or self._is_collided: - # Arm - plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k', - alpha=self._steps / 200) - - def _set_patches(self): - if self.fig is not None: - self.fig.gca().patches = [] - left_block = patches.Rectangle((-self.n_links, -self._tmp_depth), - self.n_links + self._tmp_x - self._tmp_width / 2, - self._tmp_depth, - fill=True, edgecolor='k', facecolor='k') - right_block = patches.Rectangle((self._tmp_x + self._tmp_width / 2, -self._tmp_depth), - self.n_links - self._tmp_x + self._tmp_width / 2, - self._tmp_depth, - fill=True, edgecolor='k', facecolor='k') - hole_floor = patches.Rectangle((self._tmp_x - self._tmp_width / 2, -self._tmp_depth), - self._tmp_width, - 1 - self._tmp_depth, - fill=True, edgecolor='k', facecolor='k') - - # Add the patch to the Axes - self.fig.gca().add_patch(left_block) - self.fig.gca().add_patch(right_block) - self.fig.gca().add_patch(hole_floor) - - def seed(self, seed=None): - self.np_random, seed = seeding.np_random(seed) - return [seed] - - @property - def end_effector(self): - return self._joints[self.n_links].T - - def close(self): - super().close() - if self.fig is not None: - plt.close(self.fig) - - if __name__ == "__main__": import time env = HoleReacherEnv(5) env.reset() - start = time.time() for i in range(10000): - # env.check_wall_collision() ac = env.action_space.sample() obs, rew, done, info = env.step(ac) # env.render() if done: env.reset() - print(time.time() - start) diff --git a/alr_envs/alr/classic_control/hole_reacher/hole_reacher_old.py b/alr_envs/alr/classic_control/hole_reacher/hole_reacher_old.py new file mode 100644 index 0000000..1b18bce --- /dev/null +++ b/alr_envs/alr/classic_control/hole_reacher/hole_reacher_old.py @@ -0,0 +1,315 @@ +from typing import Union + +import gym +import matplotlib.pyplot as plt +import numpy as np +from gym.utils import seeding +from matplotlib import patches + +from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv +from alr_envs.alr.classic_control.utils import check_self_collision + + +class HoleReacherEnvOld(gym.Env): + + def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None, + hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False, + allow_wall_collision: bool = False, collision_penalty: float = 1000): + + self.n_links = n_links + self.link_lengths = np.ones((n_links, 1)) + + self.random_start = random_start + + # provided initial parameters + self.initial_x = hole_x # x-position of center of hole + self.initial_width = hole_width # width of hole + self.initial_depth = hole_depth # depth of hole + + # temp container for current env state + self._tmp_x = None + self._tmp_width = None + self._tmp_depth = None + self._goal = None # x-y coordinates for reaching the center at the bottom of the hole + + # collision + self.allow_self_collision = allow_self_collision + self.allow_wall_collision = allow_wall_collision + self.collision_penalty = collision_penalty + + # state + self._joints = None + self._joint_angles = None + self._angle_velocity = None + self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) + self._start_vel = np.zeros(self.n_links) + + self._dt = 0.01 + + action_bound = np.pi * np.ones((self.n_links,)) + state_bound = np.hstack([ + [np.pi] * self.n_links, # cos + [np.pi] * self.n_links, # sin + [np.inf] * self.n_links, # velocity + [np.inf], # hole width + # [np.inf], # hole depth + [np.inf] * 2, # x-y coordinates of target distance + [np.inf] # env steps, because reward start after n steps TODO: Maybe + ]) + self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) + self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) + + # containers for plotting + self.metadata = {'render.modes': ["human", "partial"]} + self.fig = None + + self._steps = 0 + self.seed() + + @property + def dt(self) -> Union[float, int]: + return self._dt + + # @property + # def start_pos(self): + # return self._start_pos + + @property + def current_pos(self): + return self._joint_angles.copy() + + @property + def current_vel(self): + return self._angle_velocity.copy() + + def step(self, action: np.ndarray): + """ + A single step with an action in joint velocity space + """ + + acc = (action - self._angle_velocity) / self.dt + self._angle_velocity = action + self._joint_angles = self._joint_angles + self.dt * self._angle_velocity # + 0.001 * np.random.randn(5) + self._update_joints() + + reward, info = self._get_reward(acc) + + info.update({"is_collided": self._is_collided}) + self.end_effector_traj.append(np.copy(self.end_effector)) + + self._steps += 1 + done = self._is_collided + + return self._get_obs().copy(), reward, done, info + + def reset(self): + if self.random_start: + # Maybe change more than first seed + first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) + self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) + self._start_pos = self._joint_angles.copy() + else: + self._joint_angles = self._start_pos + + self._generate_hole() + self._set_patches() + + self._angle_velocity = self._start_vel + self._joints = np.zeros((self.n_links + 1, 2)) + self._update_joints() + self._steps = 0 + self.end_effector_traj = [] + + return self._get_obs().copy() + + def _generate_hole(self): + if self.initial_width is None: + width = self.np_random.uniform(0.15, 0.5) + else: + width = np.copy(self.initial_width) + if self.initial_x is None: + # sample whole on left or right side + direction = self.np_random.choice([-1, 1]) + # Hole center needs to be half the width away from the arm to give a valid setting. + x = direction * self.np_random.uniform(width / 2, 3.5) + else: + x = np.copy(self.initial_x) + if self.initial_depth is None: + # TODO we do not want this right now. + depth = self.np_random.uniform(1, 1) + else: + depth = np.copy(self.initial_depth) + + self._tmp_width = width + self._tmp_x = x + self._tmp_depth = depth + self._goal = np.hstack([self._tmp_x, -self._tmp_depth]) + + def _update_joints(self): + """ + update _joints to get new end effector position. The other links are only required for rendering. + Returns: + + """ + line_points_in_taskspace = self._get_forward_kinematics(num_points_per_link=20) + + self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0] + self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1] + + self_collision = False + wall_collision = False + + if not self.allow_self_collision: + self_collision = check_self_collision(line_points_in_taskspace) + if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision: + self_collision = True + + if not self.allow_wall_collision: + wall_collision = self._check_wall_collision(line_points_in_taskspace) + + self._is_collided = self_collision or wall_collision + + def _get_reward(self, acc: np.ndarray): + reward = 0 + # success = False + + if self._steps == 199 or self._is_collided: + # return reward only in last time step + # Episode also terminates when colliding, hence return reward + dist = np.linalg.norm(self.end_effector - self._goal) + # success = dist < 0.005 and not self._is_collided + reward = - dist ** 2 - self.collision_penalty * self._is_collided + + reward -= 5e-8 * np.sum(acc ** 2) + # info = {"is_success": success} + + return reward, {} # info + + def _get_obs(self): + theta = self._joint_angles + return np.hstack([ + np.cos(theta), + np.sin(theta), + self._angle_velocity, + self._tmp_width, + # self._tmp_hole_depth, + self.end_effector - self._goal, + self._steps + ]) + + def _get_forward_kinematics(self, num_points_per_link=1): + theta = self._joint_angles[:, None] + + intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1 + accumulated_theta = np.cumsum(theta, axis=0) + end_effector = np.zeros(shape=(self.n_links, num_points_per_link, 2)) + + x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points + y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points + + end_effector[0, :, 0] = x[0, :] + end_effector[0, :, 1] = y[0, :] + + for i in range(1, self.n_links): + end_effector[i, :, 0] = x[i, :] + end_effector[i - 1, -1, 0] + end_effector[i, :, 1] = y[i, :] + end_effector[i - 1, -1, 1] + + return np.squeeze(end_effector + self._joints[0, :]) + + def _check_wall_collision(self, line_points): + # all points that are before the hole in x + r, c = np.where(line_points[:, :, 0] < (self._tmp_x - self._tmp_width / 2)) + + # check if any of those points are below surface + nr_line_points_below_surface_before_hole = np.sum(line_points[r, c, 1] < 0) + + if nr_line_points_below_surface_before_hole > 0: + return True + + # all points that are after the hole in x + r, c = np.where(line_points[:, :, 0] > (self._tmp_x + self._tmp_width / 2)) + + # check if any of those points are below surface + nr_line_points_below_surface_after_hole = np.sum(line_points[r, c, 1] < 0) + + if nr_line_points_below_surface_after_hole > 0: + return True + + # all points that are above the hole + r, c = np.where((line_points[:, :, 0] > (self._tmp_x - self._tmp_width / 2)) & ( + line_points[:, :, 0] < (self._tmp_x + self._tmp_width / 2))) + + # check if any of those points are below surface + nr_line_points_below_surface_in_hole = np.sum(line_points[r, c, 1] < -self._tmp_depth) + + if nr_line_points_below_surface_in_hole > 0: + return True + + return False + + def render(self, mode='human'): + if self.fig is None: + # Create base figure once on the beginning. Afterwards only update + plt.ion() + self.fig = plt.figure() + ax = self.fig.add_subplot(1, 1, 1) + + # limits + lim = np.sum(self.link_lengths) + 0.5 + ax.set_xlim([-lim, lim]) + ax.set_ylim([-1.1, lim]) + + self.line, = ax.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k') + self._set_patches() + self.fig.show() + + self.fig.gca().set_title( + f"Iteration: {self._steps}, distance: {np.linalg.norm(self.end_effector - self._goal) ** 2}") + + if mode == "human": + + # arm + self.line.set_data(self._joints[:, 0], self._joints[:, 1]) + + self.fig.canvas.draw() + self.fig.canvas.flush_events() + + elif mode == "partial": + if self._steps % 20 == 0 or self._steps in [1, 199] or self._is_collided: + # Arm + plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k', + alpha=self._steps / 200) + + def _set_patches(self): + if self.fig is not None: + self.fig.gca().patches = [] + left_block = patches.Rectangle((-self.n_links, -self._tmp_depth), + self.n_links + self._tmp_x - self._tmp_width / 2, + self._tmp_depth, + fill=True, edgecolor='k', facecolor='k') + right_block = patches.Rectangle((self._tmp_x + self._tmp_width / 2, -self._tmp_depth), + self.n_links - self._tmp_x + self._tmp_width / 2, + self._tmp_depth, + fill=True, edgecolor='k', facecolor='k') + hole_floor = patches.Rectangle((self._tmp_x - self._tmp_width / 2, -self._tmp_depth), + self._tmp_width, + 1 - self._tmp_depth, + fill=True, edgecolor='k', facecolor='k') + + # Add the patch to the Axes + self.fig.gca().add_patch(left_block) + self.fig.gca().add_patch(right_block) + self.fig.gca().add_patch(hole_floor) + + def seed(self, seed=None): + self.np_random, seed = seeding.np_random(seed) + return [seed] + + @property + def end_effector(self): + return self._joints[self.n_links].T + + def close(self): + super().close() + if self.fig is not None: + plt.close(self.fig) \ No newline at end of file diff --git a/alr_envs/alr/classic_control/hole_reacher/hr_dist_vel_acc_reward.py b/alr_envs/alr/classic_control/hole_reacher/hr_dist_vel_acc_reward.py new file mode 100644 index 0000000..7fadad9 --- /dev/null +++ b/alr_envs/alr/classic_control/hole_reacher/hr_dist_vel_acc_reward.py @@ -0,0 +1,56 @@ +import numpy as np + + +class HolereacherReward: + def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty): + self.collision_penalty = collision_penalty + + # collision + self.allow_self_collision = allow_self_collision + self.allow_wall_collision = allow_wall_collision + self.collision_penalty = collision_penalty + self._is_collided = False + + self.reward_factors = np.array((-1, -1e-4, -1e-6, -collision_penalty, -1)) + + def reset(self): + self._is_collided = False + + def get_reward(self, env): + dist_cost = 0 + collision_cost = 0 + time_cost = 0 + success = False + + self_collision = False + wall_collision = False + + if not self.allow_self_collision: + self_collision = env._check_self_collision() + + if not self.allow_wall_collision: + wall_collision = env.check_wall_collision() + + self._is_collided = self_collision or wall_collision + + if env._steps == 199 or self._is_collided: + # return reward only in last time step + # Episode also terminates when colliding, hence return reward + dist = np.linalg.norm(env.end_effector - env._goal) + + success = dist < 0.005 and not self._is_collided + dist_cost = int(not self._is_collided) * dist ** 2 + collision_cost = self._is_collided * dist ** 2 + time_cost = 199 - env._steps + + info = {"is_success": success, + "is_collided": self._is_collided, + "end_effector": np.copy(env.end_effector)} + + vel_cost = np.sum(env._angle_velocity ** 2) + acc_cost = np.sum(env._acc ** 2) + + reward_features = np.array((dist_cost, vel_cost, acc_cost, collision_cost, time_cost)) + reward = np.dot(reward_features, self.reward_factors) + + return reward, info diff --git a/alr_envs/alr/classic_control/hole_reacher/simple_reward.py b/alr_envs/alr/classic_control/hole_reacher/hr_simple_reward.py similarity index 64% rename from alr_envs/alr/classic_control/hole_reacher/simple_reward.py rename to alr_envs/alr/classic_control/hole_reacher/hr_simple_reward.py index 0609a62..5820ab1 100644 --- a/alr_envs/alr/classic_control/hole_reacher/simple_reward.py +++ b/alr_envs/alr/classic_control/hole_reacher/hr_simple_reward.py @@ -1,8 +1,7 @@ import numpy as np -from alr_envs.alr.classic_control.utils import check_self_collision -class HolereacherSimpleReward: +class HolereacherReward: def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty): self.collision_penalty = collision_penalty @@ -11,16 +10,20 @@ class HolereacherSimpleReward: self.allow_wall_collision = allow_wall_collision self.collision_penalty = collision_penalty self._is_collided = False - pass - def get_reward(self, env, action): - reward = 0 + self.reward_factors = np.array((-1, -5e-8, -collision_penalty)) + + def reset(self): + self._is_collided = False + + def get_reward(self, env): + dist_cost = 0 + collision_cost = 0 success = False self_collision = False wall_collision = False - # joints = np.hstack((env._joints[:-1, :], env._joints[1:, :])) if not self.allow_self_collision: self_collision = env._check_self_collision() @@ -33,16 +36,18 @@ class HolereacherSimpleReward: # return reward only in last time step # Episode also terminates when colliding, hence return reward dist = np.linalg.norm(env.end_effector - env._goal) + dist_cost = dist ** 2 + collision_cost = int(self._is_collided) success = dist < 0.005 and not self._is_collided - reward = - dist ** 2 - self.collision_penalty * self._is_collided info = {"is_success": success, - "is_collided": self._is_collided} + "is_collided": self._is_collided, + "end_effector": np.copy(env.end_effector)} - acc = (action - env._angle_velocity) / env.dt - reward -= 5e-8 * np.sum(acc ** 2) + acc_cost = np.sum(env._acc ** 2) + + reward_features = np.array((dist_cost, acc_cost, collision_cost)) + reward = np.dot(reward_features, self.reward_factors) return reward, info - - diff --git a/alr_envs/alr/classic_control/simple_reacher/simple_reacher.py b/alr_envs/alr/classic_control/simple_reacher/simple_reacher.py index 08c151f..40a4d95 100644 --- a/alr_envs/alr/classic_control/simple_reacher/simple_reacher.py +++ b/alr_envs/alr/classic_control/simple_reacher/simple_reacher.py @@ -1,26 +1,22 @@ from typing import Iterable, Union -import gym import matplotlib.pyplot as plt import numpy as np from gym import spaces -from gym.utils import seeding + +from alr_envs.alr.classic_control.base_reacher.base_reacher_torque import BaseReacherTorqueEnv -class SimpleReacherEnv(gym.Env): +class SimpleReacherEnv(BaseReacherTorqueEnv): """ Simple Reaching Task without any physics simulation. Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions towards the end of the trajectory. """ - def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True): - super().__init__() - self.link_lengths = np.ones(n_links) - self.n_links = n_links - self._dt = 0.1 - - self.random_start = random_start + def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True, + allow_self_collision: bool = False,): + super().__init__(n_links, random_start, allow_self_collision) # provided initial parameters self.inital_target = target @@ -28,16 +24,10 @@ class SimpleReacherEnv(gym.Env): # temp container for current env state self._goal = None - self._joints = None - self._joint_angles = None - self._angle_velocity = None self._start_pos = np.zeros(self.n_links) - self._start_vel = np.zeros(self.n_links) - self.max_torque = 1 self.steps_before_reward = 199 - action_bound = np.ones((self.n_links,)) * self.max_torque state_bound = np.hstack([ [np.pi] * self.n_links, # cos [np.pi] * self.n_links, # sin @@ -45,84 +35,24 @@ class SimpleReacherEnv(gym.Env): [np.inf] * 2, # x-y coordinates of target distance [np.inf] # env steps, because reward start after n steps TODO: Maybe ]) - self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) - # containers for plotting - self.metadata = {'render.modes': ["human"]} - self.fig = None - - self._steps = 0 - self.seed() - - @property - def dt(self) -> Union[float, int]: - return self._dt - # @property # def start_pos(self): # return self._start_pos - @property - def current_pos(self): - return self._joint_angles - - @property - def current_vel(self): - return self._angle_velocity - - def step(self, action: np.ndarray): - """ - A single step with action in torque space - """ - - # action = self._add_action_noise(action) - ac = np.clip(action, -self.max_torque, self.max_torque) - - self._angle_velocity = self._angle_velocity + self.dt * ac - self._joint_angles = self._joint_angles + self.dt * self._angle_velocity - self._update_joints() - - reward, info = self._get_reward(action) - - self._steps += 1 - done = False - - return self._get_obs().copy(), reward, done, info - def reset(self): - - # TODO: maybe do initialisation more random? - # Sample only orientation of first link, i.e. the arm is always straight. - if self.random_start: - self._joint_angles = np.hstack([[self.np_random.uniform(-np.pi, np.pi)], np.zeros(self.n_links - 1)]) - self._start_pos = self._joint_angles.copy() - else: - self._joint_angles = self._start_pos - self._generate_goal() - self._angle_velocity = self._start_vel - self._joints = np.zeros((self.n_links + 1, 2)) - self._update_joints() - self._steps = 0 - - return self._get_obs().copy() - - def _update_joints(self): - """ - update joints to get new end-effector position. The other links are only required for rendering. - Returns: - - """ - angles = np.cumsum(self._joint_angles) - x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)]) - self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0) + return super().reset() def _get_reward(self, action: np.ndarray): diff = self.end_effector - self._goal reward_dist = 0 + if not self.allow_self_collision: + self._is_collided = self._check_self_collision() + if self._steps >= self.steps_before_reward: reward_dist -= np.linalg.norm(diff) # reward_dist = np.exp(-0.1 * diff ** 2).mean() @@ -132,6 +62,9 @@ class SimpleReacherEnv(gym.Env): reward = reward_dist - reward_ctrl return reward, dict(reward_dist=reward_dist, reward_ctrl=reward_ctrl) + def _terminate(self, info): + return False + def _get_obs(self): theta = self._joint_angles return np.hstack([ @@ -190,13 +123,14 @@ class SimpleReacherEnv(gym.Env): self.fig.canvas.draw() self.fig.canvas.flush_events() - def seed(self, seed=None): - self.np_random, seed = seeding.np_random(seed) - return [seed] - def close(self): - del self.fig +if __name__ == "__main__": + env = SimpleReacherEnv(5) + env.reset() + for i in range(200): + ac = env.action_space.sample() + obs, rew, done, info = env.step(ac) - @property - def end_effector(self): - return self._joints[self.n_links].T + env.render() + if done: + break diff --git a/alr_envs/alr/classic_control/viapoint_reacher/viapoint_reacher.py b/alr_envs/alr/classic_control/viapoint_reacher/viapoint_reacher.py index 95d1e7f..3b47969 100644 --- a/alr_envs/alr/classic_control/viapoint_reacher/viapoint_reacher.py +++ b/alr_envs/alr/classic_control/viapoint_reacher/viapoint_reacher.py @@ -6,17 +6,15 @@ import numpy as np from gym.utils import seeding from alr_envs.alr.classic_control.utils import check_self_collision +from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv -class ViaPointReacherEnv(gym.Env): +class ViaPointReacherEnv(BaseReacherDirectEnv): def __init__(self, n_links, random_start: bool = False, via_target: Union[None, Iterable] = None, target: Union[None, Iterable] = None, allow_self_collision=False, collision_penalty=1000): - self.n_links = n_links - self.link_lengths = np.ones((n_links, 1)) - - self.random_start = random_start + super().__init__(n_links, random_start, allow_self_collision) # provided initial parameters self.intitial_target = target # provided target value @@ -30,17 +28,6 @@ class ViaPointReacherEnv(gym.Env): self.allow_self_collision = allow_self_collision self.collision_penalty = collision_penalty - # state - self._joints = None - self._joint_angles = None - self._angle_velocity = None - self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) - self._start_vel = np.zeros(self.n_links) - self.weight_matrix_scale = 1 - - self._dt = 0.01 - - action_bound = np.pi * np.ones((self.n_links,)) state_bound = np.hstack([ [np.pi] * self.n_links, # cos [np.pi] * self.n_links, # sin @@ -49,69 +36,15 @@ class ViaPointReacherEnv(gym.Env): [np.inf] * 2, # x-y coordinates of target distance [np.inf] # env steps, because reward start after n steps ]) - self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) - # containers for plotting - self.metadata = {'render.modes': ["human", "partial"]} - self.fig = None - - self._steps = 0 - self.seed() - - @property - def dt(self): - return self._dt - # @property # def start_pos(self): # return self._start_pos - @property - def current_pos(self): - return self._joint_angles.copy() - - @property - def current_vel(self): - return self._angle_velocity.copy() - - def step(self, action: np.ndarray): - """ - a single step with an action in joint velocity space - """ - vel = action - self._angle_velocity = vel - self._joint_angles = self._joint_angles + self.dt * self._angle_velocity - self._update_joints() - - acc = (vel - self._angle_velocity) / self.dt - reward, info = self._get_reward(acc) - - info.update({"is_collided": self._is_collided}) - - self._steps += 1 - done = self._is_collided - - return self._get_obs().copy(), reward, done, info - def reset(self): - - if self.random_start: - # Maybe change more than dirst seed - first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) - self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) - self._start_pos = self._joint_angles.copy() - else: - self._joint_angles = self._start_pos - self._generate_goal() - - self._angle_velocity = self._start_vel - self._joints = np.zeros((self.n_links + 1, 2)) - self._update_joints() - self._steps = 0 - - return self._get_obs().copy() + return super().reset() def _generate_goal(self): # TODO: Maybe improve this later, this can yield quite a lot of invalid settings @@ -137,29 +70,13 @@ class ViaPointReacherEnv(gym.Env): self._via_point = via_target self._goal = goal - def _update_joints(self): - """ - update _joints to get new end effector position. The other links are only required for rendering. - Returns: - - """ - line_points_in_taskspace = self.get_forward_kinematics(num_points_per_link=20) - - self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0] - self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1] - - self_collision = False - - if not self.allow_self_collision: - self_collision = check_self_collision(line_points_in_taskspace) - if np.any(np.abs(self._joint_angles) > np.pi): - self_collision = True - - self._is_collided = self_collision - def _get_reward(self, acc): success = False reward = -np.inf + + if not self.allow_self_collision: + self._is_collided = self._check_self_collision() + if not self._is_collided: dist = np.inf # return intermediate reward for via point @@ -177,10 +94,15 @@ class ViaPointReacherEnv(gym.Env): reward -= dist ** 2 reward -= 5e-8 * np.sum(acc ** 2) - info = {"is_success": success} + info = {"is_success": success, + "is_collided": self._is_collided, + "end_effector": np.copy(env.end_effector)} return reward, info + def _terminate(self, info): + return info["is_collided"] + def _get_obs(self): theta = self._joint_angles return np.hstack([ @@ -192,27 +114,6 @@ class ViaPointReacherEnv(gym.Env): self._steps ]) - def get_forward_kinematics(self, num_points_per_link=1): - theta = self._joint_angles[:, None] - - intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1 - - accumulated_theta = np.cumsum(theta, axis=0) - - endeffector = np.zeros(shape=(self.n_links, num_points_per_link, 2)) - - x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points - y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points - - endeffector[0, :, 0] = x[0, :] - endeffector[0, :, 1] = y[0, :] - - for i in range(1, self.n_links): - endeffector[i, :, 0] = x[i, :] + endeffector[i - 1, -1, 0] - endeffector[i, :, 1] = y[i, :] + endeffector[i - 1, -1, 1] - - return np.squeeze(endeffector + self._joints[0, :]) - def render(self, mode='human'): goal_pos = self._goal.T via_pos = self._via_point.T @@ -281,14 +182,14 @@ class ViaPointReacherEnv(gym.Env): plt.pause(0.01) - def seed(self, seed=None): - self.np_random, seed = seeding.np_random(seed) - return [seed] +if __name__ == "__main__": + import time + env = ViaPointReacherEnv(5) + env.reset() - @property - def end_effector(self): - return self._joints[self.n_links].T - - def close(self): - if self.fig is not None: - plt.close(self.fig) + for i in range(10000): + ac = env.action_space.sample() + obs, rew, done, info = env.step(ac) + env.render() + if done: + env.reset() diff --git a/alr_envs/examples/examples_motion_primitives.py b/alr_envs/examples/examples_motion_primitives.py index b307a52..e84da2f 100644 --- a/alr_envs/examples/examples_motion_primitives.py +++ b/alr_envs/examples/examples_motion_primitives.py @@ -149,16 +149,16 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True): if __name__ == '__main__': render = False # DMP - # example_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=10, render=render) + example_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=10, render=render) # ProMP - # example_mp("alr_envs:HoleReacherProMP-v1", seed=10, iterations=100, render=render) + example_mp("alr_envs:HoleReacherProMP-v1", seed=10, iterations=100, render=render) # DetProMP example_mp("alr_envs:HoleReacherDetPMP-v1", seed=10, iterations=100, render=render) # Altered basis functions - # example_custom_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render) + example_custom_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render) # Custom MP - # example_fully_custom_mp(seed=10, iterations=1, render=render) + example_fully_custom_mp(seed=10, iterations=1, render=render)