adjusted classic control environments to new interface

This commit is contained in:
ottofabian 2021-05-12 17:48:57 +02:00
parent 95e9b8be47
commit 6ae195962c
14 changed files with 535 additions and 489 deletions

View File

@ -1,6 +1,7 @@
from gym.envs.registration import register from gym.envs.registration import register
from alr_envs.stochastic_search.functions.f_rosenbrock import Rosenbrock from alr_envs.stochastic_search.functions.f_rosenbrock import Rosenbrock
# from alr_envs.utils.mps.dmp_wrapper import DmpWrapper # from alr_envs.utils.mps.dmp_wrapper import DmpWrapper
# Mujoco # Mujoco
@ -71,6 +72,17 @@ register(
} }
) )
## Balancing Reacher
register(
id='Balancing-v0',
entry_point='alr_envs.mujoco:BalancingEnv',
max_episode_steps=200,
kwargs={
"n_links": 5,
}
)
register( register(
id='ALRBallInACupSimple-v0', id='ALRBallInACupSimple-v0',
entry_point='alr_envs.mujoco:ALRBallInACupEnv', entry_point='alr_envs.mujoco:ALRBallInACupEnv',
@ -101,15 +113,7 @@ register(
# Classic control # Classic control
register( ## Simple Reacher
id='Balancing-v0',
entry_point='alr_envs.mujoco:BalancingEnv',
max_episode_steps=200,
kwargs={
"n_links": 5,
}
)
register( register(
id='SimpleReacher-v0', id='SimpleReacher-v0',
entry_point='alr_envs.classic_control:SimpleReacherEnv', entry_point='alr_envs.classic_control:SimpleReacherEnv',
@ -129,25 +133,6 @@ register(
} }
) )
register(
id='EpisodicSimpleReacher-v0',
entry_point='alr_envs.classic_control:EpisodicSimpleReacherEnv',
max_episode_steps=200,
kwargs={
"n_links": 2,
}
)
register(
id='EpisodicSimpleReacher-v1',
entry_point='alr_envs.classic_control:EpisodicSimpleReacherEnv',
max_episode_steps=200,
kwargs={
"n_links": 2,
"random_start": False
}
)
register( register(
id='LongSimpleReacher-v0', id='LongSimpleReacher-v0',
entry_point='alr_envs.classic_control:SimpleReacherEnv', entry_point='alr_envs.classic_control:SimpleReacherEnv',
@ -157,6 +142,18 @@ register(
} }
) )
register(
id='LongSimpleReacher-v1',
entry_point='alr_envs.classic_control:SimpleReacherEnv',
max_episode_steps=200,
kwargs={
"n_links": 5,
"random_start": False
}
)
## Viapoint Reacher
register( register(
id='ViaPointReacher-v0', id='ViaPointReacher-v0',
entry_point='alr_envs.classic_control.viapoint_reacher:ViaPointReacher', entry_point='alr_envs.classic_control.viapoint_reacher:ViaPointReacher',
@ -168,27 +165,45 @@ register(
} }
) )
## Hole Reacher
register( register(
id='HoleReacher-v0', id='HoleReacher-v0',
entry_point='alr_envs.classic_control.hole_reacher:HoleReacher', entry_point='alr_envs.classic_control.hole_reacher:HoleReacherEnv',
max_episode_steps=200, max_episode_steps=200,
kwargs={ kwargs={
"n_links": 5, "n_links": 5,
"allow_self_collision": False, "allow_self_collision": False,
"allow_wall_collision": False, "allow_wall_collision": False,
"hole_width": 0.25, "hole_width": None,
"hole_depth": 1, "hole_depth": 1,
"hole_x": 2, "hole_x": None,
"collision_penalty": 100,
}
)
register(
id='HoleReacher-v1',
entry_point='alr_envs.classic_control.hole_reacher:HoleReacherEnv',
max_episode_steps=200,
kwargs={
"n_links": 5,
"random_start": False,
"allow_self_collision": False,
"allow_wall_collision": False,
"hole_width": None,
"hole_depth": 1,
"hole_x": None,
"collision_penalty": 100, "collision_penalty": 100,
} }
) )
register( register(
id='HoleReacher-v2', id='HoleReacher-v2',
entry_point='alr_envs.classic_control.hole_reacher_v2:HoleReacher', entry_point='alr_envs.classic_control.hole_reacher:HoleReacherEnv',
max_episode_steps=200, max_episode_steps=200,
kwargs={ kwargs={
"n_links": 5, "n_links": 5,
"random_start": False,
"allow_self_collision": False, "allow_self_collision": False,
"allow_wall_collision": False, "allow_wall_collision": False,
"hole_width": 0.25, "hole_width": 0.25,
@ -199,30 +214,16 @@ register(
) )
# MP environments # MP environments
reacher_envs = ["SimpleReacher-v0", "SimpleReacher-v1", "LongSimpleReacher-v0", "LongSimpleReacher-v1"]
for env in reacher_envs:
name = env.split("-")
register( register(
id='SimpleReacherDMP-v0', id=f'{name[0]}DMP-{name[1]}',
entry_point='alr_envs.utils.make_env_helpers:make_dmp_env', entry_point='alr_envs.utils.make_env_helpers:make_dmp_env',
# max_episode_steps=1, # max_episode_steps=1,
kwargs={ kwargs={
"name": "alr_envs:EpisodicSimpleReacher-v0", "name": f"alr_envs:{env}",
"num_dof": 2, "num_dof": 2 if "long" not in env.lower() else 5 ,
"num_basis": 5,
"duration": 2,
"alpha_phase": 2,
"learn_goal": True,
"policy_type": "velocity",
"weights_scale": 50,
}
)
register(
id='SimpleReacherDMP-v1',
entry_point='alr_envs.utils.make_env_helpers:make_dmp_env',
# max_episode_steps=1,
kwargs={
"name": "alr_envs:EpisodicSimpleReacher-v1",
"num_dof": 2,
"num_basis": 5, "num_basis": 5,
"duration": 2, "duration": 2,
"alpha_phase": 2, "alpha_phase": 2,
@ -266,6 +267,24 @@ register(
} }
) )
register(
id='HoleReacherDMP-v1',
entry_point='alr_envs.utils.make_env_helpers:make_dmp_env',
# max_episode_steps=1,
kwargs={
"name": "alr_envs:HoleReacher-v1",
"num_dof": 5,
"num_basis": 5,
"duration": 2,
"learn_goal": True,
"alpha_phase": 2,
"bandwidth_factor": 2,
"policy_type": "velocity",
"weights_scale": 50,
"goal_scale": 0.1
}
)
register( register(
id='HoleReacherDMP-v2', id='HoleReacherDMP-v2',
entry_point='alr_envs.utils.make_env_helpers:make_dmp_env', entry_point='alr_envs.utils.make_env_helpers:make_dmp_env',

View File

@ -1,4 +1,4 @@
from alr_envs.classic_control.simple_reacher import SimpleReacherEnv from alr_envs.classic_control.simple_reacher import SimpleReacherEnv
from alr_envs.classic_control.episodic_simple_reacher import EpisodicSimpleReacherEnv from alr_envs.classic_control.episodic_simple_reacher import EpisodicSimpleReacherEnv
from alr_envs.classic_control.viapoint_reacher import ViaPointReacher from alr_envs.classic_control.viapoint_reacher import ViaPointReacher
from alr_envs.classic_control.hole_reacher import HoleReacher from alr_envs.classic_control.hole_reacher import HoleReacherEnv

View File

@ -35,7 +35,7 @@ class EpisodicSimpleReacherEnv(SimpleReacherEnv):
def _get_obs(self): def _get_obs(self):
if self.random_start: if self.random_start:
theta = self._joint_angle theta = self._joint_angles
return np.hstack([ return np.hstack([
np.cos(theta), np.cos(theta),
np.sin(theta), np.sin(theta),

View File

@ -10,11 +10,12 @@ from alr_envs.classic_control.utils import check_self_collision
from alr_envs.utils.mps.mp_environments import MPEnv from alr_envs.utils.mps.mp_environments import MPEnv
class HoleReacher(MPEnv): class HoleReacherEnv(MPEnv):
def __init__(self, n_links, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None, def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None,
hole_width: float = 1., random_start: bool = True, allow_self_collision: bool = False, hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False,
allow_wall_collision: bool = False, collision_penalty: bool = 1000): allow_wall_collision: bool = False, collision_penalty: bool = 1000):
self.n_links = n_links self.n_links = n_links
self.link_lengths = np.ones((n_links, 1)) self.link_lengths = np.ones((n_links, 1))
@ -25,10 +26,11 @@ class HoleReacher(MPEnv):
self._hole_width = hole_width # width of hole self._hole_width = hole_width # width of hole
self._hole_depth = hole_depth # depth of hole self._hole_depth = hole_depth # depth of hole
# temp containers to store current setting # temp container for current env state
self._tmp_hole_x = None self._tmp_hole_x = None
self._tmp_hole_width = None self._tmp_hole_width = None
self._tmp_hole_depth = None self._tmp_hole_depth = None
self._goal = None # x-y coordinates for reaching the center at the bottom of the hole
# collision # collision
self.allow_self_collision = allow_self_collision self.allow_self_collision = allow_self_collision
@ -36,14 +38,13 @@ class HoleReacher(MPEnv):
self.collision_penalty = collision_penalty self.collision_penalty = collision_penalty
# state # state
self._joints = None
self._joint_angles = None self._joint_angles = None
self._angle_velocity = None self._angle_velocity = None
self._joints = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)]) self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links) self._start_vel = np.zeros(self.n_links)
self.dt = 0.01 self.dt = 0.01
# self.time_limit = 2
action_bound = np.pi * np.ones((self.n_links,)) action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([ state_bound = np.hstack([
@ -58,54 +59,43 @@ class HoleReacher(MPEnv):
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
plt.ion() # containers for plotting
self.metadata = {'render.modes': ["human", "partial"]}
self.fig = None self.fig = None
self._steps = 0
self.seed() self.seed()
@property def step(self, action: np.ndarray):
def corrected_obs_index(self): """
return np.hstack([ A single step with an action in joint velocity space
[self.random_start] * self.n_links, # cos """
[self.random_start] * self.n_links, # sin
[self.random_start] * self.n_links, # velocity
[self._hole_width is None], # hole width
[self._hole_depth is None], # hole width
[True] * 2, # x-y coordinates of target distance
[False] # env steps
])
def seed(self, seed=None): self._angle_velocity = action
self.np_random, seed = seeding.np_random(seed) self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
return [seed] self._update_joints()
@property acc = (action - self._angle_velocity) / self.dt
def end_effector(self): reward, info = self._get_reward(acc)
return self._joints[self.n_links].T
def _generate_hole(self): info.update({"is_collided": self._is_collided})
hole_x = self.np_random.uniform(0.5, 3.5, 1) if self._hole_x is None else np.copy(self._hole_x)
hole_width = self.np_random.uniform(0.5, 0.1, 1) if self._hole_width is None else np.copy(self._hole_width)
# TODO we do not want this right now.
hole_depth = self.np_random.uniform(1, 1, 1) if self._hole_depth is None else np.copy(self._hole_depth)
self.bottom_center_of_hole = np.hstack([hole_x, -hole_depth]) self._steps += 1
self.top_center_of_hole = np.hstack([hole_x, 0]) done = self._is_collided
self.left_wall_edge = np.hstack([hole_x - hole_width / 2, 0])
self.right_wall_edge = np.hstack([hole_x + hole_width / 2, 0])
return hole_x, hole_width, hole_depth return self._get_obs().copy(), reward, done, info
def reset(self): def reset(self):
if self.random_start: if self.random_start:
# MAybe change more than dirst seed # Maybe change more than dirst seed
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else: else:
self._joint_angles = self._start_pos self._joint_angles = self._start_pos
self._tmp_hole_x, self._tmp_hole_width, self._tmp_hole_depth = self._generate_hole() self._generate_hole()
self.set_patches() self._set_patches()
self._angle_velocity = self._start_vel self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2)) self._joints = np.zeros((self.n_links + 1, 2))
@ -114,42 +104,14 @@ class HoleReacher(MPEnv):
return self._get_obs().copy() return self._get_obs().copy()
def step(self, action: np.ndarray): def _generate_hole(self):
""" self._tmp_hole_x = self.np_random.uniform(0.5, 3.5, 1) if self._hole_x is None else np.copy(self._hole_x)
a single step with an action in joint velocity space self._tmp_hole_width = self.np_random.uniform(0.5, 0.1, 1) if self._hole_width is None else np.copy(
""" self._hole_width)
vel = action # + 0.01 * np.random.randn(self.num_links) # TODO we do not want this right now.
acc = (vel - self._angle_velocity) / self.dt self._tmp_hole_depth = self.np_random.uniform(1, 1, 1) if self._hole_depth is None else np.copy(
self._angle_velocity = vel self._hole_depth)
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity self._goal = np.hstack([self._tmp_hole_x, -self._tmp_hole_depth])
self._update_joints()
# rew = self._reward()
# compute reward directly in step function
success = False
reward = 0
if not self._is_collided:
# return reward only in last time step
if self._steps == 199:
dist = np.linalg.norm(self.end_effector - self.bottom_center_of_hole)
reward = - dist ** 2
success = dist < 0.005
else:
# Episode terminates when colliding, hence return reward
dist = np.linalg.norm(self.end_effector - self.bottom_center_of_hole)
reward = - dist ** 2 - self.collision_penalty
reward -= 5e-8 * np.sum(acc ** 2)
info = {"is_collided": self._is_collided, "is_success": success}
self._steps += 1
done = self._is_collided
return self._get_obs().copy(), reward, done, info
def _update_joints(self): def _update_joints(self):
""" """
@ -157,7 +119,7 @@ class HoleReacher(MPEnv):
Returns: Returns:
""" """
line_points_in_taskspace = self.get_forward_kinematics(num_points_per_link=20) line_points_in_taskspace = self._get_forward_kinematics(num_points_per_link=20)
self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0] self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0]
self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1] self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1]
@ -171,23 +133,43 @@ class HoleReacher(MPEnv):
self_collision = True self_collision = True
if not self.allow_wall_collision: if not self.allow_wall_collision:
wall_collision = self.check_wall_collision(line_points_in_taskspace) wall_collision = self._check_wall_collision(line_points_in_taskspace)
self._is_collided = self_collision or wall_collision self._is_collided = self_collision or wall_collision
def _get_reward(self, acc: np.ndarray):
success = False
reward = -np.inf
if not self._is_collided:
dist = 0
# return reward only in last time step
if self._steps == 199:
dist = np.linalg.norm(self.end_effector - self._goal)
success = dist < 0.005
else:
# Episode terminates when colliding, hence return reward
dist = np.linalg.norm(self.end_effector - self._goal)
reward = -self.collision_penalty
reward -= dist ** 2
reward -= 5e-8 * np.sum(acc ** 2)
info = {"is_success": success}
return reward, info
def _get_obs(self): def _get_obs(self):
theta = self._joint_angles theta = self._joint_angles
return np.hstack([ return np.hstack([
np.cos(theta), np.cos(theta),
np.sin(theta), np.sin(theta),
self._angle_velocity, self._angle_velocity,
self._hole_width, self._tmp_hole_width,
self._hole_depth, self._tmp_hole_depth,
self.end_effector - self.bottom_center_of_hole, self.end_effector - self._goal,
self._steps self._steps
]) ])
def get_forward_kinematics(self, num_points_per_link=1): def _get_forward_kinematics(self, num_points_per_link=1):
theta = self._joint_angles[:, None] theta = self._joint_angles[:, None]
intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1 intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1
@ -206,7 +188,7 @@ class HoleReacher(MPEnv):
return np.squeeze(end_effector + self._joints[0, :]) return np.squeeze(end_effector + self._joints[0, :])
def check_wall_collision(self, line_points): def _check_wall_collision(self, line_points):
# all points that are before the hole in x # all points that are before the hole in x
r, c = np.where(line_points[:, :, 0] < (self._tmp_hole_x - self._tmp_hole_width / 2)) r, c = np.where(line_points[:, :, 0] < (self._tmp_hole_x - self._tmp_hole_width / 2))
@ -240,6 +222,7 @@ class HoleReacher(MPEnv):
def render(self, mode='human'): def render(self, mode='human'):
if self.fig is None: if self.fig is None:
# Create base figure once on the beginning. Afterwards only update
plt.ion() plt.ion()
self.fig = plt.figure() self.fig = plt.figure()
ax = self.fig.add_subplot(1, 1, 1) ax = self.fig.add_subplot(1, 1, 1)
@ -250,74 +233,74 @@ class HoleReacher(MPEnv):
ax.set_ylim([-1.1, lim]) ax.set_ylim([-1.1, lim])
self.line, = ax.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k') self.line, = ax.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
self.set_patches() self._set_patches()
self.fig.show() self.fig.show()
if mode == "human":
self.fig.gca().set_title( self.fig.gca().set_title(
f"Iteration: {self._steps}, distance: {self.end_effector - self.bottom_center_of_hole}") f"Iteration: {self._steps}, distance: {self.end_effector - self._goal}")
# Arm if mode == "human":
plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
# Arm # arm
self.line.set_xdata(self._joints[:, 0]) self.line.set_data(self._joints[:, 0], self._joints[:, 1])
self.line.set_ydata(self._joints[:, 1])
self.fig.canvas.draw() self.fig.canvas.draw()
self.fig.canvas.flush_events() self.fig.canvas.flush_events()
# self.fig.show()
elif mode == "partial": elif mode == "partial":
if self._steps == 1:
# fig, ax = plt.subplots()
# Add the patch to the Axes
[plt.gca().add_patch(rect) for rect in self.patches]
# plt.pause(0.01)
if self._steps % 20 == 0 or self._steps in [1, 199] or self._is_collided: if self._steps % 20 == 0 or self._steps in [1, 199] or self._is_collided:
# Arm # Arm
plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k', alpha=self._steps / 200) plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k',
# ax.plot(line_points_in_taskspace[:, 0, 0], alpha=self._steps / 200)
# line_points_in_taskspace[:, 0, 1],
# line_points_in_taskspace[:, -1, 0],
# line_points_in_taskspace[:, -1, 1], marker='o', color='k', alpha=t / 200)
lim = np.sum(self.link_lengths) + 0.5 def _set_patches(self):
plt.xlim([-lim, lim])
plt.ylim([-1.1, lim])
plt.pause(0.01)
elif mode == "final":
if self._steps == 199 or self._is_collided:
# fig, ax = plt.subplots()
# Add the patch to the Axes
[plt.gca().add_patch(rect) for rect in self.patches]
plt.xlim(-self.n_links, self.n_links), plt.ylim(-1, self.n_links)
# Arm
plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
plt.pause(0.01)
def set_patches(self):
if self.fig is not None: if self.fig is not None:
self.fig.gca().patches = [] self.fig.gca().patches = []
rect_1 = patches.Rectangle((-self.n_links, -1), self.n_links + self._tmp_hole_x - self._tmp_hole_width / 2, left_block = patches.Rectangle((-self.n_links, -self._tmp_hole_depth),
1, self.n_links + self._tmp_hole_x - self._tmp_hole_width / 2,
self._tmp_hole_depth,
fill=True, edgecolor='k', facecolor='k') fill=True, edgecolor='k', facecolor='k')
rect_2 = patches.Rectangle((self._tmp_hole_x + self._tmp_hole_width / 2, -1), right_block = patches.Rectangle((self._tmp_hole_x + self._tmp_hole_width / 2, -self._tmp_hole_depth),
self.n_links - self._tmp_hole_x + self._tmp_hole_width / 2, 1, self.n_links - self._tmp_hole_x + self._tmp_hole_width / 2,
self._tmp_hole_depth,
fill=True, edgecolor='k', facecolor='k') fill=True, edgecolor='k', facecolor='k')
rect_3 = patches.Rectangle((self._tmp_hole_x - self._tmp_hole_width / 2, -1), self._tmp_hole_width, hole_floor = patches.Rectangle((self._tmp_hole_x - self._tmp_hole_width / 2, -self._tmp_hole_depth),
self._tmp_hole_width,
1 - self._tmp_hole_depth, 1 - self._tmp_hole_depth,
fill=True, edgecolor='k', facecolor='k') fill=True, edgecolor='k', facecolor='k')
# Add the patch to the Axes # Add the patch to the Axes
self.fig.gca().add_patch(rect_1) self.fig.gca().add_patch(left_block)
self.fig.gca().add_patch(rect_2) self.fig.gca().add_patch(right_block)
self.fig.gca().add_patch(rect_3) self.fig.gca().add_patch(hole_floor)
@property
def active_obs(self):
return np.hstack([
[self.random_start] * self.n_links, # cos
[self.random_start] * self.n_links, # sin
[self.random_start] * self.n_links, # velocity
[self._hole_width is None], # hole width
[self._hole_depth is None], # hole width
[True] * 2, # x-y coordinates of target distance
[False] # env steps
])
@property
def start_pos(self) -> Union[float, int, np.ndarray]:
return self._start_pos
@property
def goal_pos(self) -> Union[float, int, np.ndarray]:
raise ValueError("Goal position is not available and has to be learnt based on the environment.")
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
@property
def end_effector(self):
return self._joints[self.n_links].T
def close(self): def close(self):
if self.fig is not None: if self.fig is not None:
@ -327,19 +310,15 @@ class HoleReacher(MPEnv):
if __name__ == '__main__': if __name__ == '__main__':
nl = 5 nl = 5
render_mode = "human" # "human" or "partial" or "final" render_mode = "human" # "human" or "partial" or "final"
env = HoleReacher(n_links=nl, allow_self_collision=False, allow_wall_collision=False, hole_width=None, env = HoleReacherEnv(n_links=nl, allow_self_collision=False, allow_wall_collision=False, hole_width=None,
hole_depth=1, hole_x=None) hole_depth=1, hole_x=None)
env.reset() obs = env.reset()
# env.render(mode=render_mode)
for i in range(200): for i in range(200):
# objective.load_result("/tmp/cma") # objective.load_result("/tmp/cma")
# test with random actions # test with random actions
ac = 2 * env.action_space.sample() ac = 2 * env.action_space.sample()
# ac[0] += np.pi/2
obs, rew, d, info = env.step(ac) obs, rew, d, info = env.step(ac)
# if i % 1 == 0:
if i == 0:
env.render(mode=render_mode) env.render(mode=render_mode)
print(rew) print(rew)

View File

@ -1,24 +1,21 @@
import gym from typing import Iterable, Union
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from gym import spaces from gym import spaces
from gym.utils import seeding from gym.utils import seeding
from alr_envs.utils.utils import angle_normalize from alr_envs.utils.mps.mp_environments import MPEnv
# if os.environ.get("DISPLAY", None): class SimpleReacherEnv(MPEnv):
# mpl.use('Qt5Agg')
class SimpleReacherEnv(gym.Env):
""" """
Simple Reaching Task without any physics simulation. Simple Reaching Task without any physics simulation.
Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions
towards the end of the trajectory. towards the end of the trajectory.
""" """
def __init__(self, n_links, random_start=True): def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True):
super().__init__() super().__init__()
self.link_lengths = np.ones(n_links) self.link_lengths = np.ones(n_links)
self.n_links = n_links self.n_links = n_links
@ -26,17 +23,19 @@ class SimpleReacherEnv(gym.Env):
self.random_start = random_start self.random_start = random_start
self._goal = None
self._joints = None self._joints = None
self._joint_angle = None self._joint_angles = None
self._angle_velocity = None self._angle_velocity = None
self._start_pos = None self._start_pos = np.zeros(self.n_links)
self._start_vel = np.zeros(self.n_links)
self.max_torque = 1 # 10 self._target = target # provided target value
self._goal = None # updated goal value, does not change when target != None
self.max_torque = 1
self.steps_before_reward = 199 self.steps_before_reward = 199
action_bound = np.ones((self.n_links,)) action_bound = np.ones((self.n_links,)) * self.max_torque
state_bound = np.hstack([ state_bound = np.hstack([
[np.pi] * self.n_links, # cos [np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin [np.pi] * self.n_links, # sin
@ -47,49 +46,50 @@ class SimpleReacherEnv(gym.Env):
self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
self.fig = None # containers for plotting
self.metadata = {'render.modes': ["human"]} self.metadata = {'render.modes': ["human"]}
self.fig = None
self._steps = 0 self._steps = 0
self.seed() self.seed()
def step(self, action: np.ndarray): def step(self, action: np.ndarray):
"""
A single step with action in torque space
"""
# action = self._add_action_noise(action) # action = self._add_action_noise(action)
action = np.clip(action, -self.max_torque, self.max_torque) ac = np.clip(action, -self.max_torque, self.max_torque)
self._angle_velocity = self._angle_velocity + self.dt * action self._angle_velocity = self._angle_velocity + self.dt * ac
self._joint_angle = angle_normalize(self._joint_angle + self.dt * self._angle_velocity) self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints() self._update_joints()
self._steps += 1
reward, info = self._get_reward(action) reward, info = self._get_reward(action)
# done = np.abs(self.end_effector - self._goal_pos) < 0.1 self._steps += 1
done = False done = False
return self._get_obs().copy(), reward, done, info return self._get_obs().copy(), reward, done, info
def _add_action_noise(self, action: np.ndarray): def reset(self):
"""
add unobserved Gaussian Noise N(0,0.01) to the actions
Args:
action:
Returns: actions with noise # TODO: maybe do initialisation more random?
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
self._joint_angles = np.hstack([[self.np_random.uniform(-np.pi, np.pi)], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
""" self._generate_goal()
return self.np_random.normal(0, 0.1, *action.shape) + action
def _get_obs(self): self._angle_velocity = self._start_vel
theta = self._joint_angle self._joints = np.zeros((self.n_links + 1, 2))
return np.hstack([ self._update_joints()
np.cos(theta), self._steps = 0
np.sin(theta),
self._angle_velocity, return self._get_obs().copy()
self.end_effector - self._goal,
self._steps
])
def _update_joints(self): def _update_joints(self):
""" """
@ -97,7 +97,7 @@ class SimpleReacherEnv(gym.Env):
Returns: Returns:
""" """
angles = np.cumsum(self._joint_angle) angles = np.cumsum(self._joint_angles)
x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)]) x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0) self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
@ -105,7 +105,6 @@ class SimpleReacherEnv(gym.Env):
diff = self.end_effector - self._goal diff = self.end_effector - self._goal
reward_dist = 0 reward_dist = 0
# TODO: Is this the best option
if self._steps >= self.steps_before_reward: if self._steps >= self.steps_before_reward:
reward_dist -= np.linalg.norm(diff) reward_dist -= np.linalg.norm(diff)
# reward_dist = np.exp(-0.1 * diff ** 2).mean() # reward_dist = np.exp(-0.1 * diff ** 2).mean()
@ -115,67 +114,118 @@ class SimpleReacherEnv(gym.Env):
reward = reward_dist - reward_ctrl reward = reward_dist - reward_ctrl
return reward, dict(reward_dist=reward_dist, reward_ctrl=reward_ctrl) return reward, dict(reward_dist=reward_dist, reward_ctrl=reward_ctrl)
def reset(self): def _get_obs(self):
theta = self._joint_angles
return np.hstack([
np.cos(theta),
np.sin(theta),
self._angle_velocity,
self.end_effector - self._goal,
self._steps
])
# TODO: maybe do initialisation more random? def _generate_goal(self):
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start: if self._target is None:
self._joint_angle = np.hstack([[self.np_random.uniform(-np.pi, np.pi)], np.zeros(self.n_links - 1)]) # center = self._joints[0]
# # Sample uniformly in circle with radius R around center of reacher.
# R = np.sum(self.link_lengths)
# r = R * np.sqrt(self.np_random.uniform())
# theta = self.np_random.uniform() * 2 * np.pi
# goal = center + r * np.stack([np.cos(theta), np.sin(theta)])
total_length = np.sum(self.link_lengths)
goal = np.array([total_length, total_length])
while np.linalg.norm(goal) >= total_length:
goal = self.np_random.uniform(low=-total_length, high=total_length, size=2)
else: else:
self._joint_angle = np.zeros(self.n_links) goal = np.copy(self._target)
self._start_pos = self._joint_angle self._goal = goal
self._angle_velocity = np.zeros(self.n_links)
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
self._goal = self._get_random_goal() def render(self, mode='human'): # pragma: no cover
return self._get_obs().copy() if self.fig is None:
# Create base figure once on the beginning. Afterwards only update
plt.ion()
self.fig = plt.figure()
ax = self.fig.add_subplot(1, 1, 1)
def _get_random_goal(self): # limits
center = self._joints[0] lim = np.sum(self.link_lengths) + 0.5
ax.set_xlim([-lim, lim])
ax.set_ylim([-lim, lim])
# Sample uniformly in circle with radius R around center of reacher. self.line, = ax.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
R = np.sum(self.link_lengths) goal_pos = self._goal.T
r = R * np.sqrt(self.np_random.uniform()) self.goal_point, = ax.plot(goal_pos[0], goal_pos[1], 'gx')
theta = self.np_random.uniform() * 2 * np.pi self.goal_dist, = ax.plot([self.end_effector[0], goal_pos[0]], [self.end_effector[1], goal_pos[1]], 'g--')
return center + r * np.stack([np.cos(theta), np.sin(theta)])
self.fig.show()
self.fig.gca().set_title(f"Iteration: {self._steps}, distance: {self.end_effector - self._goal}")
# goal
goal_pos = self._goal.T
if self._steps == 1:
self.goal_point.set_data(goal_pos[0], goal_pos[1])
# arm
self.line.set_data(self._joints[:, 0], self._joints[:, 1])
# distance between end effector and goal
self.goal_dist.set_data([self.end_effector[0], goal_pos[0]], [self.end_effector[1], goal_pos[1]])
self.fig.canvas.draw()
self.fig.canvas.flush_events()
@property
def active_obs(self):
return np.hstack([
[self.random_start] * self.n_links, # cos
[self.random_start] * self.n_links, # sin
[self.random_start] * self.n_links, # velocity
[True] * 2, # x-y coordinates of target distance
[False] # env steps
])
@property
def start_pos(self):
return self._start_pos
@property
def goal_pos(self):
raise ValueError("Goal position is not available and has to be learnt based on the environment.")
def seed(self, seed=None): def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed) self.np_random, seed = seeding.np_random(seed)
return [seed] return [seed]
def render(self, mode='human'): # pragma: no cover
if self.fig is None:
self.fig = plt.figure()
plt.ion()
plt.show()
else:
plt.figure(self.fig.number)
plt.cla()
plt.title(f"Iteration: {self._steps}, distance: {self.end_effector - self._goal}")
# Arm
plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
# goal
goal_pos = self._goal.T
plt.plot(goal_pos[0], goal_pos[1], 'gx')
# distance between end effector and goal
plt.plot([self.end_effector[0], goal_pos[0]], [self.end_effector[1], goal_pos[1]], 'g--')
lim = np.sum(self.link_lengths) + 0.5
plt.xlim([-lim, lim])
plt.ylim([-lim, lim])
# plt.draw()
# plt.pause(1e-4) pushes window to foreground, which is annoying.
self.fig.canvas.flush_events()
def close(self): def close(self):
del self.fig del self.fig
@property @property
def end_effector(self): def end_effector(self):
return self._joints[self.n_links].T return self._joints[self.n_links].T
if __name__ == '__main__':
nl = 5
render_mode = "human" # "human" or "partial" or "final"
env = SimpleReacherEnv(n_links=nl)
obs = env.reset()
print("First", obs)
for i in range(2000):
# objective.load_result("/tmp/cma")
# test with random actions
ac = 2 * env.action_space.sample()
# ac = np.ones(env.action_space.shape)
obs, rew, d, info = env.step(ac)
env.render(mode=render_mode)
print(obs[env.active_obs].shape)
if d or i % 200 == 0:
env.reset()
env.close()

View File

@ -1,19 +1,31 @@
from typing import Iterable, Union
import gym import gym
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from gym.utils import seeding
from alr_envs.classic_control.utils import check_self_collision from alr_envs.classic_control.utils import check_self_collision
from alr_envs.utils.mps.mp_environments import MPEnv
class ViaPointReacher(gym.Env): class ViaPointReacher(MPEnv):
def __init__(self, n_links, allow_self_collision=False, collision_penalty=1000): def __init__(self, n_links, random_start: bool = True, via_target: Union[None, Iterable] = None,
self.num_links = n_links target: Union[None, Iterable] = None, allow_self_collision=False, collision_penalty=1000):
self.n_links = n_links
self.link_lengths = np.ones((n_links, 1)) self.link_lengths = np.ones((n_links, 1))
# task self.random_start = random_start
self.via_point = np.ones(2)
self.goal_point = np.array((n_links, 0)) # provided initial parameters
self._target = target # provided target value
self._via_target = via_target # provided via point target value
# temp container for current env state
self._via_point = np.ones(2)
self._goal = np.array((n_links, 0))
# collision # collision
self.allow_self_collision = allow_self_collision self.allow_self_collision = allow_self_collision
@ -23,78 +35,74 @@ class ViaPointReacher(gym.Env):
self._joints = None self._joints = None
self._joint_angles = None self._joint_angles = None
self._angle_velocity = None self._angle_velocity = None
self.start_pos = np.hstack([[np.pi / 2], np.zeros(self.num_links - 1)]) self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self.start_vel = np.zeros(self.num_links) self._start_vel = np.zeros(self.n_links)
self.weight_matrix_scale = 1 self.weight_matrix_scale = 1
self._steps = 0
self.dt = 0.01 self.dt = 0.01
# self.time_limit = 2
action_bound = np.pi * np.ones((self.num_links,)) action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([ state_bound = np.hstack([
[np.pi] * self.num_links, # cos [np.pi] * self.n_links, # cos
[np.pi] * self.num_links, # sin [np.pi] * self.n_links, # sin
[np.inf] * self.num_links, # velocity [np.inf] * self.n_links, # velocity
[np.inf] * 2, # x-y coordinates of via point distance
[np.inf] * 2, # x-y coordinates of target distance [np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe [np.inf] # env steps, because reward start after n steps
]) ])
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human", "partial"]}
self.fig = None self.fig = None
@property
def end_effector(self):
return self._joints[self.num_links].T
def configure(self, context):
pass
def reset(self):
self._joint_angles = self.start_pos
self._angle_velocity = self.start_vel
self._joints = np.zeros((self.num_links + 1, 2))
self._update_joints()
self._steps = 0 self._steps = 0
self.seed()
return self._get_obs().copy()
def step(self, action: np.ndarray): def step(self, action: np.ndarray):
""" """
a single step with an action in joint velocity space a single step with an action in joint velocity space
""" """
vel = action vel = action
acc = (vel - self._angle_velocity) / self.dt
self._angle_velocity = vel self._angle_velocity = vel
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints() self._update_joints()
dist_reward = 0 acc = (vel - self._angle_velocity) / self.dt
if not self._is_collided: reward, info = self._get_reward(acc)
if self._steps == 100:
dist_reward = np.linalg.norm(self.end_effector - self.via_point)
elif self._steps == 199:
dist_reward = np.linalg.norm(self.end_effector - self.goal_point)
# TODO: Do we need that? info.update({"is_collided": self._is_collided})
reward = - dist_reward ** 2
reward -= 5e-8 * np.sum(acc**2)
if self._is_collided:
reward -= self.collision_penalty
info = {"is_collided": self._is_collided}
self._steps += 1 self._steps += 1
# done = self._steps * self.dt > self.time_limit or self._is_collided
done = self._is_collided done = self._is_collided
return self._get_obs().copy(), reward, done, info return self._get_obs().copy(), reward, done, info
def reset(self):
if self.random_start:
# Maybe change more than dirst seed
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_goal()
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
def _generate_goal(self):
self._via_point = self.np_random.uniform(0.5, 3.5, 2) if self._via_target is None else np.copy(self._via_target)
self._goal = self.np_random.uniform(0.5, 0.1, 2) if self._target is None else np.copy(self._target)
# raise NotImplementedError("How to properly sample points??")
def _update_joints(self): def _update_joints(self):
""" """
update _joints to get new end effector position. The other links are only required for rendering. update _joints to get new end effector position. The other links are only required for rendering.
@ -115,14 +123,38 @@ class ViaPointReacher(gym.Env):
self._is_collided = self_collision self._is_collided = self_collision
def _get_reward(self, acc):
success = False
reward = -np.inf
if not self._is_collided:
dist = np.inf
# return intermediate reward for via point
if self._steps == 100:
dist = np.linalg.norm(self.end_effector - self._via_point)
# return reward in last time step for goal
elif self._steps == 199:
dist = np.linalg.norm(self.end_effector - self._goal)
success = dist < 0.005
else:
# Episode terminates when colliding, hence return reward
dist = np.linalg.norm(self.end_effector - self._goal)
reward = -self.collision_penalty
reward -= dist ** 2
reward -= 5e-8 * np.sum(acc ** 2)
info = {"is_success": success}
return reward, info
def _get_obs(self): def _get_obs(self):
theta = self._joint_angles theta = self._joint_angles
return np.hstack([ return np.hstack([
np.cos(theta), np.cos(theta),
np.sin(theta), np.sin(theta),
self._angle_velocity, self._angle_velocity,
self.end_effector - self.via_point, self.end_effector - self._via_point,
self.end_effector - self.goal_point, self.end_effector - self._goal,
self._steps self._steps
]) ])
@ -133,7 +165,7 @@ class ViaPointReacher(gym.Env):
accumulated_theta = np.cumsum(theta, axis=0) accumulated_theta = np.cumsum(theta, axis=0)
endeffector = np.zeros(shape=(self.num_links, num_points_per_link, 2)) endeffector = np.zeros(shape=(self.n_links, num_points_per_link, 2))
x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points
y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points
@ -141,33 +173,46 @@ class ViaPointReacher(gym.Env):
endeffector[0, :, 0] = x[0, :] endeffector[0, :, 0] = x[0, :]
endeffector[0, :, 1] = y[0, :] endeffector[0, :, 1] = y[0, :]
for i in range(1, self.num_links): for i in range(1, self.n_links):
endeffector[i, :, 0] = x[i, :] + endeffector[i - 1, -1, 0] endeffector[i, :, 0] = x[i, :] + endeffector[i - 1, -1, 0]
endeffector[i, :, 1] = y[i, :] + endeffector[i - 1, -1, 1] endeffector[i, :, 1] = y[i, :] + endeffector[i - 1, -1, 1]
return np.squeeze(endeffector + self._joints[0, :]) return np.squeeze(endeffector + self._joints[0, :])
def render(self, mode='human'): def render(self, mode='human'):
goal_pos = self._goal.T
via_pos = self._via_point.T
if self.fig is None: if self.fig is None:
# Create base figure once on the beginning. Afterwards only update
plt.ion()
self.fig = plt.figure() self.fig = plt.figure()
# plt.ion() ax = self.fig.add_subplot(1, 1, 1)
# plt.pause(0.01)
else: # limits
plt.figure(self.fig.number) lim = np.sum(self.link_lengths) + 0.5
ax.set_xlim([-lim, lim])
ax.set_ylim([-lim, lim])
self.line, = ax.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
self.goal_point_plot, = ax.plot(goal_pos[0], goal_pos[1], 'go')
self.via_point_plot, = ax.plot(via_pos[0], via_pos[1], 'gx')
self.fig.show()
self.fig.gca().set_title(f"Iteration: {self._steps}, distance: {self.end_effector - self._goal}")
if mode == "human": if mode == "human":
plt.cla() # goal
plt.title(f"Iteration: {self._steps}") if self._steps == 1:
self.goal_point_plot.set_data(goal_pos[0], goal_pos[1])
self.via_point_plot.set_data(via_pos[0], goal_pos[1])
# Arm # arm
plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k') self.line.set_data(self._joints[:, 0], self._joints[:, 1])
lim = np.sum(self.link_lengths) + 0.5 self.fig.canvas.draw()
plt.xlim([-lim, lim]) self.fig.canvas.flush_events()
plt.ylim([-lim, lim])
# plt.draw()
plt.pause(1e-4) # pushes window to foreground, which is annoying.
# self.fig.canvas.flush_events()
elif mode == "partial": elif mode == "partial":
if self._steps == 1: if self._steps == 1:
@ -196,12 +241,39 @@ class ViaPointReacher(gym.Env):
# Add the patch to the Axes # Add the patch to the Axes
[plt.gca().add_patch(rect) for rect in self.patches] [plt.gca().add_patch(rect) for rect in self.patches]
plt.xlim(-self.num_links, self.num_links), plt.ylim(-1, self.num_links) plt.xlim(-self.n_links, self.n_links), plt.ylim(-1, self.n_links)
# Arm # Arm
plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k') plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
plt.pause(0.01) plt.pause(0.01)
@property
def active_obs(self):
return np.hstack([
[self.random_start] * self.n_links, # cos
[self.random_start] * self.n_links, # sin
[self.random_start] * self.n_links, # velocity
[self._via_target is None] * 2, # x-y coordinates of via point distance
[True] * 2, # x-y coordinates of target distance
[False] # env steps
])
@property
def start_pos(self) -> Union[float, int, np.ndarray]:
return self._start_pos
@property
def goal_pos(self) -> Union[float, int, np.ndarray]:
raise ValueError("Goal position is not available and has to be learnt based on the environment.")
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
@property
def end_effector(self):
return self._joints[self.n_links].T
def close(self): def close(self):
if self.fig is not None: if self.fig is not None:
plt.close(self.fig) plt.close(self.fig)

View File

@ -17,19 +17,8 @@ def make_contextual_env(rank, seed=0):
def _init(): def _init():
env = ALRBallInACupEnv(reward_type="contextual_goal") env = ALRBallInACupEnv(reward_type="contextual_goal")
env = DetPMPWrapper(env, env = DetPMPWrapper(env, num_dof=7, num_basis=5, width=0.005, duration=3.5, dt=env.dt, post_traj_time=4.5,
num_dof=7, policy_type="motor", weights_scale=0.5, zero_start=True, zero_goal=True)
num_basis=5,
width=0.005,
policy_type="motor",
start_pos=env.start_pos,
duration=3.5,
post_traj_time=4.5,
dt=env.dt,
weights_scale=0.5,
zero_start=True,
zero_goal=True
)
env.seed(seed + rank) env.seed(seed + rank)
return env return env
@ -51,19 +40,8 @@ def make_env(rank, seed=0):
def _init(): def _init():
env = ALRBallInACupEnv(reward_type="simple") env = ALRBallInACupEnv(reward_type="simple")
env = DetPMPWrapper(env, env = DetPMPWrapper(env, num_dof=7, num_basis=5, width=0.005, duration=3.5, dt=env.dt, post_traj_time=4.5,
num_dof=7, policy_type="motor", weights_scale=0.2, zero_start=True, zero_goal=True)
num_basis=5,
width=0.005,
policy_type="motor",
start_pos=env.start_pos,
duration=3.5,
post_traj_time=4.5,
dt=env.dt,
weights_scale=0.2,
zero_start=True,
zero_goal=True
)
env.seed(seed + rank) env.seed(seed + rank)
return env return env
@ -85,20 +63,8 @@ def make_simple_env(rank, seed=0):
def _init(): def _init():
env = ALRBallInACupEnv(reward_type="simple") env = ALRBallInACupEnv(reward_type="simple")
env = DetPMPWrapper(env, env = DetPMPWrapper(env, num_dof=3, num_basis=5, width=0.005, duration=3.5, dt=env.dt, post_traj_time=4.5,
num_dof=3, policy_type="motor", weights_scale=0.25, zero_start=True, zero_goal=True, off=-0.1)
num_basis=5,
width=0.005,
off=-0.1,
policy_type="motor",
start_pos=env.start_pos[1::2],
duration=3.5,
post_traj_time=4.5,
dt=env.dt,
weights_scale=0.25,
zero_start=True,
zero_goal=True
)
env.seed(seed + rank) env.seed(seed + rank)
return env return env

View File

@ -17,19 +17,8 @@ def make_contextual_env(rank, seed=0):
def _init(): def _init():
env = ALRBeerpongEnv() env = ALRBeerpongEnv()
env = DetPMPWrapper(env, env = DetPMPWrapper(env, num_dof=7, num_basis=5, width=0.005, duration=3.5, dt=env.dt, post_traj_time=4.5,
num_dof=7, policy_type="motor", weights_scale=0.5, zero_start=True, zero_goal=True)
num_basis=5,
width=0.005,
policy_type="motor",
start_pos=env.start_pos,
duration=3.5,
post_traj_time=4.5,
dt=env.dt,
weights_scale=0.5,
zero_start=True,
zero_goal=True
)
env.seed(seed + rank) env.seed(seed + rank)
return env return env
@ -51,19 +40,8 @@ def make_env(rank, seed=0):
def _init(): def _init():
env = ALRBeerpongEnvSimple() env = ALRBeerpongEnvSimple()
env = DetPMPWrapper(env, env = DetPMPWrapper(env, num_dof=7, num_basis=5, width=0.005, duration=3.5, dt=env.dt, post_traj_time=4.5,
num_dof=7, policy_type="motor", weights_scale=0.25, zero_start=True, zero_goal=True)
num_basis=5,
width=0.005,
policy_type="motor",
start_pos=env.start_pos,
duration=3.5,
post_traj_time=4.5,
dt=env.dt,
weights_scale=0.25,
zero_start=True,
zero_goal=True
)
env.seed(seed + rank) env.seed(seed + rank)
return env return env
@ -85,19 +63,8 @@ def make_simple_env(rank, seed=0):
def _init(): def _init():
env = ALRBeerpongEnvSimple() env = ALRBeerpongEnvSimple()
env = DetPMPWrapper(env, env = DetPMPWrapper(env, num_dof=3, num_basis=5, width=0.005, duration=3.5, dt=env.dt, post_traj_time=4.5,
num_dof=3, policy_type="motor", weights_scale=0.5, zero_start=True, zero_goal=True)
num_basis=5,
width=0.005,
policy_type="motor",
start_pos=env.start_pos[1::2],
duration=3.5,
post_traj_time=4.5,
dt=env.dt,
weights_scale=0.5,
zero_start=True,
zero_goal=True
)
env.seed(seed + rank) env.seed(seed + rank)
return env return env

View File

@ -49,7 +49,7 @@ def make_holereacher_env(rank, seed=0):
""" """
def _init(): def _init():
_env = hr.HoleReacher(n_links=5, _env = hr.HoleReacherEnv(n_links=5,
allow_self_collision=False, allow_self_collision=False,
allow_wall_collision=False, allow_wall_collision=False,
hole_width=0.25, hole_width=0.25,
@ -89,7 +89,7 @@ def make_holereacher_fix_goal_env(rank, seed=0):
""" """
def _init(): def _init():
_env = hr.HoleReacher(n_links=5, _env = hr.HoleReacherEnv(n_links=5,
allow_self_collision=False, allow_self_collision=False,
allow_wall_collision=False, allow_wall_collision=False,
hole_width=0.15, hole_width=0.15,
@ -129,7 +129,7 @@ def make_holereacher_env_pmp(rank, seed=0):
""" """
def _init(): def _init():
_env = hr.HoleReacher(n_links=5, _env = hr.HoleReacherEnv(n_links=5,
allow_self_collision=False, allow_self_collision=False,
allow_wall_collision=False, allow_wall_collision=False,
hole_width=0.15, hole_width=0.15,
@ -137,19 +137,8 @@ def make_holereacher_env_pmp(rank, seed=0):
hole_x=1, hole_x=1,
collision_penalty=1000) collision_penalty=1000)
_env = DetPMPWrapper(_env, _env = DetPMPWrapper(_env, num_dof=5, num_basis=5, width=0.02, duration=2, dt=_env.dt, post_traj_time=0,
num_dof=5, policy_type="velocity", weights_scale=0.2, zero_start=True, zero_goal=False)
num_basis=5,
width=0.02,
policy_type="velocity",
start_pos=_env._start_pos,
duration=2,
post_traj_time=0,
dt=_env.dt,
weights_scale=0.2,
zero_start=True,
zero_goal=False
)
_env.seed(seed + rank) _env.seed(seed + rank)
return _env return _env

View File

@ -7,22 +7,22 @@ from alr_envs.utils.mps.mp_wrapper import MPWrapper
class DetPMPWrapper(MPWrapper): class DetPMPWrapper(MPWrapper):
def __init__(self, env: MPEnv, num_dof: int, num_basis: int, width: int, start_pos=None, duration: int = 1, def __init__(self, env: MPEnv, num_dof: int, num_basis: int, width: int, duration: int = 1, dt: float = 0.01,
dt: float = 0.01, post_traj_time: float = 0., policy_type: str = None, weights_scale: float = 1., post_traj_time: float = 0., policy_type: str = None, weights_scale: float = 1.,
zero_start: bool = False, zero_goal: bool = False, **mp_kwargs): zero_start: bool = False, zero_goal: bool = False, **mp_kwargs):
# self.duration = duration # seconds self.duration = duration # seconds
super().__init__(env, num_dof, dt, duration, post_traj_time, policy_type, weights_scale, num_basis=num_basis, super().__init__(env, num_dof, dt, duration, post_traj_time, policy_type, weights_scale, num_basis=num_basis,
width=width, start_pos=start_pos, zero_start=zero_start, zero_goal=zero_goal, **mp_kwargs) width=width, zero_start=zero_start, zero_goal=zero_goal, **mp_kwargs)
self.dt = dt
action_bounds = np.inf * np.ones((self.mp.n_basis * self.mp.n_dof)) action_bounds = np.inf * np.ones((self.mp.n_basis * self.mp.n_dof))
self.action_space = gym.spaces.Box(low=-action_bounds, high=action_bounds, dtype=np.float32) self.action_space = gym.spaces.Box(low=-action_bounds, high=action_bounds, dtype=np.float32)
self.start_pos = start_pos
self.dt = dt
def initialize_mp(self, num_dof: int, duration: int, dt: float, num_basis: int = 5, width: float = None, def initialize_mp(self, num_dof: int, duration: int, dt: float, num_basis: int = 5, width: float = None,
start_pos: np.ndarray = None, zero_start: bool = False, zero_goal: bool = False): zero_start: bool = False, zero_goal: bool = False):
pmp = det_promp.DeterministicProMP(n_basis=num_basis, n_dof=num_dof, width=width, off=0.01, pmp = det_promp.DeterministicProMP(n_basis=num_basis, n_dof=num_dof, width=width, off=0.01,
zero_start=zero_start, zero_goal=zero_goal) zero_start=zero_start, zero_goal=zero_goal)

View File

@ -63,7 +63,7 @@ class DmpWrapper(MPWrapper):
goal_pos = params[0, -self.mp.num_dimensions:] # [num_dof] goal_pos = params[0, -self.mp.num_dimensions:] # [num_dof]
params = params[:, :-self.mp.num_dimensions] # [1,num_dof] params = params[:, :-self.mp.num_dimensions] # [1,num_dof]
else: else:
goal_pos = self.env.goal_pos # self.mp.dmp_goal_pos.flatten() goal_pos = self.env.goal_pos
assert goal_pos is not None assert goal_pos is not None
weight_matrix = np.reshape(params, self.mp.dmp_weights.shape) # [num_basis, num_dof] weight_matrix = np.reshape(params, self.mp.dmp_weights.shape) # [num_basis, num_dof]

View File

@ -9,7 +9,7 @@ class MPEnv(gym.Env):
@property @property
@abstractmethod @abstractmethod
def corrected_obs_index(self): def active_obs(self):
"""Returns boolean value for each observation entry """Returns boolean value for each observation entry
whether the observation is returned by the DMP for the contextual case or not. whether the observation is returned by the DMP for the contextual case or not.
This effectively allows to filter unwanted or unnecessary observations from the full step-based case. This effectively allows to filter unwanted or unnecessary observations from the full step-based case.

View File

@ -13,6 +13,12 @@ class MPWrapper(gym.Wrapper, ABC):
policy_type: str = None, weights_scale: float = 1., render_mode: str = None, **mp_kwargs): policy_type: str = None, weights_scale: float = 1., render_mode: str = None, **mp_kwargs):
super().__init__(env) super().__init__(env)
# adjust observation space to reduce version
obs_sp = self.env.observation_space
self.observation_space = gym.spaces.Box(low=obs_sp.low[self.env.active_obs],
high=obs_sp.high[self.env.active_obs],
dtype=obs_sp.dtype)
assert dt is not None # this should never happen as MPWrapper is a base class assert dt is not None # this should never happen as MPWrapper is a base class
self.post_traj_steps = int(post_traj_time / dt) self.post_traj_steps = int(post_traj_time / dt)
@ -51,8 +57,7 @@ class MPWrapper(gym.Wrapper, ABC):
self.env.configure(context) self.env.configure(context)
def reset(self): def reset(self):
obs = self.env.reset() return self.env.reset()[self.env.active_obs]
return obs[self.env]
def step(self, action: np.ndarray): def step(self, action: np.ndarray):
""" This function generates a trajectory based on a DMP and then does the usual loop over reset and step""" """ This function generates a trajectory based on a DMP and then does the usual loop over reset and step"""
@ -82,7 +87,7 @@ class MPWrapper(gym.Wrapper, ABC):
break break
done = True done = True
return obs, rewards, done, info return obs[self.env.active_obs], rewards, done, info
def render(self, mode='human', **kwargs): def render(self, mode='human', **kwargs):
"""Only set render options here, such that they can be used during the rollout. """Only set render options here, such that they can be used during the rollout.

View File

@ -46,7 +46,7 @@ def example_dmp():
obs = env.reset() obs = env.reset()
def example_async(n_cpu=4, seed=int('533D', 16)): def example_async(env_id="alr_envs:HoleReacherDMP-v0", n_cpu=4, seed=int('533D', 16)):
def make_env(env_id, seed, rank): def make_env(env_id, seed, rank):
env = gym.make(env_id) env = gym.make(env_id)
env.seed(seed + rank) env.seed(seed + rank)
@ -73,7 +73,7 @@ def example_async(n_cpu=4, seed=int('533D', 16)):
# do not return values above threshold # do not return values above threshold
return (*map(lambda v: np.stack(v)[:n_samples], vals.values()),) return (*map(lambda v: np.stack(v)[:n_samples], vals.values()),)
envs = gym.vector.AsyncVectorEnv([make_env("alr_envs:HoleReacherDMP-v0", seed, i) for i in range(n_cpu)]) envs = gym.vector.AsyncVectorEnv([make_env(env_id, seed, i) for i in range(n_cpu)])
obs = envs.reset() obs = envs.reset()
print(sample(envs, 16)) print(sample(envs, 16))
@ -82,7 +82,6 @@ def example_async(n_cpu=4, seed=int('533D', 16)):
if __name__ == '__main__': if __name__ == '__main__':
# example_mujoco() # example_mujoco()
# example_dmp() # example_dmp()
# example_async() example_async("alr_envs:LongSimpleReacherDMP-v0", 4)
# env = gym.make("alr_envs:HoleReacherDMP-v0", context=0.1) # env = gym.make("alr_envs:HoleReacherDMP-v0", context=0.1)
env = gym.make("alr_envs:SimpleReacherDMP-v1") # env = gym.make("alr_envs:HoleReacherDMP-v1")
print()