reacher envs cleanup

- base classes for direct and torque control
- added promp wrapper support
This commit is contained in:
Maximilian Huettenrauch 2021-11-26 16:31:46 +01:00
parent 928c540251
commit 2543bcd7ec
11 changed files with 645 additions and 821 deletions

View File

@ -83,7 +83,7 @@ register(
register( register(
id='HoleReacher-v1', id='HoleReacher-v1',
entry_point='alr_envs.alr.classic_control:HoleReacherEnvOld', entry_point='alr_envs.alr.classic_control:HoleReacherEnv',
max_episode_steps=200, max_episode_steps=200,
kwargs={ kwargs={
"n_links": 5, "n_links": 5,
@ -109,7 +109,7 @@ register(
"hole_width": 0.25, "hole_width": 0.25,
"hole_depth": 1, "hole_depth": 1,
"hole_x": 2, "hole_x": 2,
"collision_penalty": 100, "collision_penalty": 1,
} }
) )
@ -220,6 +220,25 @@ for _v in _versions:
) )
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append(_env_id) ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append(_env_id)
_env_id = f'{_name[0]}ProMP-{_name[1]}'
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
kwargs={
"name": f"alr_envs:{_v}",
"wrappers": [classic_control.simple_reacher.MPWrapper],
"mp_kwargs": {
"num_dof": 2 if "long" not in _v.lower() else 5,
"num_basis": 5,
"duration": 2,
"policy_type": "motor",
"weights_scale": 1,
"zero_start": True
}
}
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id)
_env_id = f'{_name[0]}DetPMP-{_name[1]}' _env_id = f'{_name[0]}DetPMP-{_name[1]}'
register( register(
id=_env_id, id=_env_id,
@ -262,6 +281,24 @@ register(
) )
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append("ViaPointReacherDMP-v0") ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append("ViaPointReacherDMP-v0")
register(
id="ViaPointReacherProMP-v0",
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
kwargs={
"name": f"alr_envs:ViaPointReacher-v0",
"wrappers": [classic_control.viapoint_reacher.MPWrapper],
"mp_kwargs": {
"num_dof": 5,
"num_basis": 5,
"duration": 2,
"policy_type": "motor",
"weights_scale": 1,
"zero_start": True
}
}
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append("ViaPointReacherProMP-v0")
register( register(
id='ViaPointReacherDetPMP-v0', id='ViaPointReacherDetPMP-v0',
entry_point='alr_envs.utils.make_env_helpers:make_detpmp_env_helper', entry_point='alr_envs.utils.make_env_helpers:make_detpmp_env_helper',

View File

@ -0,0 +1,141 @@
from typing import Iterable, Union
from abc import ABCMeta, abstractmethod
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym import spaces
from gym.utils import seeding
from alr_envs.alr.classic_control.utils import intersect
class BaseReacherEnv(gym.Env):
"""
Base class for all reaching environments.
"""
def __init__(self, n_links: int, random_start: bool = True,
allow_self_collision: bool = False):
super().__init__()
self.link_lengths = np.ones(n_links)
self.n_links = n_links
self._dt = 0.01
self.random_start = random_start
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._acc = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
# joint limits
self.j_min = -np.pi * np.ones(n_links)
self.j_max = np.pi * np.ones(n_links)
self.steps_before_reward = 199
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
[np.inf] * self.n_links, # velocity
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
self.reward_function = None # Needs to be set in sub class
# containers for plotting
self.metadata = {'render.modes': ["human"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def reset(self):
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
@abstractmethod
def step(self, action: np.ndarray):
"""
A single step with action in angular velocity space
"""
raise NotImplementedError
def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.
Returns:
"""
angles = np.cumsum(self._joint_angles)
x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
def _check_self_collision(self):
"""Checks whether line segments intersect"""
if np.any(self._joint_angles > self.j_max) or np.any(self._joint_angles < self.j_min):
return True
link_lines = np.stack((self._joints[:-1, :], self._joints[1:, :]), axis=1)
for i, line1 in enumerate(link_lines):
for line2 in link_lines[i + 2:, :]:
if intersect(line1[0], line1[-1], line2[0], line2[-1]):
return True
return False
@abstractmethod
def _get_reward(self, action: np.ndarray) -> (float, dict):
pass
@abstractmethod
def _get_obs(self) -> np.ndarray:
pass
@abstractmethod
def _check_collisions(self) -> bool:
pass
@abstractmethod
def _terminate(self, info) -> bool:
return False
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self):
del self.fig
@property
def end_effector(self):
return self._joints[self.n_links].T

View File

@ -1,157 +1,35 @@
from typing import Iterable, Union
from abc import ABCMeta, abstractmethod
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym import spaces from gym import spaces
from gym.utils import seeding import numpy as np
from alr_envs.alr.classic_control.utils import intersect from alr_envs.alr.classic_control.base_reacher.base_reacher import BaseReacherEnv
class BaseReacherEnv(gym.Env): class BaseReacherDirectEnv(BaseReacherEnv):
""" """
Simple Reaching Task without any physics simulation. Base class for directly controlled reaching environments
Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions
towards the end of the trajectory.
""" """
def __init__(self, n_links: int, random_start: bool = True, def __init__(self, n_links: int, random_start: bool = True,
allow_self_collision: bool = False): allow_self_collision: bool = False):
super().__init__() super().__init__(n_links, random_start, allow_self_collision)
self.link_lengths = np.ones(n_links)
self.n_links = n_links
self._dt = 0.01
self.random_start = random_start
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
# joint limits
self.j_min = -np.pi * np.ones(n_links)
self.j_max = np.pi * np.ones(n_links)
self.max_vel = 1
self.steps_before_reward = 199
self.max_vel = 10 * np.pi
action_bound = np.ones((self.n_links,)) * self.max_vel action_bound = np.ones((self.n_links,)) * self.max_vel
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
[np.inf] * self.n_links, # velocity
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
self.reward_function = None # Needs to be set in sub class
# containers for plotting
self.metadata = {'render.modes': ["human"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def reset(self):
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
def step(self, action: np.ndarray): def step(self, action: np.ndarray):
""" """
A single step with action in angular velocity space A single step with action in angular velocity space
""" """
acc = (action - self._angle_velocity) / self.dt self._acc = (action - self._angle_velocity) / self.dt
self._angle_velocity = action self._angle_velocity = action
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints() self._update_joints()
self._is_collided = self._check_collisions() self._is_collided = self._check_collisions()
# reward, info = self._get_reward(action) reward, info = self._get_reward(action)
reward, info = self.reward_function.get_reward(self, acc)
self._steps += 1 self._steps += 1
done = self._terminate(info) done = self._terminate(info)
return self._get_obs().copy(), reward, done, info return self._get_obs().copy(), reward, done, info
def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.
Returns:
"""
angles = np.cumsum(self._joint_angles)
x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
def _check_self_collision(self):
"""Checks whether line segments intersect"""
if np.any(self._joint_angles > self.j_max) or np.any(self._joint_angles < self.j_min):
return True
link_lines = np.stack((self._joints[:-1, :], self._joints[1:, :]), axis=1)
for i, line1 in enumerate(link_lines):
for line2 in link_lines[i + 2:, :]:
if intersect(line1[0], line1[-1], line2[0], line2[-1]):
return True
return False
@abstractmethod
def _get_reward(self, action: np.ndarray) -> (float, dict):
pass
@abstractmethod
def _get_obs(self) -> np.ndarray:
pass
@abstractmethod
def _check_collisions(self) -> bool:
pass
@abstractmethod
def _terminate(self, info) -> bool:
return False
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self):
del self.fig
@property
def end_effector(self):
return self._joints[self.n_links].T

View File

@ -1,86 +1,19 @@
from typing import Iterable, Union
from abc import ABCMeta, abstractmethod
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym import spaces from gym import spaces
from gym.utils import seeding import numpy as np
from alr_envs.classic_control.utils import check_self_collision from alr_envs.alr.classic_control.base_reacher.base_reacher import BaseReacherEnv
class BaseReacherEnv(gym.Env): class BaseReacherTorqueEnv(BaseReacherEnv):
""" """
Simple Reaching Task without any physics simulation. Base class for torque controlled reaching environments
Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions
towards the end of the trajectory.
""" """
def __init__(self, n_links: int, random_start: bool = True, def __init__(self, n_links: int, random_start: bool = True,
allow_self_collision: bool = False, collision_penalty: float = 1000): allow_self_collision: bool = False):
super().__init__() super().__init__(n_links, random_start, allow_self_collision)
self.link_lengths = np.ones(n_links)
self.n_links = n_links
self._dt = 0.01
self.random_start = random_start
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._is_collided = False
self.allow_self_collision = allow_self_collision
self.collision_penalty = collision_penalty
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
self.max_torque = 1
self.steps_before_reward = 199
self.max_torque = 1000
action_bound = np.ones((self.n_links,)) * self.max_torque action_bound = np.ones((self.n_links,)) * self.max_torque
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
[np.inf] * self.n_links, # velocity
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def reset(self):
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
def step(self, action: np.ndarray): def step(self, action: np.ndarray):
""" """
@ -94,11 +27,6 @@ class BaseReacherEnv(gym.Env):
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints() self._update_joints()
if not self.allow_self_collision:
self_collision = check_self_collision(line_points_in_taskspace)
if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision:
self_collision = True
self._is_collided = self._check_collisions() self._is_collided = self._check_collisions()
reward, info = self._get_reward(action) reward, info = self._get_reward(action)
@ -107,36 +35,3 @@ class BaseReacherEnv(gym.Env):
done = False done = False
return self._get_obs().copy(), reward, done, info return self._get_obs().copy(), reward, done, info
def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.
Returns:
"""
angles = np.cumsum(self._joint_angles)
x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
@abstractmethod
def _get_reward(self, action: np.ndarray) -> (float, dict):
pass
@abstractmethod
def _get_obs(self) -> np.ndarray:
pass
@abstractmethod
def _check_collisions(self) -> bool:
pass
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self):
del self.fig
@property
def end_effector(self):
return self._joints[self.n_links].T

View File

@ -3,14 +3,12 @@ from typing import Union
import gym import gym
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from gym.utils import seeding
from matplotlib import patches from matplotlib import patches
from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherEnv from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
from alr_envs.alr.classic_control.utils import check_self_collision
class HoleReacherEnv(BaseReacherEnv): class HoleReacherEnv(BaseReacherDirectEnv):
def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None, def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None,
hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False, hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False,
allow_wall_collision: bool = False, collision_penalty: float = 1000, rew_fct: str = "simple"): allow_wall_collision: bool = False, collision_penalty: float = 1000, rew_fct: str = "simple"):
@ -28,7 +26,7 @@ class HoleReacherEnv(BaseReacherEnv):
self._tmp_depth = None self._tmp_depth = None
self._goal = None # x-y coordinates for reaching the center at the bottom of the hole self._goal = None # x-y coordinates for reaching the center at the bottom of the hole
action_bound = np.pi * np.ones((self.n_links,)) # action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([ state_bound = np.hstack([
[np.pi] * self.n_links, # cos [np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin [np.pi] * self.n_links, # sin
@ -38,19 +36,25 @@ class HoleReacherEnv(BaseReacherEnv):
[np.inf] * 2, # x-y coordinates of target distance [np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe [np.inf] # env steps, because reward start after n steps TODO: Maybe
]) ])
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) # self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
if rew_fct == "simple": if rew_fct == "simple":
from alr_envs.alr.classic_control.hole_reacher.simple_reward import HolereacherSimpleReward from alr_envs.alr.classic_control.hole_reacher.hr_simple_reward import HolereacherReward
self.reward_function = HolereacherSimpleReward(allow_self_collision, allow_wall_collision, collision_penalty) self.reward_function = HolereacherReward(allow_self_collision, allow_wall_collision, collision_penalty)
else:
raise ValueError("Unknown reward function {}".format(rew_fct))
def reset(self): def reset(self):
self._generate_hole() self._generate_hole()
self._set_patches() self._set_patches()
self.reward_function.reset()
return super().reset() return super().reset()
def _get_reward(self, action: np.ndarray) -> (float, dict):
return self.reward_function.get_reward(self)
def _terminate(self, info): def _terminate(self, info):
return info["is_collided"] return info["is_collided"]
@ -118,10 +122,6 @@ class HoleReacherEnv(BaseReacherEnv):
end_effector[i, :, 0] = x[i, :] + end_effector[i - 1, -1, 0] end_effector[i, :, 0] = x[i, :] + end_effector[i - 1, -1, 0]
end_effector[i, :, 1] = y[i, :] + end_effector[i - 1, -1, 1] end_effector[i, :, 1] = y[i, :] + end_effector[i - 1, -1, 1]
# xy = np.stack((x, y), axis=2)
#
# self._joints[0] + np.cumsum(xy, axis=0)
return np.squeeze(end_effector + self._joints[0, :]) return np.squeeze(end_effector + self._joints[0, :])
def check_wall_collision(self): def check_wall_collision(self):
@ -157,36 +157,6 @@ class HoleReacherEnv(BaseReacherEnv):
return False return False
# def check_wall_collision(self, ):
# """find the intersection of line segments A=(x1,y1)/(x2,y2) and
# B=(x3,y3)/(x4,y4). """
#
# link_lines = np.hstack((self._joints[:-1, :], self._joints[1:, :]))
#
# all_points_product = np.hstack(
# [np.repeat(link_lines, len(self.ground_lines), axis=0),
# np.tile(self.ground_lines, (len(link_lines), 1))])
#
# x1, y1, x2, y2, x3, y3, x4, y4 = all_points_product.T
#
# denom = ((x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4))
# # if denom == 0:
# # return False
# px = ((x1 * y2 - y1 * x2) * (x3 - x4) - (x1 - x2) * (x3 * y4 - y3 * x4)) / denom
# py = ((x1 * y2 - y1 * x2) * (y3 - y4) - (y1 - y2) * (x3 * y4 - y3 * x4)) / denom
# # if (px - x1) * (px - x2) < 0 and (py - y1) * (py - y2) < 0 \
# # and (px - x3) * (px - x4) < 0 and (py - y3) * (py - y4) < 0:
# # return True # [px, py]
# test = ((px - x1) * (px - x2) <= 0) & ((py - y1) * (py - y2) <= 0) & ((px - x3) * (px - x4) <= 0) & (
# (py - y3) * (py - y4) <= 0)
# if np.any(test):
# possible_collisions = np.stack((px, py)).T[test]
# for row in possible_collisions:
# if not np.any([np.allclose(row, x) for x in self._joints]):
# return True, row
#
# return False, None
def render(self, mode='human'): def render(self, mode='human'):
if self.fig is None: if self.fig is None:
# Create base figure once on the beginning. Afterwards only update # Create base figure once on the beginning. Afterwards only update
@ -242,322 +212,14 @@ class HoleReacherEnv(BaseReacherEnv):
self.fig.gca().add_patch(hole_floor) self.fig.gca().add_patch(hole_floor)
class HoleReacherEnvOld(gym.Env):
def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None,
hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False,
allow_wall_collision: bool = False, collision_penalty: float = 1000):
self.n_links = n_links
self.link_lengths = np.ones((n_links, 1))
self.random_start = random_start
# provided initial parameters
self.initial_x = hole_x # x-position of center of hole
self.initial_width = hole_width # width of hole
self.initial_depth = hole_depth # depth of hole
# temp container for current env state
self._tmp_x = None
self._tmp_width = None
self._tmp_depth = None
self._goal = None # x-y coordinates for reaching the center at the bottom of the hole
# collision
self.allow_self_collision = allow_self_collision
self.allow_wall_collision = allow_wall_collision
self.collision_penalty = collision_penalty
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
self._dt = 0.01
action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
[np.inf] * self.n_links, # velocity
[np.inf], # hole width
# [np.inf], # hole depth
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human", "partial"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
# @property
# def start_pos(self):
# return self._start_pos
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def step(self, action: np.ndarray):
"""
A single step with an action in joint velocity space
"""
acc = (action - self._angle_velocity) / self.dt
self._angle_velocity = action
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity # + 0.001 * np.random.randn(5)
self._update_joints()
reward, info = self._get_reward(acc)
info.update({"is_collided": self._is_collided})
self.end_effector_traj.append(np.copy(self.end_effector))
self._steps += 1
done = self._is_collided
return self._get_obs().copy(), reward, done, info
def reset(self):
if self.random_start:
# Maybe change more than first seed
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_hole()
self._set_patches()
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
self.end_effector_traj = []
return self._get_obs().copy()
def _generate_hole(self):
if self.initial_width is None:
width = self.np_random.uniform(0.15, 0.5)
else:
width = np.copy(self.initial_width)
if self.initial_x is None:
# sample whole on left or right side
direction = self.np_random.choice([-1, 1])
# Hole center needs to be half the width away from the arm to give a valid setting.
x = direction * self.np_random.uniform(width / 2, 3.5)
else:
x = np.copy(self.initial_x)
if self.initial_depth is None:
# TODO we do not want this right now.
depth = self.np_random.uniform(1, 1)
else:
depth = np.copy(self.initial_depth)
self._tmp_width = width
self._tmp_x = x
self._tmp_depth = depth
self._goal = np.hstack([self._tmp_x, -self._tmp_depth])
def _update_joints(self):
"""
update _joints to get new end effector position. The other links are only required for rendering.
Returns:
"""
line_points_in_taskspace = self._get_forward_kinematics(num_points_per_link=20)
self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0]
self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1]
self_collision = False
wall_collision = False
if not self.allow_self_collision:
self_collision = check_self_collision(line_points_in_taskspace)
if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision:
self_collision = True
if not self.allow_wall_collision:
wall_collision = self._check_wall_collision(line_points_in_taskspace)
self._is_collided = self_collision or wall_collision
def _get_reward(self, acc: np.ndarray):
reward = 0
# success = False
if self._steps == 199 or self._is_collided:
# return reward only in last time step
# Episode also terminates when colliding, hence return reward
dist = np.linalg.norm(self.end_effector - self._goal)
# success = dist < 0.005 and not self._is_collided
reward = - dist ** 2 - self.collision_penalty * self._is_collided
reward -= 5e-8 * np.sum(acc ** 2)
# info = {"is_success": success}
return reward, {} # info
def _get_obs(self):
theta = self._joint_angles
return np.hstack([
np.cos(theta),
np.sin(theta),
self._angle_velocity,
self._tmp_width,
# self._tmp_hole_depth,
self.end_effector - self._goal,
self._steps
])
def _get_forward_kinematics(self, num_points_per_link=1):
theta = self._joint_angles[:, None]
intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1
accumulated_theta = np.cumsum(theta, axis=0)
end_effector = np.zeros(shape=(self.n_links, num_points_per_link, 2))
x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points
y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points
end_effector[0, :, 0] = x[0, :]
end_effector[0, :, 1] = y[0, :]
for i in range(1, self.n_links):
end_effector[i, :, 0] = x[i, :] + end_effector[i - 1, -1, 0]
end_effector[i, :, 1] = y[i, :] + end_effector[i - 1, -1, 1]
return np.squeeze(end_effector + self._joints[0, :])
def _check_wall_collision(self, line_points):
# all points that are before the hole in x
r, c = np.where(line_points[:, :, 0] < (self._tmp_x - self._tmp_width / 2))
# check if any of those points are below surface
nr_line_points_below_surface_before_hole = np.sum(line_points[r, c, 1] < 0)
if nr_line_points_below_surface_before_hole > 0:
return True
# all points that are after the hole in x
r, c = np.where(line_points[:, :, 0] > (self._tmp_x + self._tmp_width / 2))
# check if any of those points are below surface
nr_line_points_below_surface_after_hole = np.sum(line_points[r, c, 1] < 0)
if nr_line_points_below_surface_after_hole > 0:
return True
# all points that are above the hole
r, c = np.where((line_points[:, :, 0] > (self._tmp_x - self._tmp_width / 2)) & (
line_points[:, :, 0] < (self._tmp_x + self._tmp_width / 2)))
# check if any of those points are below surface
nr_line_points_below_surface_in_hole = np.sum(line_points[r, c, 1] < -self._tmp_depth)
if nr_line_points_below_surface_in_hole > 0:
return True
return False
def render(self, mode='human'):
if self.fig is None:
# Create base figure once on the beginning. Afterwards only update
plt.ion()
self.fig = plt.figure()
ax = self.fig.add_subplot(1, 1, 1)
# limits
lim = np.sum(self.link_lengths) + 0.5
ax.set_xlim([-lim, lim])
ax.set_ylim([-1.1, lim])
self.line, = ax.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
self._set_patches()
self.fig.show()
self.fig.gca().set_title(
f"Iteration: {self._steps}, distance: {np.linalg.norm(self.end_effector - self._goal) ** 2}")
if mode == "human":
# arm
self.line.set_data(self._joints[:, 0], self._joints[:, 1])
self.fig.canvas.draw()
self.fig.canvas.flush_events()
elif mode == "partial":
if self._steps % 20 == 0 or self._steps in [1, 199] or self._is_collided:
# Arm
plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k',
alpha=self._steps / 200)
def _set_patches(self):
if self.fig is not None:
self.fig.gca().patches = []
left_block = patches.Rectangle((-self.n_links, -self._tmp_depth),
self.n_links + self._tmp_x - self._tmp_width / 2,
self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
right_block = patches.Rectangle((self._tmp_x + self._tmp_width / 2, -self._tmp_depth),
self.n_links - self._tmp_x + self._tmp_width / 2,
self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
hole_floor = patches.Rectangle((self._tmp_x - self._tmp_width / 2, -self._tmp_depth),
self._tmp_width,
1 - self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
# Add the patch to the Axes
self.fig.gca().add_patch(left_block)
self.fig.gca().add_patch(right_block)
self.fig.gca().add_patch(hole_floor)
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
@property
def end_effector(self):
return self._joints[self.n_links].T
def close(self):
super().close()
if self.fig is not None:
plt.close(self.fig)
if __name__ == "__main__": if __name__ == "__main__":
import time import time
env = HoleReacherEnv(5) env = HoleReacherEnv(5)
env.reset() env.reset()
start = time.time()
for i in range(10000): for i in range(10000):
# env.check_wall_collision()
ac = env.action_space.sample() ac = env.action_space.sample()
obs, rew, done, info = env.step(ac) obs, rew, done, info = env.step(ac)
# env.render() # env.render()
if done: if done:
env.reset() env.reset()
print(time.time() - start)

View File

@ -0,0 +1,315 @@
from typing import Union
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym.utils import seeding
from matplotlib import patches
from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
from alr_envs.alr.classic_control.utils import check_self_collision
class HoleReacherEnvOld(gym.Env):
def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None,
hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False,
allow_wall_collision: bool = False, collision_penalty: float = 1000):
self.n_links = n_links
self.link_lengths = np.ones((n_links, 1))
self.random_start = random_start
# provided initial parameters
self.initial_x = hole_x # x-position of center of hole
self.initial_width = hole_width # width of hole
self.initial_depth = hole_depth # depth of hole
# temp container for current env state
self._tmp_x = None
self._tmp_width = None
self._tmp_depth = None
self._goal = None # x-y coordinates for reaching the center at the bottom of the hole
# collision
self.allow_self_collision = allow_self_collision
self.allow_wall_collision = allow_wall_collision
self.collision_penalty = collision_penalty
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
self._dt = 0.01
action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
[np.inf] * self.n_links, # velocity
[np.inf], # hole width
# [np.inf], # hole depth
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human", "partial"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
# @property
# def start_pos(self):
# return self._start_pos
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def step(self, action: np.ndarray):
"""
A single step with an action in joint velocity space
"""
acc = (action - self._angle_velocity) / self.dt
self._angle_velocity = action
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity # + 0.001 * np.random.randn(5)
self._update_joints()
reward, info = self._get_reward(acc)
info.update({"is_collided": self._is_collided})
self.end_effector_traj.append(np.copy(self.end_effector))
self._steps += 1
done = self._is_collided
return self._get_obs().copy(), reward, done, info
def reset(self):
if self.random_start:
# Maybe change more than first seed
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_hole()
self._set_patches()
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
self.end_effector_traj = []
return self._get_obs().copy()
def _generate_hole(self):
if self.initial_width is None:
width = self.np_random.uniform(0.15, 0.5)
else:
width = np.copy(self.initial_width)
if self.initial_x is None:
# sample whole on left or right side
direction = self.np_random.choice([-1, 1])
# Hole center needs to be half the width away from the arm to give a valid setting.
x = direction * self.np_random.uniform(width / 2, 3.5)
else:
x = np.copy(self.initial_x)
if self.initial_depth is None:
# TODO we do not want this right now.
depth = self.np_random.uniform(1, 1)
else:
depth = np.copy(self.initial_depth)
self._tmp_width = width
self._tmp_x = x
self._tmp_depth = depth
self._goal = np.hstack([self._tmp_x, -self._tmp_depth])
def _update_joints(self):
"""
update _joints to get new end effector position. The other links are only required for rendering.
Returns:
"""
line_points_in_taskspace = self._get_forward_kinematics(num_points_per_link=20)
self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0]
self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1]
self_collision = False
wall_collision = False
if not self.allow_self_collision:
self_collision = check_self_collision(line_points_in_taskspace)
if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision:
self_collision = True
if not self.allow_wall_collision:
wall_collision = self._check_wall_collision(line_points_in_taskspace)
self._is_collided = self_collision or wall_collision
def _get_reward(self, acc: np.ndarray):
reward = 0
# success = False
if self._steps == 199 or self._is_collided:
# return reward only in last time step
# Episode also terminates when colliding, hence return reward
dist = np.linalg.norm(self.end_effector - self._goal)
# success = dist < 0.005 and not self._is_collided
reward = - dist ** 2 - self.collision_penalty * self._is_collided
reward -= 5e-8 * np.sum(acc ** 2)
# info = {"is_success": success}
return reward, {} # info
def _get_obs(self):
theta = self._joint_angles
return np.hstack([
np.cos(theta),
np.sin(theta),
self._angle_velocity,
self._tmp_width,
# self._tmp_hole_depth,
self.end_effector - self._goal,
self._steps
])
def _get_forward_kinematics(self, num_points_per_link=1):
theta = self._joint_angles[:, None]
intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1
accumulated_theta = np.cumsum(theta, axis=0)
end_effector = np.zeros(shape=(self.n_links, num_points_per_link, 2))
x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points
y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points
end_effector[0, :, 0] = x[0, :]
end_effector[0, :, 1] = y[0, :]
for i in range(1, self.n_links):
end_effector[i, :, 0] = x[i, :] + end_effector[i - 1, -1, 0]
end_effector[i, :, 1] = y[i, :] + end_effector[i - 1, -1, 1]
return np.squeeze(end_effector + self._joints[0, :])
def _check_wall_collision(self, line_points):
# all points that are before the hole in x
r, c = np.where(line_points[:, :, 0] < (self._tmp_x - self._tmp_width / 2))
# check if any of those points are below surface
nr_line_points_below_surface_before_hole = np.sum(line_points[r, c, 1] < 0)
if nr_line_points_below_surface_before_hole > 0:
return True
# all points that are after the hole in x
r, c = np.where(line_points[:, :, 0] > (self._tmp_x + self._tmp_width / 2))
# check if any of those points are below surface
nr_line_points_below_surface_after_hole = np.sum(line_points[r, c, 1] < 0)
if nr_line_points_below_surface_after_hole > 0:
return True
# all points that are above the hole
r, c = np.where((line_points[:, :, 0] > (self._tmp_x - self._tmp_width / 2)) & (
line_points[:, :, 0] < (self._tmp_x + self._tmp_width / 2)))
# check if any of those points are below surface
nr_line_points_below_surface_in_hole = np.sum(line_points[r, c, 1] < -self._tmp_depth)
if nr_line_points_below_surface_in_hole > 0:
return True
return False
def render(self, mode='human'):
if self.fig is None:
# Create base figure once on the beginning. Afterwards only update
plt.ion()
self.fig = plt.figure()
ax = self.fig.add_subplot(1, 1, 1)
# limits
lim = np.sum(self.link_lengths) + 0.5
ax.set_xlim([-lim, lim])
ax.set_ylim([-1.1, lim])
self.line, = ax.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
self._set_patches()
self.fig.show()
self.fig.gca().set_title(
f"Iteration: {self._steps}, distance: {np.linalg.norm(self.end_effector - self._goal) ** 2}")
if mode == "human":
# arm
self.line.set_data(self._joints[:, 0], self._joints[:, 1])
self.fig.canvas.draw()
self.fig.canvas.flush_events()
elif mode == "partial":
if self._steps % 20 == 0 or self._steps in [1, 199] or self._is_collided:
# Arm
plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k',
alpha=self._steps / 200)
def _set_patches(self):
if self.fig is not None:
self.fig.gca().patches = []
left_block = patches.Rectangle((-self.n_links, -self._tmp_depth),
self.n_links + self._tmp_x - self._tmp_width / 2,
self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
right_block = patches.Rectangle((self._tmp_x + self._tmp_width / 2, -self._tmp_depth),
self.n_links - self._tmp_x + self._tmp_width / 2,
self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
hole_floor = patches.Rectangle((self._tmp_x - self._tmp_width / 2, -self._tmp_depth),
self._tmp_width,
1 - self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
# Add the patch to the Axes
self.fig.gca().add_patch(left_block)
self.fig.gca().add_patch(right_block)
self.fig.gca().add_patch(hole_floor)
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
@property
def end_effector(self):
return self._joints[self.n_links].T
def close(self):
super().close()
if self.fig is not None:
plt.close(self.fig)

View File

@ -0,0 +1,56 @@
import numpy as np
class HolereacherReward:
def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty):
self.collision_penalty = collision_penalty
# collision
self.allow_self_collision = allow_self_collision
self.allow_wall_collision = allow_wall_collision
self.collision_penalty = collision_penalty
self._is_collided = False
self.reward_factors = np.array((-1, -1e-4, -1e-6, -collision_penalty, -1))
def reset(self):
self._is_collided = False
def get_reward(self, env):
dist_cost = 0
collision_cost = 0
time_cost = 0
success = False
self_collision = False
wall_collision = False
if not self.allow_self_collision:
self_collision = env._check_self_collision()
if not self.allow_wall_collision:
wall_collision = env.check_wall_collision()
self._is_collided = self_collision or wall_collision
if env._steps == 199 or self._is_collided:
# return reward only in last time step
# Episode also terminates when colliding, hence return reward
dist = np.linalg.norm(env.end_effector - env._goal)
success = dist < 0.005 and not self._is_collided
dist_cost = int(not self._is_collided) * dist ** 2
collision_cost = self._is_collided * dist ** 2
time_cost = 199 - env._steps
info = {"is_success": success,
"is_collided": self._is_collided,
"end_effector": np.copy(env.end_effector)}
vel_cost = np.sum(env._angle_velocity ** 2)
acc_cost = np.sum(env._acc ** 2)
reward_features = np.array((dist_cost, vel_cost, acc_cost, collision_cost, time_cost))
reward = np.dot(reward_features, self.reward_factors)
return reward, info

View File

@ -1,8 +1,7 @@
import numpy as np import numpy as np
from alr_envs.alr.classic_control.utils import check_self_collision
class HolereacherSimpleReward: class HolereacherReward:
def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty): def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty):
self.collision_penalty = collision_penalty self.collision_penalty = collision_penalty
@ -11,16 +10,20 @@ class HolereacherSimpleReward:
self.allow_wall_collision = allow_wall_collision self.allow_wall_collision = allow_wall_collision
self.collision_penalty = collision_penalty self.collision_penalty = collision_penalty
self._is_collided = False self._is_collided = False
pass
def get_reward(self, env, action): self.reward_factors = np.array((-1, -5e-8, -collision_penalty))
reward = 0
def reset(self):
self._is_collided = False
def get_reward(self, env):
dist_cost = 0
collision_cost = 0
success = False success = False
self_collision = False self_collision = False
wall_collision = False wall_collision = False
# joints = np.hstack((env._joints[:-1, :], env._joints[1:, :]))
if not self.allow_self_collision: if not self.allow_self_collision:
self_collision = env._check_self_collision() self_collision = env._check_self_collision()
@ -33,16 +36,18 @@ class HolereacherSimpleReward:
# return reward only in last time step # return reward only in last time step
# Episode also terminates when colliding, hence return reward # Episode also terminates when colliding, hence return reward
dist = np.linalg.norm(env.end_effector - env._goal) dist = np.linalg.norm(env.end_effector - env._goal)
dist_cost = dist ** 2
collision_cost = int(self._is_collided)
success = dist < 0.005 and not self._is_collided success = dist < 0.005 and not self._is_collided
reward = - dist ** 2 - self.collision_penalty * self._is_collided
info = {"is_success": success, info = {"is_success": success,
"is_collided": self._is_collided} "is_collided": self._is_collided,
"end_effector": np.copy(env.end_effector)}
acc = (action - env._angle_velocity) / env.dt acc_cost = np.sum(env._acc ** 2)
reward -= 5e-8 * np.sum(acc ** 2)
reward_features = np.array((dist_cost, acc_cost, collision_cost))
reward = np.dot(reward_features, self.reward_factors)
return reward, info return reward, info

View File

@ -1,26 +1,22 @@
from typing import Iterable, Union from typing import Iterable, Union
import gym
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from gym import spaces from gym import spaces
from gym.utils import seeding
from alr_envs.alr.classic_control.base_reacher.base_reacher_torque import BaseReacherTorqueEnv
class SimpleReacherEnv(gym.Env): class SimpleReacherEnv(BaseReacherTorqueEnv):
""" """
Simple Reaching Task without any physics simulation. Simple Reaching Task without any physics simulation.
Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions
towards the end of the trajectory. towards the end of the trajectory.
""" """
def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True): def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True,
super().__init__() allow_self_collision: bool = False,):
self.link_lengths = np.ones(n_links) super().__init__(n_links, random_start, allow_self_collision)
self.n_links = n_links
self._dt = 0.1
self.random_start = random_start
# provided initial parameters # provided initial parameters
self.inital_target = target self.inital_target = target
@ -28,16 +24,10 @@ class SimpleReacherEnv(gym.Env):
# temp container for current env state # temp container for current env state
self._goal = None self._goal = None
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.zeros(self.n_links) self._start_pos = np.zeros(self.n_links)
self._start_vel = np.zeros(self.n_links)
self.max_torque = 1
self.steps_before_reward = 199 self.steps_before_reward = 199
action_bound = np.ones((self.n_links,)) * self.max_torque
state_bound = np.hstack([ state_bound = np.hstack([
[np.pi] * self.n_links, # cos [np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin [np.pi] * self.n_links, # sin
@ -45,84 +35,24 @@ class SimpleReacherEnv(gym.Env):
[np.inf] * 2, # x-y coordinates of target distance [np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe [np.inf] # env steps, because reward start after n steps TODO: Maybe
]) ])
self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
# @property # @property
# def start_pos(self): # def start_pos(self):
# return self._start_pos # return self._start_pos
@property
def current_pos(self):
return self._joint_angles
@property
def current_vel(self):
return self._angle_velocity
def step(self, action: np.ndarray):
"""
A single step with action in torque space
"""
# action = self._add_action_noise(action)
ac = np.clip(action, -self.max_torque, self.max_torque)
self._angle_velocity = self._angle_velocity + self.dt * ac
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints()
reward, info = self._get_reward(action)
self._steps += 1
done = False
return self._get_obs().copy(), reward, done, info
def reset(self): def reset(self):
# TODO: maybe do initialisation more random?
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
self._joint_angles = np.hstack([[self.np_random.uniform(-np.pi, np.pi)], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_goal() self._generate_goal()
self._angle_velocity = self._start_vel return super().reset()
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.
Returns:
"""
angles = np.cumsum(self._joint_angles)
x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
def _get_reward(self, action: np.ndarray): def _get_reward(self, action: np.ndarray):
diff = self.end_effector - self._goal diff = self.end_effector - self._goal
reward_dist = 0 reward_dist = 0
if not self.allow_self_collision:
self._is_collided = self._check_self_collision()
if self._steps >= self.steps_before_reward: if self._steps >= self.steps_before_reward:
reward_dist -= np.linalg.norm(diff) reward_dist -= np.linalg.norm(diff)
# reward_dist = np.exp(-0.1 * diff ** 2).mean() # reward_dist = np.exp(-0.1 * diff ** 2).mean()
@ -132,6 +62,9 @@ class SimpleReacherEnv(gym.Env):
reward = reward_dist - reward_ctrl reward = reward_dist - reward_ctrl
return reward, dict(reward_dist=reward_dist, reward_ctrl=reward_ctrl) return reward, dict(reward_dist=reward_dist, reward_ctrl=reward_ctrl)
def _terminate(self, info):
return False
def _get_obs(self): def _get_obs(self):
theta = self._joint_angles theta = self._joint_angles
return np.hstack([ return np.hstack([
@ -190,13 +123,14 @@ class SimpleReacherEnv(gym.Env):
self.fig.canvas.draw() self.fig.canvas.draw()
self.fig.canvas.flush_events() self.fig.canvas.flush_events()
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self): if __name__ == "__main__":
del self.fig env = SimpleReacherEnv(5)
env.reset()
for i in range(200):
ac = env.action_space.sample()
obs, rew, done, info = env.step(ac)
@property env.render()
def end_effector(self): if done:
return self._joints[self.n_links].T break

View File

@ -6,17 +6,15 @@ import numpy as np
from gym.utils import seeding from gym.utils import seeding
from alr_envs.alr.classic_control.utils import check_self_collision from alr_envs.alr.classic_control.utils import check_self_collision
from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
class ViaPointReacherEnv(gym.Env): class ViaPointReacherEnv(BaseReacherDirectEnv):
def __init__(self, n_links, random_start: bool = False, via_target: Union[None, Iterable] = None, def __init__(self, n_links, random_start: bool = False, via_target: Union[None, Iterable] = None,
target: Union[None, Iterable] = None, allow_self_collision=False, collision_penalty=1000): target: Union[None, Iterable] = None, allow_self_collision=False, collision_penalty=1000):
self.n_links = n_links super().__init__(n_links, random_start, allow_self_collision)
self.link_lengths = np.ones((n_links, 1))
self.random_start = random_start
# provided initial parameters # provided initial parameters
self.intitial_target = target # provided target value self.intitial_target = target # provided target value
@ -30,17 +28,6 @@ class ViaPointReacherEnv(gym.Env):
self.allow_self_collision = allow_self_collision self.allow_self_collision = allow_self_collision
self.collision_penalty = collision_penalty self.collision_penalty = collision_penalty
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
self.weight_matrix_scale = 1
self._dt = 0.01
action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([ state_bound = np.hstack([
[np.pi] * self.n_links, # cos [np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin [np.pi] * self.n_links, # sin
@ -49,69 +36,15 @@ class ViaPointReacherEnv(gym.Env):
[np.inf] * 2, # x-y coordinates of target distance [np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps [np.inf] # env steps, because reward start after n steps
]) ])
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human", "partial"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self):
return self._dt
# @property # @property
# def start_pos(self): # def start_pos(self):
# return self._start_pos # return self._start_pos
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def step(self, action: np.ndarray):
"""
a single step with an action in joint velocity space
"""
vel = action
self._angle_velocity = vel
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints()
acc = (vel - self._angle_velocity) / self.dt
reward, info = self._get_reward(acc)
info.update({"is_collided": self._is_collided})
self._steps += 1
done = self._is_collided
return self._get_obs().copy(), reward, done, info
def reset(self): def reset(self):
if self.random_start:
# Maybe change more than dirst seed
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_goal() self._generate_goal()
return super().reset()
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
def _generate_goal(self): def _generate_goal(self):
# TODO: Maybe improve this later, this can yield quite a lot of invalid settings # TODO: Maybe improve this later, this can yield quite a lot of invalid settings
@ -137,29 +70,13 @@ class ViaPointReacherEnv(gym.Env):
self._via_point = via_target self._via_point = via_target
self._goal = goal self._goal = goal
def _update_joints(self):
"""
update _joints to get new end effector position. The other links are only required for rendering.
Returns:
"""
line_points_in_taskspace = self.get_forward_kinematics(num_points_per_link=20)
self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0]
self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1]
self_collision = False
if not self.allow_self_collision:
self_collision = check_self_collision(line_points_in_taskspace)
if np.any(np.abs(self._joint_angles) > np.pi):
self_collision = True
self._is_collided = self_collision
def _get_reward(self, acc): def _get_reward(self, acc):
success = False success = False
reward = -np.inf reward = -np.inf
if not self.allow_self_collision:
self._is_collided = self._check_self_collision()
if not self._is_collided: if not self._is_collided:
dist = np.inf dist = np.inf
# return intermediate reward for via point # return intermediate reward for via point
@ -177,10 +94,15 @@ class ViaPointReacherEnv(gym.Env):
reward -= dist ** 2 reward -= dist ** 2
reward -= 5e-8 * np.sum(acc ** 2) reward -= 5e-8 * np.sum(acc ** 2)
info = {"is_success": success} info = {"is_success": success,
"is_collided": self._is_collided,
"end_effector": np.copy(env.end_effector)}
return reward, info return reward, info
def _terminate(self, info):
return info["is_collided"]
def _get_obs(self): def _get_obs(self):
theta = self._joint_angles theta = self._joint_angles
return np.hstack([ return np.hstack([
@ -192,27 +114,6 @@ class ViaPointReacherEnv(gym.Env):
self._steps self._steps
]) ])
def get_forward_kinematics(self, num_points_per_link=1):
theta = self._joint_angles[:, None]
intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1
accumulated_theta = np.cumsum(theta, axis=0)
endeffector = np.zeros(shape=(self.n_links, num_points_per_link, 2))
x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points
y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points
endeffector[0, :, 0] = x[0, :]
endeffector[0, :, 1] = y[0, :]
for i in range(1, self.n_links):
endeffector[i, :, 0] = x[i, :] + endeffector[i - 1, -1, 0]
endeffector[i, :, 1] = y[i, :] + endeffector[i - 1, -1, 1]
return np.squeeze(endeffector + self._joints[0, :])
def render(self, mode='human'): def render(self, mode='human'):
goal_pos = self._goal.T goal_pos = self._goal.T
via_pos = self._via_point.T via_pos = self._via_point.T
@ -281,14 +182,14 @@ class ViaPointReacherEnv(gym.Env):
plt.pause(0.01) plt.pause(0.01)
def seed(self, seed=None): if __name__ == "__main__":
self.np_random, seed = seeding.np_random(seed) import time
return [seed] env = ViaPointReacherEnv(5)
env.reset()
@property for i in range(10000):
def end_effector(self): ac = env.action_space.sample()
return self._joints[self.n_links].T obs, rew, done, info = env.step(ac)
env.render()
def close(self): if done:
if self.fig is not None: env.reset()
plt.close(self.fig)

View File

@ -149,16 +149,16 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True):
if __name__ == '__main__': if __name__ == '__main__':
render = False render = False
# DMP # DMP
# example_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=10, render=render) example_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=10, render=render)
# ProMP # ProMP
# example_mp("alr_envs:HoleReacherProMP-v1", seed=10, iterations=100, render=render) example_mp("alr_envs:HoleReacherProMP-v1", seed=10, iterations=100, render=render)
# DetProMP # DetProMP
example_mp("alr_envs:HoleReacherDetPMP-v1", seed=10, iterations=100, render=render) example_mp("alr_envs:HoleReacherDetPMP-v1", seed=10, iterations=100, render=render)
# Altered basis functions # Altered basis functions
# example_custom_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render) example_custom_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render)
# Custom MP # Custom MP
# example_fully_custom_mp(seed=10, iterations=1, render=render) example_fully_custom_mp(seed=10, iterations=1, render=render)