From 95560c2bab13127842b1e5999a27972b012f19d3 Mon Sep 17 00:00:00 2001 From: Maximilian Huettenrauch Date: Wed, 17 Nov 2021 17:14:13 +0100 Subject: [PATCH] wip --- alr_envs/alr/__init__.py | 2 +- alr_envs/alr/classic_control/__init__.py | 2 +- .../classic_control/base_reacher/__init__.py | 0 .../base_reacher/base_reacher_direct.py | 53 ++-- .../base_reacher/base_reacher_torque.py | 0 .../hole_reacher/hole_reacher.py | 251 +++++++++++++++++- .../hole_reacher/mp_wrapper.py | 8 + .../hole_reacher/simple_reward.py | 48 ++++ alr_envs/alr/classic_control/utils.py | 3 + .../examples/examples_motion_primitives.py | 12 +- 10 files changed, 351 insertions(+), 28 deletions(-) rename alr_envs/{ => alr}/classic_control/base_reacher/__init__.py (100%) rename alr_envs/{ => alr}/classic_control/base_reacher/base_reacher_direct.py (73%) rename alr_envs/{ => alr}/classic_control/base_reacher/base_reacher_torque.py (100%) create mode 100644 alr_envs/alr/classic_control/hole_reacher/simple_reward.py diff --git a/alr_envs/alr/__init__.py b/alr_envs/alr/__init__.py index 627625d..565cbca 100644 --- a/alr_envs/alr/__init__.py +++ b/alr_envs/alr/__init__.py @@ -84,7 +84,7 @@ register( register( id='HoleReacher-v1', - entry_point='alr_envs.alr.classic_control:HoleReacherEnv', + entry_point='alr_envs.alr.classic_control:HoleReacherEnvOld', max_episode_steps=200, kwargs={ "n_links": 5, diff --git a/alr_envs/alr/classic_control/__init__.py b/alr_envs/alr/classic_control/__init__.py index 73575ab..38f0faf 100644 --- a/alr_envs/alr/classic_control/__init__.py +++ b/alr_envs/alr/classic_control/__init__.py @@ -1,3 +1,3 @@ -from .hole_reacher.hole_reacher import HoleReacherEnv +from .hole_reacher.hole_reacher import HoleReacherEnv, HoleReacherEnvOld from .simple_reacher.simple_reacher import SimpleReacherEnv from .viapoint_reacher.viapoint_reacher import ViaPointReacherEnv diff --git a/alr_envs/classic_control/base_reacher/__init__.py b/alr_envs/alr/classic_control/base_reacher/__init__.py similarity index 100% rename from alr_envs/classic_control/base_reacher/__init__.py rename to alr_envs/alr/classic_control/base_reacher/__init__.py diff --git a/alr_envs/classic_control/base_reacher/base_reacher_direct.py b/alr_envs/alr/classic_control/base_reacher/base_reacher_direct.py similarity index 73% rename from alr_envs/classic_control/base_reacher/base_reacher_direct.py rename to alr_envs/alr/classic_control/base_reacher/base_reacher_direct.py index 12e5830..c4afaed 100644 --- a/alr_envs/classic_control/base_reacher/base_reacher_direct.py +++ b/alr_envs/alr/classic_control/base_reacher/base_reacher_direct.py @@ -5,7 +5,7 @@ import matplotlib.pyplot as plt import numpy as np from gym import spaces from gym.utils import seeding -from alr_envs.classic_control.utils import check_self_collision +from alr_envs.alr.classic_control.utils import intersect class BaseReacherEnv(gym.Env): @@ -16,7 +16,7 @@ class BaseReacherEnv(gym.Env): """ def __init__(self, n_links: int, random_start: bool = True, - allow_self_collision: bool = False, collision_penalty: float = 1000): + allow_self_collision: bool = False): super().__init__() self.link_lengths = np.ones(n_links) self.n_links = n_links @@ -24,19 +24,21 @@ class BaseReacherEnv(gym.Env): self.random_start = random_start + # state self._joints = None self._joint_angles = None self._angle_velocity = None - self._is_collided = False - self.allow_self_collision = allow_self_collision - self.collision_penalty = collision_penalty self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) self._start_vel = np.zeros(self.n_links) - self.max_torque = 1 + # joint limits + self.j_min = -np.pi * np.ones(n_links) + self.j_max = np.pi * np.ones(n_links) + + self.max_vel = 1 self.steps_before_reward = 199 - action_bound = np.ones((self.n_links,)) * self.max_torque + action_bound = np.ones((self.n_links,)) * self.max_vel state_bound = np.hstack([ [np.pi] * self.n_links, # cos [np.pi] * self.n_links, # sin @@ -47,6 +49,8 @@ class BaseReacherEnv(gym.Env): self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) + self.reward_function = None # Needs to be set in sub class + # containers for plotting self.metadata = {'render.modes': ["human"]} self.fig = None @@ -84,27 +88,21 @@ class BaseReacherEnv(gym.Env): def step(self, action: np.ndarray): """ - A single step with action in torque space + A single step with action in angular velocity space """ - # action = self._add_action_noise(action) - ac = np.clip(action, -self.max_torque, self.max_torque) - - self._angle_velocity = self._angle_velocity + self.dt * ac + acc = (action - self._angle_velocity) / self.dt + self._angle_velocity = action self._joint_angles = self._joint_angles + self.dt * self._angle_velocity self._update_joints() - if not self.allow_self_collision: - self_collision = check_self_collision(line_points_in_taskspace) - if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision: - self_collision = True - self._is_collided = self._check_collisions() - reward, info = self._get_reward(action) + # reward, info = self._get_reward(action) + reward, info = self.reward_function.get_reward(self, acc) self._steps += 1 - done = False + done = self._terminate(info) return self._get_obs().copy(), reward, done, info @@ -118,6 +116,19 @@ class BaseReacherEnv(gym.Env): x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)]) self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0) + def _check_self_collision(self): + """Checks whether line segments intersect""" + + if np.any(self._joint_angles > self.j_max) or np.any(self._joint_angles < self.j_min): + return True + + link_lines = np.stack((self._joints[:-1, :], self._joints[1:, :]), axis=1) + for i, line1 in enumerate(link_lines): + for line2 in link_lines[i + 2:, :]: + if intersect(line1[0], line1[-1], line2[0], line2[-1]): + return True + return False + @abstractmethod def _get_reward(self, action: np.ndarray) -> (float, dict): pass @@ -130,6 +141,10 @@ class BaseReacherEnv(gym.Env): def _check_collisions(self) -> bool: pass + @abstractmethod + def _terminate(self, info) -> bool: + return False + def seed(self, seed=None): self.np_random, seed = seeding.np_random(seed) return [seed] diff --git a/alr_envs/classic_control/base_reacher/base_reacher_torque.py b/alr_envs/alr/classic_control/base_reacher/base_reacher_torque.py similarity index 100% rename from alr_envs/classic_control/base_reacher/base_reacher_torque.py rename to alr_envs/alr/classic_control/base_reacher/base_reacher_torque.py diff --git a/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py b/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py index ee1a997..55aac9e 100644 --- a/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py +++ b/alr_envs/alr/classic_control/hole_reacher/hole_reacher.py @@ -6,10 +6,243 @@ import numpy as np from gym.utils import seeding from matplotlib import patches +from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherEnv from alr_envs.alr.classic_control.utils import check_self_collision -class HoleReacherEnv(gym.Env): +class HoleReacherEnv(BaseReacherEnv): + def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None, + hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False, + allow_wall_collision: bool = False, collision_penalty: float = 1000, rew_fct: str = "simple"): + + super().__init__(n_links, random_start, allow_self_collision) + + # provided initial parameters + self.initial_x = hole_x # x-position of center of hole + self.initial_width = hole_width # width of hole + self.initial_depth = hole_depth # depth of hole + + # temp container for current env state + self._tmp_x = None + self._tmp_width = None + self._tmp_depth = None + self._goal = None # x-y coordinates for reaching the center at the bottom of the hole + + action_bound = np.pi * np.ones((self.n_links,)) + state_bound = np.hstack([ + [np.pi] * self.n_links, # cos + [np.pi] * self.n_links, # sin + [np.inf] * self.n_links, # velocity + [np.inf], # hole width + # [np.inf], # hole depth + [np.inf] * 2, # x-y coordinates of target distance + [np.inf] # env steps, because reward start after n steps TODO: Maybe + ]) + self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) + self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) + + if rew_fct == "simple": + from alr_envs.alr.classic_control.hole_reacher.simple_reward import HolereacherSimpleReward + self.reward_function = HolereacherSimpleReward(allow_self_collision, allow_wall_collision, collision_penalty) + + def reset(self): + self._generate_hole() + self._set_patches() + + return super().reset() + + def _terminate(self, info): + return info["is_collided"] + + def _generate_hole(self): + if self.initial_width is None: + width = self.np_random.uniform(0.15, 0.5) + else: + width = np.copy(self.initial_width) + if self.initial_x is None: + # sample whole on left or right side + direction = self.np_random.choice([-1, 1]) + # Hole center needs to be half the width away from the arm to give a valid setting. + x = direction * self.np_random.uniform(width / 2, 3.5) + else: + x = np.copy(self.initial_x) + if self.initial_depth is None: + # TODO we do not want this right now. + depth = self.np_random.uniform(1, 1) + else: + depth = np.copy(self.initial_depth) + + self._tmp_width = width + self._tmp_x = x + self._tmp_depth = depth + self._goal = np.hstack([self._tmp_x, -self._tmp_depth]) + + self._line_ground_left = np.array([-self.n_links, 0, x - width / 2, 0]) + self._line_ground_right = np.array([x + width / 2, 0, self.n_links, 0]) + self._line_ground_hole = np.array([x - width / 2, -depth, x + width / 2, -depth]) + self._line_hole_left = np.array([x - width / 2, -depth, x - width / 2, 0]) + self._line_hole_right = np.array([x + width / 2, -depth, x + width / 2, 0]) + + self.ground_lines = np.stack((self._line_ground_left, + self._line_ground_right, + self._line_ground_hole, + self._line_hole_left, + self._line_hole_right)) + + def _get_obs(self): + theta = self._joint_angles + return np.hstack([ + np.cos(theta), + np.sin(theta), + self._angle_velocity, + self._tmp_width, + # self._tmp_hole_depth, + self.end_effector - self._goal, + self._steps + ]) + + def _get_line_points(self, num_points_per_link=1): + theta = self._joint_angles[:, None] + + intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1 + accumulated_theta = np.cumsum(theta, axis=0) + end_effector = np.zeros(shape=(self.n_links, num_points_per_link, 2)) + + x = np.cos(accumulated_theta) * self.link_lengths[:, None] * intermediate_points + y = np.sin(accumulated_theta) * self.link_lengths[:, None] * intermediate_points + + end_effector[0, :, 0] = x[0, :] + end_effector[0, :, 1] = y[0, :] + + for i in range(1, self.n_links): + end_effector[i, :, 0] = x[i, :] + end_effector[i - 1, -1, 0] + end_effector[i, :, 1] = y[i, :] + end_effector[i - 1, -1, 1] + + # xy = np.stack((x, y), axis=2) + # + # self._joints[0] + np.cumsum(xy, axis=0) + + return np.squeeze(end_effector + self._joints[0, :]) + + def check_wall_collision(self): + line_points = self._get_line_points(num_points_per_link=100) + + # all points that are before the hole in x + r, c = np.where(line_points[:, :, 0] < (self._tmp_x - self._tmp_width / 2)) + + # check if any of those points are below surface + nr_line_points_below_surface_before_hole = np.sum(line_points[r, c, 1] < 0) + + if nr_line_points_below_surface_before_hole > 0: + return True + + # all points that are after the hole in x + r, c = np.where(line_points[:, :, 0] > (self._tmp_x + self._tmp_width / 2)) + + # check if any of those points are below surface + nr_line_points_below_surface_after_hole = np.sum(line_points[r, c, 1] < 0) + + if nr_line_points_below_surface_after_hole > 0: + return True + + # all points that are above the hole + r, c = np.where((line_points[:, :, 0] > (self._tmp_x - self._tmp_width / 2)) & ( + line_points[:, :, 0] < (self._tmp_x + self._tmp_width / 2))) + + # check if any of those points are below surface + nr_line_points_below_surface_in_hole = np.sum(line_points[r, c, 1] < -self._tmp_depth) + + if nr_line_points_below_surface_in_hole > 0: + return True + + return False + + # def check_wall_collision(self, ): + # """find the intersection of line segments A=(x1,y1)/(x2,y2) and + # B=(x3,y3)/(x4,y4). """ + # + # link_lines = np.hstack((self._joints[:-1, :], self._joints[1:, :])) + # + # all_points_product = np.hstack( + # [np.repeat(link_lines, len(self.ground_lines), axis=0), + # np.tile(self.ground_lines, (len(link_lines), 1))]) + # + # x1, y1, x2, y2, x3, y3, x4, y4 = all_points_product.T + # + # denom = ((x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)) + # # if denom == 0: + # # return False + # px = ((x1 * y2 - y1 * x2) * (x3 - x4) - (x1 - x2) * (x3 * y4 - y3 * x4)) / denom + # py = ((x1 * y2 - y1 * x2) * (y3 - y4) - (y1 - y2) * (x3 * y4 - y3 * x4)) / denom + # # if (px - x1) * (px - x2) < 0 and (py - y1) * (py - y2) < 0 \ + # # and (px - x3) * (px - x4) < 0 and (py - y3) * (py - y4) < 0: + # # return True # [px, py] + # test = ((px - x1) * (px - x2) <= 0) & ((py - y1) * (py - y2) <= 0) & ((px - x3) * (px - x4) <= 0) & ( + # (py - y3) * (py - y4) <= 0) + # if np.any(test): + # possible_collisions = np.stack((px, py)).T[test] + # for row in possible_collisions: + # if not np.any([np.allclose(row, x) for x in self._joints]): + # return True, row + # + # return False, None + + def render(self, mode='human'): + if self.fig is None: + # Create base figure once on the beginning. Afterwards only update + plt.ion() + self.fig = plt.figure() + ax = self.fig.add_subplot(1, 1, 1) + + # limits + lim = np.sum(self.link_lengths) + 0.5 + ax.set_xlim([-lim, lim]) + ax.set_ylim([-1.1, lim]) + + self.line, = ax.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k') + self._set_patches() + self.fig.show() + + self.fig.gca().set_title( + f"Iteration: {self._steps}, distance: {np.linalg.norm(self.end_effector - self._goal) ** 2}") + + if mode == "human": + + # arm + self.line.set_data(self._joints[:, 0], self._joints[:, 1]) + + self.fig.canvas.draw() + self.fig.canvas.flush_events() + + elif mode == "partial": + if self._steps % 20 == 0 or self._steps in [1, 199] or self._is_collided: + # Arm + plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k', + alpha=self._steps / 200) + + def _set_patches(self): + if self.fig is not None: + # self.fig.gca().patches = [] + left_block = patches.Rectangle((-self.n_links, -self._tmp_depth), + self.n_links + self._tmp_x - self._tmp_width / 2, + self._tmp_depth, + fill=True, edgecolor='k', facecolor='k') + right_block = patches.Rectangle((self._tmp_x + self._tmp_width / 2, -self._tmp_depth), + self.n_links - self._tmp_x + self._tmp_width / 2, + self._tmp_depth, + fill=True, edgecolor='k', facecolor='k') + hole_floor = patches.Rectangle((self._tmp_x - self._tmp_width / 2, -self._tmp_depth), + self._tmp_width, + 1 - self._tmp_depth, + fill=True, edgecolor='k', facecolor='k') + + # Add the patch to the Axes + self.fig.gca().add_patch(left_block) + self.fig.gca().add_patch(right_block) + self.fig.gca().add_patch(hole_floor) + + +class HoleReacherEnvOld(gym.Env): def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None, hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False, @@ -312,3 +545,19 @@ class HoleReacherEnv(gym.Env): super().close() if self.fig is not None: plt.close(self.fig) + + +if __name__ == "__main__": + import time + env = HoleReacherEnv(5) + env.reset() + + start = time.time() + for i in range(10000): + # env.check_wall_collision() + ac = env.action_space.sample() + obs, rew, done, info = env.step(ac) + # env.render() + if done: + env.reset() + print(time.time() - start) diff --git a/alr_envs/alr/classic_control/hole_reacher/mp_wrapper.py b/alr_envs/alr/classic_control/hole_reacher/mp_wrapper.py index d951161..feb545f 100644 --- a/alr_envs/alr/classic_control/hole_reacher/mp_wrapper.py +++ b/alr_envs/alr/classic_control/hole_reacher/mp_wrapper.py @@ -18,6 +18,14 @@ class MPWrapper(MPEnvWrapper): [False] # env steps ]) + # @property + # def current_pos(self) -> Union[float, int, np.ndarray, Tuple]: + # return self._joint_angles.copy() + # + # @property + # def current_vel(self) -> Union[float, int, np.ndarray, Tuple]: + # return self._angle_velocity.copy() + @property def current_pos(self) -> Union[float, int, np.ndarray, Tuple]: return self.env.current_pos diff --git a/alr_envs/alr/classic_control/hole_reacher/simple_reward.py b/alr_envs/alr/classic_control/hole_reacher/simple_reward.py new file mode 100644 index 0000000..0609a62 --- /dev/null +++ b/alr_envs/alr/classic_control/hole_reacher/simple_reward.py @@ -0,0 +1,48 @@ +import numpy as np +from alr_envs.alr.classic_control.utils import check_self_collision + + +class HolereacherSimpleReward: + def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty): + self.collision_penalty = collision_penalty + + # collision + self.allow_self_collision = allow_self_collision + self.allow_wall_collision = allow_wall_collision + self.collision_penalty = collision_penalty + self._is_collided = False + pass + + def get_reward(self, env, action): + reward = 0 + success = False + + self_collision = False + wall_collision = False + + # joints = np.hstack((env._joints[:-1, :], env._joints[1:, :])) + if not self.allow_self_collision: + self_collision = env._check_self_collision() + + if not self.allow_wall_collision: + wall_collision = env.check_wall_collision() + + self._is_collided = self_collision or wall_collision + + if env._steps == 199 or self._is_collided: + # return reward only in last time step + # Episode also terminates when colliding, hence return reward + dist = np.linalg.norm(env.end_effector - env._goal) + + success = dist < 0.005 and not self._is_collided + reward = - dist ** 2 - self.collision_penalty * self._is_collided + + info = {"is_success": success, + "is_collided": self._is_collided} + + acc = (action - env._angle_velocity) / env.dt + reward -= 5e-8 * np.sum(acc ** 2) + + return reward, info + + diff --git a/alr_envs/alr/classic_control/utils.py b/alr_envs/alr/classic_control/utils.py index ea378d1..b557a0a 100644 --- a/alr_envs/alr/classic_control/utils.py +++ b/alr_envs/alr/classic_control/utils.py @@ -1,3 +1,6 @@ +import numpy as np + + def ccw(A, B, C): return (C[1] - A[1]) * (B[0] - A[0]) - (B[1] - A[1]) * (C[0] - A[0]) > 1e-12 diff --git a/alr_envs/examples/examples_motion_primitives.py b/alr_envs/examples/examples_motion_primitives.py index de365b7..b307a52 100644 --- a/alr_envs/examples/examples_motion_primitives.py +++ b/alr_envs/examples/examples_motion_primitives.py @@ -147,18 +147,18 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True): if __name__ == '__main__': - render = True + render = False # DMP - example_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render) + # example_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=10, render=render) # ProMP - example_mp("alr_envs:HoleReacherProMP-v1", seed=10, iterations=1, render=render) + # example_mp("alr_envs:HoleReacherProMP-v1", seed=10, iterations=100, render=render) # DetProMP - example_mp("alr_envs:HoleReacherDetPMP-v1", seed=10, iterations=1, render=render) + example_mp("alr_envs:HoleReacherDetPMP-v1", seed=10, iterations=100, render=render) # Altered basis functions - example_custom_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render) + # example_custom_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render) # Custom MP - example_fully_custom_mp(seed=10, iterations=1, render=render) + # example_fully_custom_mp(seed=10, iterations=1, render=render)