reacher envs cleanup

- base classes for direct and torque control
- added ProMP wrapper support
This commit is contained in:
Maximilian Huettenrauch 2021-11-26 16:31:46 +01:00
parent 95560c2bab
commit 386c418778
11 changed files with 645 additions and 821 deletions
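The ProMP registrations added here are used like any other gym environment. A minimal usage sketch (assumptions: importing `alr_envs` triggers the `register()` calls shown below, and the MP wrapper collapses each episode into a single step that consumes one flat weight vector):

```python
import gym
import numpy as np

import alr_envs  # assumed to run the register() calls shown in this diff

# "ViaPointReacherProMP-v0" is registered below with num_dof=5, num_basis=5
env = gym.make("alr_envs:ViaPointReacherProMP-v0")
obs = env.reset()

# assumed interface: one flat ProMP weight vector per episode (5 * 5 = 25)
weights = np.random.randn(25)
obs, reward, done, info = env.step(weights)
```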

View File

@@ -84,7 +84,7 @@ register(
register(
id='HoleReacher-v1',
entry_point='alr_envs.alr.classic_control:HoleReacherEnvOld',
entry_point='alr_envs.alr.classic_control:HoleReacherEnv',
max_episode_steps=200,
kwargs={
"n_links": 5,
@@ -110,7 +110,7 @@ register(
"hole_width": 0.25,
"hole_depth": 1,
"hole_x": 2,
"collision_penalty": 100,
"collision_penalty": 1,
}
)
@@ -247,6 +247,25 @@ for _v in _versions:
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append(_env_id)
_env_id = f'{_name[0]}ProMP-{_name[1]}'
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
kwargs={
"name": f"alr_envs:{_v}",
"wrappers": [classic_control.simple_reacher.MPWrapper],
"mp_kwargs": {
"num_dof": 2 if "long" not in _v.lower() else 5,
"num_basis": 5,
"duration": 2,
"policy_type": "motor",
"weights_scale": 1,
"zero_start": True
}
}
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id)
_env_id = f'{_name[0]}DetPMP-{_name[1]}'
register(
id=_env_id,
@@ -289,6 +308,24 @@ register(
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append("ViaPointReacherDMP-v0")
register(
id="ViaPointReacherProMP-v0",
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
kwargs={
"name": f"alr_envs:ViaPointReacher-v0",
"wrappers": [classic_control.viapoint_reacher.MPWrapper],
"mp_kwargs": {
"num_dof": 5,
"num_basis": 5,
"duration": 2,
"policy_type": "motor",
"weights_scale": 1,
"zero_start": True
}
}
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append("ViaPointReacherProMP-v0")
register(
id='ViaPointReacherDetPMP-v0',
entry_point='alr_envs.utils.make_env_helpers:make_detpmp_env_helper',
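For intuition about the `mp_kwargs` above (e.g. 5 basis functions per degree of freedom over a 2 s duration), the following is a generic sketch of how a ProMP mean trajectory could be evaluated from a weight vector. The normalized Gaussian bases, their width, and the weight layout are illustrative assumptions, not the library's actual implementation:

```python
import numpy as np

def promp_mean(weights, num_basis=5, num_dof=5, duration=2.0, dt=0.01):
    # normalized time in [0, 1)
    t = np.arange(0.0, duration, dt) / duration
    centers = np.linspace(0.0, 1.0, num_basis)
    width = 0.5 / num_basis ** 2  # assumed basis width
    phi = np.exp(-(t[:, None] - centers[None, :]) ** 2 / (2 * width))
    phi /= phi.sum(axis=1, keepdims=True)  # normalize bases per time step
    # assumed layout: weights reshape to (num_basis, num_dof)
    return phi @ weights.reshape(num_basis, num_dof)  # (T, num_dof) positions

traj = promp_mean(np.random.randn(25))  # 200 time steps x 5 joints
```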

View File

@@ -0,0 +1,141 @@
from typing import Iterable, Union
from abc import ABCMeta, abstractmethod
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym import spaces
from gym.utils import seeding
from alr_envs.alr.classic_control.utils import intersect
class BaseReacherEnv(gym.Env):
"""
Base class for all reaching environments.
"""
def __init__(self, n_links: int, random_start: bool = True,
allow_self_collision: bool = False):
super().__init__()
self.link_lengths = np.ones(n_links)
self.n_links = n_links
self._dt = 0.01
self.random_start = random_start
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._acc = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
# joint limits
self.j_min = -np.pi * np.ones(n_links)
self.j_max = np.pi * np.ones(n_links)
self.steps_before_reward = 199
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
[np.inf] * self.n_links, # velocity
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
self.reward_function = None # Needs to be set in sub class
# containers for plotting
self.metadata = {'render.modes': ["human"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def reset(self):
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
@abstractmethod
def step(self, action: np.ndarray):
"""
A single step with action in angular velocity space
"""
raise NotImplementedError
def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.
Returns:
"""
angles = np.cumsum(self._joint_angles)
x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
def _check_self_collision(self):
"""Checks whether line segments intersect"""
if np.any(self._joint_angles > self.j_max) or np.any(self._joint_angles < self.j_min):
return True
link_lines = np.stack((self._joints[:-1, :], self._joints[1:, :]), axis=1)
for i, line1 in enumerate(link_lines):
for line2 in link_lines[i + 2:, :]:
if intersect(line1[0], line1[-1], line2[0], line2[-1]):
return True
return False
@abstractmethod
def _get_reward(self, action: np.ndarray) -> (float, dict):
pass
@abstractmethod
def _get_obs(self) -> np.ndarray:
pass
@abstractmethod
def _check_collisions(self) -> bool:
pass
@abstractmethod
def _terminate(self, info) -> bool:
return False
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self):
del self.fig
@property
def end_effector(self):
return self._joints[self.n_links].T
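`_check_self_collision` above delegates the segment test to `intersect` from `alr_envs.alr.classic_control.utils`. A plausible implementation, using the standard counter-clockwise orientation test, would look like the following (a sketch; not necessarily the repository's actual helper):

```python
def ccw(a, b, c):
    # True if the 2D points a, b, c are ordered counter-clockwise
    return (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0])

def intersect(p1, p2, q1, q2):
    # segments (p1, p2) and (q1, q2) cross iff each segment's endpoints
    # lie on opposite sides of the other segment
    return ccw(p1, q1, q2) != ccw(p2, q1, q2) and ccw(p1, p2, q1) != ccw(p1, p2, q2)
```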

View File

@@ -1,157 +1,35 @@
from typing import Iterable, Union
from abc import ABCMeta, abstractmethod
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym import spaces
from gym.utils import seeding
from alr_envs.alr.classic_control.utils import intersect
import numpy as np
from alr_envs.alr.classic_control.base_reacher.base_reacher import BaseReacherEnv
class BaseReacherEnv(gym.Env):
class BaseReacherDirectEnv(BaseReacherEnv):
"""
Simple Reaching Task without any physics simulation.
Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions
towards the end of the trajectory.
Base class for directly controlled reaching environments
"""
def __init__(self, n_links: int, random_start: bool = True,
allow_self_collision: bool = False):
super().__init__()
self.link_lengths = np.ones(n_links)
self.n_links = n_links
self._dt = 0.01
self.random_start = random_start
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
# joint limits
self.j_min = -np.pi * np.ones(n_links)
self.j_max = np.pi * np.ones(n_links)
self.max_vel = 1
self.steps_before_reward = 199
super().__init__(n_links, random_start, allow_self_collision)
self.max_vel = 10 * np.pi
action_bound = np.ones((self.n_links,)) * self.max_vel
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
[np.inf] * self.n_links, # velocity
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
self.reward_function = None # Needs to be set in sub class
# containers for plotting
self.metadata = {'render.modes': ["human"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def reset(self):
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
def step(self, action: np.ndarray):
"""
A single step with action in angular velocity space
"""
acc = (action - self._angle_velocity) / self.dt
self._acc = (action - self._angle_velocity) / self.dt
self._angle_velocity = action
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints()
self._is_collided = self._check_collisions()
# reward, info = self._get_reward(action)
reward, info = self.reward_function.get_reward(self, acc)
reward, info = self._get_reward(action)
self._steps += 1
done = self._terminate(info)
return self._get_obs().copy(), reward, done, info
def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.
Returns:
"""
angles = np.cumsum(self._joint_angles)
x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
def _check_self_collision(self):
"""Checks whether line segments intersect"""
if np.any(self._joint_angles > self.j_max) or np.any(self._joint_angles < self.j_min):
return True
link_lines = np.stack((self._joints[:-1, :], self._joints[1:, :]), axis=1)
for i, line1 in enumerate(link_lines):
for line2 in link_lines[i + 2:, :]:
if intersect(line1[0], line1[-1], line2[0], line2[-1]):
return True
return False
@abstractmethod
def _get_reward(self, action: np.ndarray) -> (float, dict):
pass
@abstractmethod
def _get_obs(self) -> np.ndarray:
pass
@abstractmethod
def _check_collisions(self) -> bool:
pass
@abstractmethod
def _terminate(self, info) -> bool:
return False
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self):
del self.fig
@property
def end_effector(self):
return self._joints[self.n_links].T

View File

@@ -1,86 +1,19 @@
from typing import Iterable, Union
from abc import ABCMeta, abstractmethod
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym import spaces
from gym.utils import seeding
from alr_envs.classic_control.utils import check_self_collision
import numpy as np
from alr_envs.alr.classic_control.base_reacher.base_reacher import BaseReacherEnv
class BaseReacherEnv(gym.Env):
class BaseReacherTorqueEnv(BaseReacherEnv):
"""
Simple Reaching Task without any physics simulation.
Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions
towards the end of the trajectory.
Base class for torque controlled reaching environments
"""
def __init__(self, n_links: int, random_start: bool = True,
allow_self_collision: bool = False, collision_penalty: float = 1000):
super().__init__()
self.link_lengths = np.ones(n_links)
self.n_links = n_links
self._dt = 0.01
self.random_start = random_start
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._is_collided = False
self.allow_self_collision = allow_self_collision
self.collision_penalty = collision_penalty
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
self.max_torque = 1
self.steps_before_reward = 199
allow_self_collision: bool = False):
super().__init__(n_links, random_start, allow_self_collision)
self.max_torque = 1000
action_bound = np.ones((self.n_links,)) * self.max_torque
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
[np.inf] * self.n_links, # velocity
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def reset(self):
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
def step(self, action: np.ndarray):
"""
@@ -94,11 +27,6 @@ class BaseReacherEnv(gym.Env):
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints()
if not self.allow_self_collision:
self_collision = check_self_collision(line_points_in_taskspace)
if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision:
self_collision = True
self._is_collided = self._check_collisions()
reward, info = self._get_reward(action)
@@ -107,36 +35,3 @@ class BaseReacherEnv(gym.Env):
done = False
return self._get_obs().copy(), reward, done, info
def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.
Returns:
"""
angles = np.cumsum(self._joint_angles)
x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
@abstractmethod
def _get_reward(self, action: np.ndarray) -> (float, dict):
pass
@abstractmethod
def _get_obs(self) -> np.ndarray:
pass
@abstractmethod
def _check_collisions(self) -> bool:
pass
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self):
del self.fig
@property
def end_effector(self):
return self._joints[self.n_links].T
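The two new base classes differ only in how an action is integrated. A side-by-side sketch of one integration step, consistent with the `step()` logic visible in this diff (unit link mass is an implicit assumption of the torque model):

```python
import numpy as np

dt = 0.01

def direct_step(angles, vel, action):
    # BaseReacherDirectEnv: the action is the new joint velocity
    acc = (action - vel) / dt  # implied acceleration, feeds the reward's acc cost
    vel = action
    return angles + dt * vel, vel, acc

def torque_step(angles, vel, action, max_torque=1000):
    # BaseReacherTorqueEnv: the action is a clipped torque, integrated twice
    acc = np.clip(action, -max_torque, max_torque)  # torque == acc for unit mass
    vel = vel + dt * acc
    return angles + dt * vel, vel, acc
```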

View File

@@ -3,14 +3,12 @@ from typing import Union
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym.utils import seeding
from matplotlib import patches
from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherEnv
from alr_envs.alr.classic_control.utils import check_self_collision
from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
class HoleReacherEnv(BaseReacherEnv):
class HoleReacherEnv(BaseReacherDirectEnv):
def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None,
hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False,
allow_wall_collision: bool = False, collision_penalty: float = 1000, rew_fct: str = "simple"):
@@ -28,7 +26,7 @@ class HoleReacherEnv(BaseReacherEnv):
self._tmp_depth = None
self._goal = None # x-y coordinates for reaching the center at the bottom of the hole
action_bound = np.pi * np.ones((self.n_links,))
# action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
@@ -38,19 +36,25 @@ class HoleReacherEnv(BaseReacherEnv):
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
# self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
if rew_fct == "simple":
from alr_envs.alr.classic_control.hole_reacher.simple_reward import HolereacherSimpleReward
self.reward_function = HolereacherSimpleReward(allow_self_collision, allow_wall_collision, collision_penalty)
from alr_envs.alr.classic_control.hole_reacher.hr_simple_reward import HolereacherReward
self.reward_function = HolereacherReward(allow_self_collision, allow_wall_collision, collision_penalty)
else:
raise ValueError("Unknown reward function {}".format(rew_fct))
def reset(self):
self._generate_hole()
self._set_patches()
self.reward_function.reset()
return super().reset()
def _get_reward(self, action: np.ndarray) -> (float, dict):
return self.reward_function.get_reward(self)
def _terminate(self, info):
return info["is_collided"]
@@ -118,10 +122,6 @@ class HoleReacherEnv(BaseReacherEnv):
end_effector[i, :, 0] = x[i, :] + end_effector[i - 1, -1, 0]
end_effector[i, :, 1] = y[i, :] + end_effector[i - 1, -1, 1]
# xy = np.stack((x, y), axis=2)
#
# self._joints[0] + np.cumsum(xy, axis=0)
return np.squeeze(end_effector + self._joints[0, :])
def check_wall_collision(self):
@@ -157,36 +157,6 @@ class HoleReacherEnv(BaseReacherEnv):
return False
# def check_wall_collision(self, ):
# """find the intersection of line segments A=(x1,y1)/(x2,y2) and
# B=(x3,y3)/(x4,y4). """
#
# link_lines = np.hstack((self._joints[:-1, :], self._joints[1:, :]))
#
# all_points_product = np.hstack(
# [np.repeat(link_lines, len(self.ground_lines), axis=0),
# np.tile(self.ground_lines, (len(link_lines), 1))])
#
# x1, y1, x2, y2, x3, y3, x4, y4 = all_points_product.T
#
# denom = ((x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4))
# # if denom == 0:
# # return False
# px = ((x1 * y2 - y1 * x2) * (x3 - x4) - (x1 - x2) * (x3 * y4 - y3 * x4)) / denom
# py = ((x1 * y2 - y1 * x2) * (y3 - y4) - (y1 - y2) * (x3 * y4 - y3 * x4)) / denom
# # if (px - x1) * (px - x2) < 0 and (py - y1) * (py - y2) < 0 \
# # and (px - x3) * (px - x4) < 0 and (py - y3) * (py - y4) < 0:
# # return True # [px, py]
# test = ((px - x1) * (px - x2) <= 0) & ((py - y1) * (py - y2) <= 0) & ((px - x3) * (px - x4) <= 0) & (
# (py - y3) * (py - y4) <= 0)
# if np.any(test):
# possible_collisions = np.stack((px, py)).T[test]
# for row in possible_collisions:
# if not np.any([np.allclose(row, x) for x in self._joints]):
# return True, row
#
# return False, None
def render(self, mode='human'):
if self.fig is None:
# Create base figure once on the beginning. Afterwards only update
@@ -242,322 +212,14 @@ class HoleReacherEnv(BaseReacherEnv):
self.fig.gca().add_patch(hole_floor)
class HoleReacherEnvOld(gym.Env):
def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None,
hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False,
allow_wall_collision: bool = False, collision_penalty: float = 1000):
self.n_links = n_links
self.link_lengths = np.ones((n_links, 1))
self.random_start = random_start
# provided initial parameters
self.initial_x = hole_x # x-position of center of hole
self.initial_width = hole_width # width of hole
self.initial_depth = hole_depth # depth of hole
# temp container for current env state
self._tmp_x = None
self._tmp_width = None
self._tmp_depth = None
self._goal = None # x-y coordinates for reaching the center at the bottom of the hole
# collision
self.allow_self_collision = allow_self_collision
self.allow_wall_collision = allow_wall_collision
self.collision_penalty = collision_penalty
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
self._dt = 0.01
action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
[np.inf] * self.n_links, # velocity
[np.inf], # hole width
# [np.inf], # hole depth
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human", "partial"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
# @property
# def start_pos(self):
# return self._start_pos
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def step(self, action: np.ndarray):
"""
A single step with an action in joint velocity space
"""
acc = (action - self._angle_velocity) / self.dt
self._angle_velocity = action
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity # + 0.001 * np.random.randn(5)
self._update_joints()
reward, info = self._get_reward(acc)
info.update({"is_collided": self._is_collided})
self.end_effector_traj.append(np.copy(self.end_effector))
self._steps += 1
done = self._is_collided
return self._get_obs().copy(), reward, done, info
def reset(self):
if self.random_start:
# Maybe change more than first seed
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_hole()
self._set_patches()
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
self.end_effector_traj = []
return self._get_obs().copy()
def _generate_hole(self):
if self.initial_width is None:
width = self.np_random.uniform(0.15, 0.5)
else:
width = np.copy(self.initial_width)
if self.initial_x is None:
# sample hole on left or right side
direction = self.np_random.choice([-1, 1])
# Hole center needs to be half the width away from the arm to give a valid setting.
x = direction * self.np_random.uniform(width / 2, 3.5)
else:
x = np.copy(self.initial_x)
if self.initial_depth is None:
# TODO we do not want this right now.
depth = self.np_random.uniform(1, 1)
else:
depth = np.copy(self.initial_depth)
self._tmp_width = width
self._tmp_x = x
self._tmp_depth = depth
self._goal = np.hstack([self._tmp_x, -self._tmp_depth])
def _update_joints(self):
"""
update _joints to get new end effector position. The other links are only required for rendering.
Returns:
"""
line_points_in_taskspace = self._get_forward_kinematics(num_points_per_link=20)
self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0]
self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1]
self_collision = False
wall_collision = False
if not self.allow_self_collision:
self_collision = check_self_collision(line_points_in_taskspace)
if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision:
self_collision = True
if not self.allow_wall_collision:
wall_collision = self._check_wall_collision(line_points_in_taskspace)
self._is_collided = self_collision or wall_collision
def _get_reward(self, acc: np.ndarray):
reward = 0
# success = False
if self._steps == 199 or self._is_collided:
# return reward only in last time step
# Episode also terminates when colliding, hence return reward
dist = np.linalg.norm(self.end_effector - self._goal)
# success = dist < 0.005 and not self._is_collided
reward = - dist ** 2 - self.collision_penalty * self._is_collided
reward -= 5e-8 * np.sum(acc ** 2)
# info = {"is_success": success}
return reward, {} # info
def _get_obs(self):
theta = self._joint_angles
return np.hstack([
np.cos(theta),
np.sin(theta),
self._angle_velocity,
self._tmp_width,
# self._tmp_hole_depth,
self.end_effector - self._goal,
self._steps
])
def _get_forward_kinematics(self, num_points_per_link=1):
theta = self._joint_angles[:, None]
intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1
accumulated_theta = np.cumsum(theta, axis=0)
end_effector = np.zeros(shape=(self.n_links, num_points_per_link, 2))
x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points
y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points
end_effector[0, :, 0] = x[0, :]
end_effector[0, :, 1] = y[0, :]
for i in range(1, self.n_links):
end_effector[i, :, 0] = x[i, :] + end_effector[i - 1, -1, 0]
end_effector[i, :, 1] = y[i, :] + end_effector[i - 1, -1, 1]
return np.squeeze(end_effector + self._joints[0, :])
def _check_wall_collision(self, line_points):
# all points that are before the hole in x
r, c = np.where(line_points[:, :, 0] < (self._tmp_x - self._tmp_width / 2))
# check if any of those points are below surface
nr_line_points_below_surface_before_hole = np.sum(line_points[r, c, 1] < 0)
if nr_line_points_below_surface_before_hole > 0:
return True
# all points that are after the hole in x
r, c = np.where(line_points[:, :, 0] > (self._tmp_x + self._tmp_width / 2))
# check if any of those points are below surface
nr_line_points_below_surface_after_hole = np.sum(line_points[r, c, 1] < 0)
if nr_line_points_below_surface_after_hole > 0:
return True
# all points that are above the hole
r, c = np.where((line_points[:, :, 0] > (self._tmp_x - self._tmp_width / 2)) & (
line_points[:, :, 0] < (self._tmp_x + self._tmp_width / 2)))
# check if any of those points are below surface
nr_line_points_below_surface_in_hole = np.sum(line_points[r, c, 1] < -self._tmp_depth)
if nr_line_points_below_surface_in_hole > 0:
return True
return False
def render(self, mode='human'):
if self.fig is None:
# Create base figure once on the beginning. Afterwards only update
plt.ion()
self.fig = plt.figure()
ax = self.fig.add_subplot(1, 1, 1)
# limits
lim = np.sum(self.link_lengths) + 0.5
ax.set_xlim([-lim, lim])
ax.set_ylim([-1.1, lim])
self.line, = ax.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
self._set_patches()
self.fig.show()
self.fig.gca().set_title(
f"Iteration: {self._steps}, distance: {np.linalg.norm(self.end_effector - self._goal) ** 2}")
if mode == "human":
# arm
self.line.set_data(self._joints[:, 0], self._joints[:, 1])
self.fig.canvas.draw()
self.fig.canvas.flush_events()
elif mode == "partial":
if self._steps % 20 == 0 or self._steps in [1, 199] or self._is_collided:
# Arm
plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k',
alpha=self._steps / 200)
def _set_patches(self):
if self.fig is not None:
self.fig.gca().patches = []
left_block = patches.Rectangle((-self.n_links, -self._tmp_depth),
self.n_links + self._tmp_x - self._tmp_width / 2,
self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
right_block = patches.Rectangle((self._tmp_x + self._tmp_width / 2, -self._tmp_depth),
self.n_links - self._tmp_x + self._tmp_width / 2,
self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
hole_floor = patches.Rectangle((self._tmp_x - self._tmp_width / 2, -self._tmp_depth),
self._tmp_width,
1 - self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
# Add the patch to the Axes
self.fig.gca().add_patch(left_block)
self.fig.gca().add_patch(right_block)
self.fig.gca().add_patch(hole_floor)
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
@property
def end_effector(self):
return self._joints[self.n_links].T
def close(self):
super().close()
if self.fig is not None:
plt.close(self.fig)
if __name__ == "__main__":
import time
env = HoleReacherEnv(5)
env.reset()
start = time.time()
for i in range(10000):
# env.check_wall_collision()
ac = env.action_space.sample()
obs, rew, done, info = env.step(ac)
# env.render()
if done:
env.reset()
print(time.time() - start)

View File

@@ -0,0 +1,315 @@
from typing import Union
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym.utils import seeding
from matplotlib import patches
from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
from alr_envs.alr.classic_control.utils import check_self_collision
class HoleReacherEnvOld(gym.Env):
def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None,
hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False,
allow_wall_collision: bool = False, collision_penalty: float = 1000):
self.n_links = n_links
self.link_lengths = np.ones((n_links, 1))
self.random_start = random_start
# provided initial parameters
self.initial_x = hole_x # x-position of center of hole
self.initial_width = hole_width # width of hole
self.initial_depth = hole_depth # depth of hole
# temp container for current env state
self._tmp_x = None
self._tmp_width = None
self._tmp_depth = None
self._goal = None # x-y coordinates for reaching the center at the bottom of the hole
# collision
self.allow_self_collision = allow_self_collision
self.allow_wall_collision = allow_wall_collision
self.collision_penalty = collision_penalty
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
self._dt = 0.01
action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
[np.inf] * self.n_links, # velocity
[np.inf], # hole width
# [np.inf], # hole depth
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human", "partial"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
# @property
# def start_pos(self):
# return self._start_pos
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def step(self, action: np.ndarray):
"""
A single step with an action in joint velocity space
"""
acc = (action - self._angle_velocity) / self.dt
self._angle_velocity = action
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity # + 0.001 * np.random.randn(5)
self._update_joints()
reward, info = self._get_reward(acc)
info.update({"is_collided": self._is_collided})
self.end_effector_traj.append(np.copy(self.end_effector))
self._steps += 1
done = self._is_collided
return self._get_obs().copy(), reward, done, info
def reset(self):
if self.random_start:
# Maybe change more than first seed
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_hole()
self._set_patches()
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
self.end_effector_traj = []
return self._get_obs().copy()
def _generate_hole(self):
if self.initial_width is None:
width = self.np_random.uniform(0.15, 0.5)
else:
width = np.copy(self.initial_width)
if self.initial_x is None:
# sample hole on left or right side
direction = self.np_random.choice([-1, 1])
# Hole center needs to be half the width away from the arm to give a valid setting.
x = direction * self.np_random.uniform(width / 2, 3.5)
else:
x = np.copy(self.initial_x)
if self.initial_depth is None:
# TODO we do not want this right now.
depth = self.np_random.uniform(1, 1)
else:
depth = np.copy(self.initial_depth)
self._tmp_width = width
self._tmp_x = x
self._tmp_depth = depth
self._goal = np.hstack([self._tmp_x, -self._tmp_depth])
def _update_joints(self):
"""
update _joints to get new end effector position. The other links are only required for rendering.
Returns:
"""
line_points_in_taskspace = self._get_forward_kinematics(num_points_per_link=20)
self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0]
self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1]
self_collision = False
wall_collision = False
if not self.allow_self_collision:
self_collision = check_self_collision(line_points_in_taskspace)
if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision:
self_collision = True
if not self.allow_wall_collision:
wall_collision = self._check_wall_collision(line_points_in_taskspace)
self._is_collided = self_collision or wall_collision
def _get_reward(self, acc: np.ndarray):
reward = 0
# success = False
if self._steps == 199 or self._is_collided:
# return reward only in last time step
# Episode also terminates when colliding, hence return reward
dist = np.linalg.norm(self.end_effector - self._goal)
# success = dist < 0.005 and not self._is_collided
reward = - dist ** 2 - self.collision_penalty * self._is_collided
reward -= 5e-8 * np.sum(acc ** 2)
# info = {"is_success": success}
return reward, {} # info
def _get_obs(self):
theta = self._joint_angles
return np.hstack([
np.cos(theta),
np.sin(theta),
self._angle_velocity,
self._tmp_width,
# self._tmp_hole_depth,
self.end_effector - self._goal,
self._steps
])
def _get_forward_kinematics(self, num_points_per_link=1):
theta = self._joint_angles[:, None]
intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1
accumulated_theta = np.cumsum(theta, axis=0)
end_effector = np.zeros(shape=(self.n_links, num_points_per_link, 2))
x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points
y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points
end_effector[0, :, 0] = x[0, :]
end_effector[0, :, 1] = y[0, :]
for i in range(1, self.n_links):
end_effector[i, :, 0] = x[i, :] + end_effector[i - 1, -1, 0]
end_effector[i, :, 1] = y[i, :] + end_effector[i - 1, -1, 1]
return np.squeeze(end_effector + self._joints[0, :])
def _check_wall_collision(self, line_points):
# all points that are before the hole in x
r, c = np.where(line_points[:, :, 0] < (self._tmp_x - self._tmp_width / 2))
# check if any of those points are below surface
nr_line_points_below_surface_before_hole = np.sum(line_points[r, c, 1] < 0)
if nr_line_points_below_surface_before_hole > 0:
return True
# all points that are after the hole in x
r, c = np.where(line_points[:, :, 0] > (self._tmp_x + self._tmp_width / 2))
# check if any of those points are below surface
nr_line_points_below_surface_after_hole = np.sum(line_points[r, c, 1] < 0)
if nr_line_points_below_surface_after_hole > 0:
return True
# all points that are above the hole
r, c = np.where((line_points[:, :, 0] > (self._tmp_x - self._tmp_width / 2)) & (
line_points[:, :, 0] < (self._tmp_x + self._tmp_width / 2)))
# check if any of those points are below surface
nr_line_points_below_surface_in_hole = np.sum(line_points[r, c, 1] < -self._tmp_depth)
if nr_line_points_below_surface_in_hole > 0:
return True
return False
def render(self, mode='human'):
if self.fig is None:
# Create base figure once on the beginning. Afterwards only update
plt.ion()
self.fig = plt.figure()
ax = self.fig.add_subplot(1, 1, 1)
# limits
lim = np.sum(self.link_lengths) + 0.5
ax.set_xlim([-lim, lim])
ax.set_ylim([-1.1, lim])
self.line, = ax.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
self._set_patches()
self.fig.show()
self.fig.gca().set_title(
f"Iteration: {self._steps}, distance: {np.linalg.norm(self.end_effector - self._goal) ** 2}")
if mode == "human":
# arm
self.line.set_data(self._joints[:, 0], self._joints[:, 1])
self.fig.canvas.draw()
self.fig.canvas.flush_events()
elif mode == "partial":
if self._steps % 20 == 0 or self._steps in [1, 199] or self._is_collided:
# Arm
plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k',
alpha=self._steps / 200)
def _set_patches(self):
if self.fig is not None:
self.fig.gca().patches = []
left_block = patches.Rectangle((-self.n_links, -self._tmp_depth),
self.n_links + self._tmp_x - self._tmp_width / 2,
self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
right_block = patches.Rectangle((self._tmp_x + self._tmp_width / 2, -self._tmp_depth),
self.n_links - self._tmp_x + self._tmp_width / 2,
self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
hole_floor = patches.Rectangle((self._tmp_x - self._tmp_width / 2, -self._tmp_depth),
self._tmp_width,
1 - self._tmp_depth,
fill=True, edgecolor='k', facecolor='k')
# Add the patch to the Axes
self.fig.gca().add_patch(left_block)
self.fig.gca().add_patch(right_block)
self.fig.gca().add_patch(hole_floor)
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
@property
def end_effector(self):
return self._joints[self.n_links].T
def close(self):
super().close()
if self.fig is not None:
plt.close(self.fig)

View File

@@ -0,0 +1,56 @@
import numpy as np
class HolereacherReward:
def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty):
self.collision_penalty = collision_penalty
# collision
self.allow_self_collision = allow_self_collision
self.allow_wall_collision = allow_wall_collision
self.collision_penalty = collision_penalty
self._is_collided = False
self.reward_factors = np.array((-1, -1e-4, -1e-6, -collision_penalty, -1))
def reset(self):
self._is_collided = False
def get_reward(self, env):
dist_cost = 0
collision_cost = 0
time_cost = 0
success = False
self_collision = False
wall_collision = False
if not self.allow_self_collision:
self_collision = env._check_self_collision()
if not self.allow_wall_collision:
wall_collision = env.check_wall_collision()
self._is_collided = self_collision or wall_collision
if env._steps == 199 or self._is_collided:
# return reward only in last time step
# Episode also terminates when colliding, hence return reward
dist = np.linalg.norm(env.end_effector - env._goal)
success = dist < 0.005 and not self._is_collided
dist_cost = int(not self._is_collided) * dist ** 2
collision_cost = self._is_collided * dist ** 2
time_cost = 199 - env._steps
info = {"is_success": success,
"is_collided": self._is_collided,
"end_effector": np.copy(env.end_effector)}
vel_cost = np.sum(env._angle_velocity ** 2)
acc_cost = np.sum(env._acc ** 2)
reward_features = np.array((dist_cost, vel_cost, acc_cost, collision_cost, time_cost))
reward = np.dot(reward_features, self.reward_factors)
return reward, info
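To make the weighted-feature reward above concrete, a small numeric sketch with hypothetical final-step feature values and `collision_penalty = 100`:

```python
import numpy as np

reward_factors = np.array((-1, -1e-4, -1e-6, -100, -1))
# hypothetical features: squared distance, summed squared velocity,
# summed squared acceleration, collision indicator * dist^2, remaining steps
reward_features = np.array((0.04, 3.2, 1.5e4, 0.0, 0.0))
reward = np.dot(reward_features, reward_factors)
# -0.04 - 0.00032 - 0.015 - 0.0 - 0.0 = -0.05532
```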

View File

@@ -1,8 +1,7 @@
import numpy as np
from alr_envs.alr.classic_control.utils import check_self_collision
class HolereacherSimpleReward:
class HolereacherReward:
def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty):
self.collision_penalty = collision_penalty
@@ -11,16 +10,20 @@ class HolereacherSimpleReward:
self.allow_wall_collision = allow_wall_collision
self.collision_penalty = collision_penalty
self._is_collided = False
pass
def get_reward(self, env, action):
reward = 0
self.reward_factors = np.array((-1, -5e-8, -collision_penalty))
def reset(self):
self._is_collided = False
def get_reward(self, env):
dist_cost = 0
collision_cost = 0
success = False
self_collision = False
wall_collision = False
# joints = np.hstack((env._joints[:-1, :], env._joints[1:, :]))
if not self.allow_self_collision:
self_collision = env._check_self_collision()
@@ -33,16 +36,18 @@ class HolereacherSimpleReward:
# return reward only in last time step
# Episode also terminates when colliding, hence return reward
dist = np.linalg.norm(env.end_effector - env._goal)
dist_cost = dist ** 2
collision_cost = int(self._is_collided)
success = dist < 0.005 and not self._is_collided
reward = - dist ** 2 - self.collision_penalty * self._is_collided
info = {"is_success": success,
"is_collided": self._is_collided}
"is_collided": self._is_collided,
"end_effector": np.copy(env.end_effector)}
acc = (action - env._angle_velocity) / env.dt
reward -= 5e-8 * np.sum(acc ** 2)
acc_cost = np.sum(env._acc ** 2)
reward_features = np.array((dist_cost, acc_cost, collision_cost))
reward = np.dot(reward_features, self.reward_factors)
return reward, info

View File

@@ -1,26 +1,22 @@
from typing import Iterable, Union
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym import spaces
from gym.utils import seeding
from alr_envs.alr.classic_control.base_reacher.base_reacher_torque import BaseReacherTorqueEnv
class SimpleReacherEnv(gym.Env):
class SimpleReacherEnv(BaseReacherTorqueEnv):
"""
Simple Reaching Task without any physics simulation.
Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions
towards the end of the trajectory.
"""
def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True):
super().__init__()
self.link_lengths = np.ones(n_links)
self.n_links = n_links
self._dt = 0.1
self.random_start = random_start
def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True,
allow_self_collision: bool = False,):
super().__init__(n_links, random_start, allow_self_collision)
# provided initial parameters
self.inital_target = target
@@ -28,16 +24,10 @@ class SimpleReacherEnv(gym.Env):
# temp container for current env state
self._goal = None
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.zeros(self.n_links)
self._start_vel = np.zeros(self.n_links)
self.max_torque = 1
self.steps_before_reward = 199
action_bound = np.ones((self.n_links,)) * self.max_torque
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
@@ -45,84 +35,24 @@ class SimpleReacherEnv(gym.Env):
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
# @property
# def start_pos(self):
# return self._start_pos
@property
def current_pos(self):
return self._joint_angles
@property
def current_vel(self):
return self._angle_velocity
def step(self, action: np.ndarray):
"""
A single step with action in torque space
"""
# action = self._add_action_noise(action)
ac = np.clip(action, -self.max_torque, self.max_torque)
self._angle_velocity = self._angle_velocity + self.dt * ac
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints()
reward, info = self._get_reward(action)
self._steps += 1
done = False
return self._get_obs().copy(), reward, done, info
def reset(self):
# TODO: maybe do initialisation more random?
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
self._joint_angles = np.hstack([[self.np_random.uniform(-np.pi, np.pi)], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_goal()
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.
Returns:
"""
angles = np.cumsum(self._joint_angles)
x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
return super().reset()
def _get_reward(self, action: np.ndarray):
diff = self.end_effector - self._goal
reward_dist = 0
if not self.allow_self_collision:
self._is_collided = self._check_self_collision()
if self._steps >= self.steps_before_reward:
reward_dist -= np.linalg.norm(diff)
# reward_dist = np.exp(-0.1 * diff ** 2).mean()
@@ -132,6 +62,9 @@ class SimpleReacherEnv(gym.Env):
reward = reward_dist - reward_ctrl
return reward, dict(reward_dist=reward_dist, reward_ctrl=reward_ctrl)
def _terminate(self, info):
return False
def _get_obs(self):
theta = self._joint_angles
return np.hstack([
@@ -190,13 +123,14 @@ class SimpleReacherEnv(gym.Env):
self.fig.canvas.draw()
self.fig.canvas.flush_events()
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self):
del self.fig
if __name__ == "__main__":
env = SimpleReacherEnv(5)
env.reset()
for i in range(200):
ac = env.action_space.sample()
obs, rew, done, info = env.step(ac)
@property
def end_effector(self):
return self._joints[self.n_links].T
env.render()
if done:
break

View File

@@ -6,17 +6,15 @@ import numpy as np
from gym.utils import seeding
from alr_envs.alr.classic_control.utils import check_self_collision
from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
class ViaPointReacherEnv(gym.Env):
class ViaPointReacherEnv(BaseReacherDirectEnv):
def __init__(self, n_links, random_start: bool = False, via_target: Union[None, Iterable] = None,
target: Union[None, Iterable] = None, allow_self_collision=False, collision_penalty=1000):
self.n_links = n_links
self.link_lengths = np.ones((n_links, 1))
self.random_start = random_start
super().__init__(n_links, random_start, allow_self_collision)
# provided initial parameters
self.intitial_target = target # provided target value
@@ -30,17 +28,6 @@ class ViaPointReacherEnv(gym.Env):
self.allow_self_collision = allow_self_collision
self.collision_penalty = collision_penalty
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
self.weight_matrix_scale = 1
self._dt = 0.01
action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
@@ -49,69 +36,15 @@ class ViaPointReacherEnv(gym.Env):
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps
])
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human", "partial"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self):
return self._dt
# @property
# def start_pos(self):
# return self._start_pos
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def step(self, action: np.ndarray):
"""
a single step with an action in joint velocity space
"""
vel = action
self._angle_velocity = vel
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints()
acc = (vel - self._angle_velocity) / self.dt
reward, info = self._get_reward(acc)
info.update({"is_collided": self._is_collided})
self._steps += 1
done = self._is_collided
return self._get_obs().copy(), reward, done, info
def reset(self):
if self.random_start:
# Maybe change more than first seed
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_goal()
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
return super().reset()
def _generate_goal(self):
# TODO: Maybe improve this later, this can yield quite a lot of invalid settings
@@ -137,29 +70,13 @@ class ViaPointReacherEnv(gym.Env):
self._via_point = via_target
self._goal = goal
def _update_joints(self):
"""
update _joints to get new end effector position. The other links are only required for rendering.
Returns:
"""
line_points_in_taskspace = self.get_forward_kinematics(num_points_per_link=20)
self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0]
self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1]
self_collision = False
if not self.allow_self_collision:
self_collision = check_self_collision(line_points_in_taskspace)
if np.any(np.abs(self._joint_angles) > np.pi):
self_collision = True
self._is_collided = self_collision
def _get_reward(self, acc):
success = False
reward = -np.inf
if not self.allow_self_collision:
self._is_collided = self._check_self_collision()
if not self._is_collided:
dist = np.inf
# return intermediate reward for via point
@@ -177,10 +94,15 @@ class ViaPointReacherEnv(gym.Env):
reward -= dist ** 2
reward -= 5e-8 * np.sum(acc ** 2)
info = {"is_success": success}
info = {"is_success": success,
"is_collided": self._is_collided,
"end_effector": np.copy(env.end_effector)}
return reward, info
def _terminate(self, info):
return info["is_collided"]
def _get_obs(self):
theta = self._joint_angles
return np.hstack([
@@ -192,27 +114,6 @@ class ViaPointReacherEnv(gym.Env):
self._steps
])
def get_forward_kinematics(self, num_points_per_link=1):
theta = self._joint_angles[:, None]
intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1
accumulated_theta = np.cumsum(theta, axis=0)
endeffector = np.zeros(shape=(self.n_links, num_points_per_link, 2))
x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points
y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points
endeffector[0, :, 0] = x[0, :]
endeffector[0, :, 1] = y[0, :]
for i in range(1, self.n_links):
endeffector[i, :, 0] = x[i, :] + endeffector[i - 1, -1, 0]
endeffector[i, :, 1] = y[i, :] + endeffector[i - 1, -1, 1]
return np.squeeze(endeffector + self._joints[0, :])
def render(self, mode='human'):
goal_pos = self._goal.T
via_pos = self._via_point.T
@@ -281,14 +182,14 @@ class ViaPointReacherEnv(gym.Env):
plt.pause(0.01)
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
if __name__ == "__main__":
import time
env = ViaPointReacherEnv(5)
env.reset()
@property
def end_effector(self):
return self._joints[self.n_links].T
def close(self):
if self.fig is not None:
plt.close(self.fig)
for i in range(10000):
ac = env.action_space.sample()
obs, rew, done, info = env.step(ac)
env.render()
if done:
env.reset()

View File

@@ -149,16 +149,16 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True):
if __name__ == '__main__':
render = False
# DMP
# example_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=10, render=render)
example_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=10, render=render)
# ProMP
# example_mp("alr_envs:HoleReacherProMP-v1", seed=10, iterations=100, render=render)
example_mp("alr_envs:HoleReacherProMP-v1", seed=10, iterations=100, render=render)
# DetProMP
example_mp("alr_envs:HoleReacherDetPMP-v1", seed=10, iterations=100, render=render)
# Altered basis functions
# example_custom_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render)
example_custom_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render)
# Custom MP
# example_fully_custom_mp(seed=10, iterations=1, render=render)
example_fully_custom_mp(seed=10, iterations=1, render=render)