From 7a725077e24e4d5117afcb2155c600937f66e01d Mon Sep 17 00:00:00 2001
From: Maximilian Huettenrauch
Date: Thu, 8 Jul 2021 13:48:08 +0200
Subject: [PATCH] wip

---
 .../classic_control/base_reacher/__init__.py |   0
 .../base_reacher/base_reacher_direct.py      | 142 ++++++++++++++++++
 .../base_reacher/base_reacher_torque.py      | 142 ++++++++++++++++++
 alr_envs/classic_control/utils.py            |   2 +-
 alr_envs/examples/examples_general.py        |   6 +-
 5 files changed, 288 insertions(+), 4 deletions(-)
 create mode 100644 alr_envs/classic_control/base_reacher/__init__.py
 create mode 100644 alr_envs/classic_control/base_reacher/base_reacher_direct.py
 create mode 100644 alr_envs/classic_control/base_reacher/base_reacher_torque.py

diff --git a/alr_envs/classic_control/base_reacher/__init__.py b/alr_envs/classic_control/base_reacher/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/alr_envs/classic_control/base_reacher/base_reacher_direct.py b/alr_envs/classic_control/base_reacher/base_reacher_direct.py
new file mode 100644
index 0000000..12e5830
--- /dev/null
+++ b/alr_envs/classic_control/base_reacher/base_reacher_direct.py
@@ -0,0 +1,142 @@
+from typing import Iterable, Union
+from abc import ABCMeta, abstractmethod
+import gym
+import matplotlib.pyplot as plt
+import numpy as np
+from gym import spaces
+from gym.utils import seeding
+from alr_envs.classic_control.utils import check_self_collision
+
+
+class BaseReacherEnv(gym.Env):
+    """
+    Simple Reaching Task without any physics simulation.
+    Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions
+    towards the end of the trajectory.
+    """
+
+    def __init__(self, n_links: int, random_start: bool = True,
+                 allow_self_collision: bool = False, collision_penalty: float = 1000):
+        super().__init__()
+        self.link_lengths = np.ones(n_links)
+        self.n_links = n_links
+        self._dt = 0.01
+
+        self.random_start = random_start
+
+        self._joints = None
+        self._joint_angles = None
+        self._angle_velocity = None
+        self._is_collided = False
+        self.allow_self_collision = allow_self_collision
+        self.collision_penalty = collision_penalty
+        self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
+        self._start_vel = np.zeros(self.n_links)
+
+        self.max_torque = 1
+        self.steps_before_reward = 199
+
+        action_bound = np.ones((self.n_links,)) * self.max_torque
+        state_bound = np.hstack([
+            [np.pi] * self.n_links,  # cos
+            [np.pi] * self.n_links,  # sin
+            [np.inf] * self.n_links,  # velocity
+            [np.inf] * 2,  # x-y coordinates of target distance
+            [np.inf]  # env steps, because reward start after n steps TODO: Maybe
+        ])
+        self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
+        self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
+
+        # containers for plotting
+        self.metadata = {'render.modes': ["human"]}
+        self.fig = None
+
+        self._steps = 0
+        self.seed()
+
+    @property
+    def dt(self) -> Union[float, int]:
+        return self._dt
+
+    @property
+    def current_pos(self):
+        return self._joint_angles.copy()
+
+    @property
+    def current_vel(self):
+        return self._angle_velocity.copy()
+
+    def reset(self):
+        # Sample only orientation of first link, i.e. the arm is always straight.
+        if self.random_start:
+            first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
+            self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
+            self._start_pos = self._joint_angles.copy()
+        else:
+            self._joint_angles = self._start_pos
+
+        self._angle_velocity = self._start_vel
+        self._joints = np.zeros((self.n_links + 1, 2))
+        self._update_joints()
+        self._steps = 0
+
+        return self._get_obs().copy()
+
+    def step(self, action: np.ndarray):
+        """
+        A single step with action in torque space
+        """
+
+        # action = self._add_action_noise(action)
+        ac = np.clip(action, -self.max_torque, self.max_torque)
+
+        self._angle_velocity = self._angle_velocity + self.dt * ac
+        self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
+        self._update_joints()
+
+        if not self.allow_self_collision:
+            self_collision = check_self_collision(line_points_in_taskspace)
+        if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision:
+            self_collision = True
+
+        self._is_collided = self._check_collisions()
+
+        reward, info = self._get_reward(action)
+
+        self._steps += 1
+        done = False
+
+        return self._get_obs().copy(), reward, done, info
+
+    def _update_joints(self):
+        """
+        update joints to get new end-effector position. The other links are only required for rendering.
+        Returns:
+
+        """
+        angles = np.cumsum(self._joint_angles)
+        x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
+        self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
+
+    @abstractmethod
+    def _get_reward(self, action: np.ndarray) -> (float, dict):
+        pass
+
+    @abstractmethod
+    def _get_obs(self) -> np.ndarray:
+        pass
+
+    @abstractmethod
+    def _check_collisions(self) -> bool:
+        pass
+
+    def seed(self, seed=None):
+        self.np_random, seed = seeding.np_random(seed)
+        return [seed]
+
+    def close(self):
+        del self.fig
+
+    @property
+    def end_effector(self):
+        return self._joints[self.n_links].T
diff --git a/alr_envs/classic_control/base_reacher/base_reacher_torque.py b/alr_envs/classic_control/base_reacher/base_reacher_torque.py
new file mode 100644
index 0000000..12e5830
--- /dev/null
+++ b/alr_envs/classic_control/base_reacher/base_reacher_torque.py
@@ -0,0 +1,142 @@
+from typing import Iterable, Union
+from abc import ABCMeta, abstractmethod
+import gym
+import matplotlib.pyplot as plt
+import numpy as np
+from gym import spaces
+from gym.utils import seeding
+from alr_envs.classic_control.utils import check_self_collision
+
+
+class BaseReacherEnv(gym.Env):
+    """
+    Simple Reaching Task without any physics simulation.
+    Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions
+    towards the end of the trajectory.
+ """ + + def __init__(self, n_links: int, random_start: bool = True, + allow_self_collision: bool = False, collision_penalty: float = 1000): + super().__init__() + self.link_lengths = np.ones(n_links) + self.n_links = n_links + self._dt = 0.01 + + self.random_start = random_start + + self._joints = None + self._joint_angles = None + self._angle_velocity = None + self._is_collided = False + self.allow_self_collision = allow_self_collision + self.collision_penalty = collision_penalty + self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) + self._start_vel = np.zeros(self.n_links) + + self.max_torque = 1 + self.steps_before_reward = 199 + + action_bound = np.ones((self.n_links,)) * self.max_torque + state_bound = np.hstack([ + [np.pi] * self.n_links, # cos + [np.pi] * self.n_links, # sin + [np.inf] * self.n_links, # velocity + [np.inf] * 2, # x-y coordinates of target distance + [np.inf] # env steps, because reward start after n steps TODO: Maybe + ]) + self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) + self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) + + # containers for plotting + self.metadata = {'render.modes': ["human"]} + self.fig = None + + self._steps = 0 + self.seed() + + @property + def dt(self) -> Union[float, int]: + return self._dt + + @property + def current_pos(self): + return self._joint_angles.copy() + + @property + def current_vel(self): + return self._angle_velocity.copy() + + def reset(self): + # Sample only orientation of first link, i.e. the arm is always straight. + if self.random_start: + first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) + self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) + self._start_pos = self._joint_angles.copy() + else: + self._joint_angles = self._start_pos + + self._angle_velocity = self._start_vel + self._joints = np.zeros((self.n_links + 1, 2)) + self._update_joints() + self._steps = 0 + + return self._get_obs().copy() + + def step(self, action: np.ndarray): + """ + A single step with action in torque space + """ + + # action = self._add_action_noise(action) + ac = np.clip(action, -self.max_torque, self.max_torque) + + self._angle_velocity = self._angle_velocity + self.dt * ac + self._joint_angles = self._joint_angles + self.dt * self._angle_velocity + self._update_joints() + + if not self.allow_self_collision: + self_collision = check_self_collision(line_points_in_taskspace) + if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision: + self_collision = True + + self._is_collided = self._check_collisions() + + reward, info = self._get_reward(action) + + self._steps += 1 + done = False + + return self._get_obs().copy(), reward, done, info + + def _update_joints(self): + """ + update joints to get new end-effector position. The other links are only required for rendering. 
+        Returns:
+
+        """
+        angles = np.cumsum(self._joint_angles)
+        x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
+        self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
+
+    @abstractmethod
+    def _get_reward(self, action: np.ndarray) -> (float, dict):
+        pass
+
+    @abstractmethod
+    def _get_obs(self) -> np.ndarray:
+        pass
+
+    @abstractmethod
+    def _check_collisions(self) -> bool:
+        pass
+
+    def seed(self, seed=None):
+        self.np_random, seed = seeding.np_random(seed)
+        return [seed]
+
+    def close(self):
+        del self.fig
+
+    @property
+    def end_effector(self):
+        return self._joints[self.n_links].T
diff --git a/alr_envs/classic_control/utils.py b/alr_envs/classic_control/utils.py
index fa8176a..ea378d1 100644
--- a/alr_envs/classic_control/utils.py
+++ b/alr_envs/classic_control/utils.py
@@ -10,7 +10,7 @@ def intersect(A, B, C, D):
 
 
 def check_self_collision(line_points):
-    """Checks whether line segments and intersect"""
+    """Checks whether line segments intersect"""
     for i, line1 in enumerate(line_points):
         for line2 in line_points[i + 2:, :, :]:
             if intersect(line1[0], line1[-1], line2[0], line2[-1]):
diff --git a/alr_envs/examples/examples_general.py b/alr_envs/examples/examples_general.py
index 99ca8f6..7ab77c1 100644
--- a/alr_envs/examples/examples_general.py
+++ b/alr_envs/examples/examples_general.py
@@ -89,8 +89,8 @@ if __name__ == '__main__':
     # Basic gym task
     # example_general("Pendulum-v0", seed=10, iterations=200, render=True)
     #
-    # # Basis task from framework
-    # example_general("alr_envs:HoleReacher-v0", seed=10, iterations=200, render=True)
+    # Basis task from framework
+    example_general("alr_envs:HoleReacher-v0", seed=10, iterations=200, render=True)
     #
     # # OpenAI Mujoco task
     # example_general("HalfCheetah-v2", seed=10, render=True)
@@ -99,4 +99,4 @@ if __name__ == '__main__':
     # example_general("alr_envs:ALRReacher-v0", seed=10, iterations=200, render=True)
 
     # Vectorized multiprocessing environments
-    example_async(env_id="alr_envs:HoleReacher-v0", n_cpu=2, seed=int('533D', 16), n_samples=2 * 200)
+    # example_async(env_id="alr_envs:HoleReacher-v0", n_cpu=2, seed=int('533D', 16), n_samples=2 * 200)
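
Usage sketch (illustrative, not part of the patch): the new BaseReacherEnv leaves _get_reward, _get_obs and _check_collisions abstract, and the step() committed here still references an undefined line_points_in_taskspace whenever allow_self_collision is False. The hypothetical subclass below shows one way those hooks could be filled in on top of the utilities this patch touches; the class name, target handling, reward shape and the segment layout handed to check_self_collision are assumptions for illustration, not code from this commit.

import numpy as np

from alr_envs.classic_control.utils import check_self_collision
from alr_envs.classic_control.base_reacher.base_reacher_direct import BaseReacherEnv


class ExampleTargetReacherEnv(BaseReacherEnv):
    """Example subclass (hypothetical): reach a fixed 2D target with the end effector."""

    def __init__(self, n_links: int = 5, target=(2.0, 2.0), **kwargs):
        super().__init__(n_links, **kwargs)
        self._target = np.asarray(target, dtype=float)

    def _get_obs(self) -> np.ndarray:
        # cos/sin of joint angles, joint velocities, distance to target, step counter;
        # matches the state_bound layout defined in BaseReacherEnv.__init__.
        theta = self._joint_angles
        return np.hstack([
            np.cos(theta),
            np.sin(theta),
            self._angle_velocity,
            self.end_effector - self._target,
            self._steps,
        ])

    def _get_reward(self, action: np.ndarray) -> (float, dict):
        # No reward during the exploration phase, then negative distance to the target,
        # plus the collision penalty held by the base class.
        reward = 0.0
        if self._steps >= self.steps_before_reward:
            reward -= np.linalg.norm(self.end_effector - self._target)
        if self._is_collided:
            reward -= self.collision_penalty
        return reward, {"is_collided": self._is_collided}

    def _check_collisions(self) -> bool:
        # One straight segment per link, built from consecutive joint positions, in the
        # (n_segments, n_points, 2) layout that check_self_collision iterates over.
        segments = np.stack([self._joints[:-1], self._joints[1:]], axis=1)
        return not self.allow_self_collision and check_self_collision(segments)


if __name__ == "__main__":
    # allow_self_collision=True sidesteps the undefined line_points_in_taskspace
    # branch in the wip step() above.
    env = ExampleTargetReacherEnv(n_links=5, allow_self_collision=True)
    obs = env.reset()
    for _ in range(200):
        obs, reward, done, info = env.step(env.action_space.sample())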