Merge pull request #17 from ALRhub/reacher_env_cleanup

Reacher env cleanup
ottofabian 2021-12-01 16:17:28 +01:00 committed by GitHub
commit 22ac9a0317
20 changed files with 566 additions and 365 deletions

View File

@ -1,5 +1,5 @@
from alr_envs import dmc, meta, open_ai from alr_envs import dmc, meta, open_ai
from alr_envs.utils.make_env_helpers import make, make_detpmp_env, make_dmp_env, make_rank from alr_envs.utils.make_env_helpers import make, make_detpmp_env, make_dmp_env, make_promp_env, make_rank
from alr_envs.utils import make_dmc from alr_envs.utils import make_dmc
# Convenience function for all MP environments # Convenience function for all MP environments

View File

@ -10,7 +10,7 @@ from .mujoco.ball_in_a_cup.biac_pd import ALRBallInACupPDEnv
from .mujoco.reacher.alr_reacher import ALRReacherEnv from .mujoco.reacher.alr_reacher import ALRReacherEnv
from .mujoco.reacher.balancing import BalancingEnv from .mujoco.reacher.balancing import BalancingEnv
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "DetPMP": []} ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "ProMP": [], "DetPMP": []}
# Classic Control # Classic Control
## Simple Reacher ## Simple Reacher
@ -110,7 +110,7 @@ register(
"hole_width": 0.25, "hole_width": 0.25,
"hole_depth": 1, "hole_depth": 1,
"hole_x": 2, "hole_x": 2,
"collision_penalty": 100, "collision_penalty": 1,
} }
) )
@ -237,7 +237,7 @@ for _v in _versions:
"mp_kwargs": { "mp_kwargs": {
"num_dof": 2 if "long" not in _v.lower() else 5, "num_dof": 2 if "long" not in _v.lower() else 5,
"num_basis": 5, "num_basis": 5,
"duration": 20, "duration": 2,
"alpha_phase": 2, "alpha_phase": 2,
"learn_goal": True, "learn_goal": True,
"policy_type": "velocity", "policy_type": "velocity",
@ -247,6 +247,25 @@ for _v in _versions:
) )
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append(_env_id) ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append(_env_id)
_env_id = f'{_name[0]}ProMP-{_name[1]}'
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
kwargs={
"name": f"alr_envs:{_v}",
"wrappers": [classic_control.simple_reacher.MPWrapper],
"mp_kwargs": {
"num_dof": 2 if "long" not in _v.lower() else 5,
"num_basis": 5,
"duration": 2,
"policy_type": "motor",
"weights_scale": 1,
"zero_start": True
}
}
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id)
_env_id = f'{_name[0]}DetPMP-{_name[1]}' _env_id = f'{_name[0]}DetPMP-{_name[1]}'
register( register(
id=_env_id, id=_env_id,
@ -258,7 +277,7 @@ for _v in _versions:
"mp_kwargs": { "mp_kwargs": {
"num_dof": 2 if "long" not in _v.lower() else 5, "num_dof": 2 if "long" not in _v.lower() else 5,
"num_basis": 5, "num_basis": 5,
"duration": 20, "duration": 2,
"width": 0.025, "width": 0.025,
"policy_type": "velocity", "policy_type": "velocity",
"weights_scale": 0.2, "weights_scale": 0.2,
@ -289,6 +308,24 @@ register(
) )
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append("ViaPointReacherDMP-v0") ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append("ViaPointReacherDMP-v0")
register(
id="ViaPointReacherProMP-v0",
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
kwargs={
"name": f"alr_envs:ViaPointReacher-v0",
"wrappers": [classic_control.viapoint_reacher.MPWrapper],
"mp_kwargs": {
"num_dof": 5,
"num_basis": 5,
"duration": 2,
"policy_type": "motor",
"weights_scale": 1,
"zero_start": True
}
}
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append("ViaPointReacherProMP-v0")
register( register(
id='ViaPointReacherDetPMP-v0', id='ViaPointReacherDetPMP-v0',
entry_point='alr_envs.utils.make_env_helpers:make_detpmp_env_helper', entry_point='alr_envs.utils.make_env_helpers:make_detpmp_env_helper',
@ -325,7 +362,7 @@ for _v in _versions:
"num_basis": 5, "num_basis": 5,
"duration": 2, "duration": 2,
"learn_goal": True, "learn_goal": True,
"alpha_phase": 2, "alpha_phase": 2.5,
"bandwidth_factor": 2, "bandwidth_factor": 2,
"policy_type": "velocity", "policy_type": "velocity",
"weights_scale": 50, "weights_scale": 50,
@ -335,6 +372,25 @@ for _v in _versions:
) )
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append(_env_id) ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["DMP"].append(_env_id)
_env_id = f'HoleReacherProMP-{_v}'
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
kwargs={
"name": f"alr_envs:HoleReacher-{_v}",
"wrappers": [classic_control.hole_reacher.MPWrapper],
"mp_kwargs": {
"num_dof": 5,
"num_basis": 5,
"duration": 2,
"policy_type": "velocity",
"weights_scale": 0.1,
"zero_start": True
}
}
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id)
_env_id = f'HoleReacherDetPMP-{_v}' _env_id = f'HoleReacherDetPMP-{_v}'
register( register(
id=_env_id, id=_env_id,
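The hunks above register ProMP counterparts next to the existing DMP and DetPMP ids (SimpleReacherProMP-*, ViaPointReacherProMP-v0, HoleReacherProMP-*). A minimal usage sketch, assuming the convenience `make` re-exported in alr_envs/__init__.py accepts an env id and seed like the helpers in make_env_helpers.py:

    import alr_envs

    # One of the ids registered above; it also appears in the updated examples below.
    env = alr_envs.make("alr_envs:HoleReacherProMP-v1", seed=1)

    obs = env.reset()
    # A motion-primitive env consumes one flat ProMP weight vector per rollout
    # instead of per-step actions; the action space reflects that parameter vector.
    params = env.action_space.sample()
    obs, reward, done, info = env.step(params)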

View File

@ -0,0 +1,146 @@
from typing import Iterable, Union
from abc import ABC, abstractmethod
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym import spaces
from gym.utils import seeding
from alr_envs.alr.classic_control.utils import intersect
class BaseReacherEnv(gym.Env, ABC):
"""
Base class for all reaching environments.
"""
def __init__(self, n_links: int, random_start: bool = True,
allow_self_collision: bool = False):
super().__init__()
self.link_lengths = np.ones(n_links)
self.n_links = n_links
self._dt = 0.01
self.random_start = random_start
self.allow_self_collision = allow_self_collision
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._acc = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
# joint limits
self.j_min = -np.pi * np.ones(n_links)
self.j_max = np.pi * np.ones(n_links)
self.steps_before_reward = 199
state_bound = np.hstack([
[np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin
[np.inf] * self.n_links, # velocity
[np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe
])
self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
self.reward_function = None # Needs to be set in sub class
# containers for plotting
self.metadata = {'render.modes': ["human"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def reset(self):
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
@abstractmethod
def step(self, action: np.ndarray):
"""
A single step with action in angular velocity space
"""
raise NotImplementedError
def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.
Returns:
"""
angles = np.cumsum(self._joint_angles)
x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
def _check_self_collision(self):
"""Checks whether line segments intersect"""
if self.allow_self_collision:
return False
if np.any(self._joint_angles > self.j_max) or np.any(self._joint_angles < self.j_min):
return True
link_lines = np.stack((self._joints[:-1, :], self._joints[1:, :]), axis=1)
for i, line1 in enumerate(link_lines):
for line2 in link_lines[i + 2:, :]:
if intersect(line1[0], line1[-1], line2[0], line2[-1]):
return True
return False
@abstractmethod
def _get_reward(self, action: np.ndarray) -> (float, dict):
pass
@abstractmethod
def _get_obs(self) -> np.ndarray:
pass
@abstractmethod
def _check_collisions(self) -> bool:
pass
@abstractmethod
def _terminate(self, info) -> bool:
return False
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self):
del self.fig
@property
def end_effector(self):
return self._joints[self.n_links].T
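The new BaseReacherEnv keeps the planar forward kinematics of the old envs: joint angles are accumulated and each unit-length link contributes one displacement. A standalone sketch of the same computation as _update_joints, with hypothetical angles for a 3-link arm:

    import numpy as np

    link_lengths = np.ones(3)
    joint_angles = np.array([np.pi / 2, -np.pi / 4, -np.pi / 4])  # hypothetical pose

    angles = np.cumsum(joint_angles)                    # absolute orientation of each link
    x = link_lengths * np.vstack([np.cos(angles), np.sin(angles)])  # per-link displacement, shape (2, n)
    joints = np.zeros((len(link_lengths) + 1, 2))       # base joint at the origin
    joints[1:] = joints[0] + np.cumsum(x.T, axis=0)     # joint positions along the chain

    end_effector = joints[-1]                           # ~[1.707, 1.707] for this pose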

View File

@ -0,0 +1,37 @@
from abc import ABC
from gym import spaces
import numpy as np
from alr_envs.alr.classic_control.base_reacher.base_reacher import BaseReacherEnv
class BaseReacherDirectEnv(BaseReacherEnv, ABC):
"""
Base class for directly controlled reaching environments
"""
def __init__(self, n_links: int, random_start: bool = True,
allow_self_collision: bool = False):
super().__init__(n_links, random_start, allow_self_collision)
self.max_vel = 2 * np.pi
action_bound = np.ones((self.n_links,)) * self.max_vel
self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
def step(self, action: np.ndarray):
"""
A single step with action in angular velocity space
"""
self._acc = (action - self._angle_velocity) / self.dt
self._angle_velocity = action
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints()
self._is_collided = self._check_collisions()
reward, info = self._get_reward(action)
self._steps += 1
done = self._terminate(info)
return self._get_obs().copy(), reward, done, info
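In the direct variant the action is interpreted as the new joint velocity: the implied acceleration is stored for the reward and the angles are integrated once with explicit Euler. A small sketch of that update with hypothetical values (dt = 0.01 as set in the base class):

    import numpy as np

    dt = 0.01
    angle_velocity = np.zeros(5)
    joint_angles = np.hstack([[np.pi / 2], np.zeros(4)])

    action = np.full(5, 0.5)              # commanded joint velocities [rad/s], hypothetical
    acc = (action - angle_velocity) / dt  # implied acceleration, kept for acceleration costs
    angle_velocity = action               # velocity is set directly from the action
    joint_angles = joint_angles + dt * angle_velocity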

View File

@ -0,0 +1,36 @@
from abc import ABC
from gym import spaces
import numpy as np
from alr_envs.alr.classic_control.base_reacher.base_reacher import BaseReacherEnv
class BaseReacherTorqueEnv(BaseReacherEnv, ABC):
"""
Base class for torque controlled reaching environments
"""
def __init__(self, n_links: int, random_start: bool = True,
allow_self_collision: bool = False):
super().__init__(n_links, random_start, allow_self_collision)
self.max_torque = 1000
action_bound = np.ones((self.n_links,)) * self.max_torque
self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
def step(self, action: np.ndarray):
"""
A single step with action in torque space
"""
self._angle_velocity = self._angle_velocity + self.dt * action
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints()
self._is_collided = self._check_collisions()
reward, info = self._get_reward(action)
self._steps += 1
done = False
return self._get_obs().copy(), reward, done, info
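The torque variant integrates twice instead: the action first updates the velocity, which then updates the joint angles. A sketch with hypothetical values:

    import numpy as np

    dt = 0.01
    angle_velocity = np.zeros(5)
    joint_angles = np.zeros(5)

    action = np.full(5, 10.0)                          # commanded torques, hypothetical
    angle_velocity = angle_velocity + dt * action      # torque -> velocity
    joint_angles = joint_angles + dt * angle_velocity  # velocity -> angle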

View File

@ -3,22 +3,17 @@ from typing import Union
import gym import gym
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from gym.utils import seeding
from matplotlib import patches from matplotlib import patches
from alr_envs.alr.classic_control.utils import check_self_collision from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
class HoleReacherEnv(gym.Env): class HoleReacherEnv(BaseReacherDirectEnv):
def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None, def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None,
hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False, hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False,
allow_wall_collision: bool = False, collision_penalty: float = 1000): allow_wall_collision: bool = False, collision_penalty: float = 1000, rew_fct: str = "simple"):
self.n_links = n_links super().__init__(n_links, random_start, allow_self_collision)
self.link_lengths = np.ones((n_links, 1))
self.random_start = random_start
# provided initial parameters # provided initial parameters
self.initial_x = hole_x # x-position of center of hole self.initial_x = hole_x # x-position of center of hole
@ -31,21 +26,7 @@ class HoleReacherEnv(gym.Env):
self._tmp_depth = None self._tmp_depth = None
self._goal = None # x-y coordinates for reaching the center at the bottom of the hole self._goal = None # x-y coordinates for reaching the center at the bottom of the hole
# collision # action_bound = np.pi * np.ones((self.n_links,))
self.allow_self_collision = allow_self_collision
self.allow_wall_collision = allow_wall_collision
self.collision_penalty = collision_penalty
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
self._dt = 0.01
action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([ state_bound = np.hstack([
[np.pi] * self.n_links, # cos [np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin [np.pi] * self.n_links, # sin
@ -55,71 +36,30 @@ class HoleReacherEnv(gym.Env):
[np.inf] * 2, # x-y coordinates of target distance [np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe [np.inf] # env steps, because reward start after n steps TODO: Maybe
]) ])
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) # self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting if rew_fct == "simple":
self.metadata = {'render.modes': ["human", "partial"]} from alr_envs.alr.classic_control.hole_reacher.hr_simple_reward import HolereacherReward
self.fig = None self.reward_function = HolereacherReward(allow_self_collision, allow_wall_collision, collision_penalty)
elif rew_fct == "vel_acc":
self._steps = 0 from alr_envs.alr.classic_control.hole_reacher.hr_dist_vel_acc_reward import HolereacherReward
self.seed() self.reward_function = HolereacherReward(allow_self_collision, allow_wall_collision, collision_penalty)
else:
@property raise ValueError("Unknown reward function {}".format(rew_fct))
def dt(self) -> Union[float, int]:
return self._dt
# @property
# def start_pos(self):
# return self._start_pos
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def step(self, action: np.ndarray):
"""
A single step with an action in joint velocity space
"""
acc = (action - self._angle_velocity) / self.dt
self._angle_velocity = action
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity # + 0.001 * np.random.randn(5)
self._update_joints()
reward, info = self._get_reward(acc)
info.update({"is_collided": self._is_collided})
self.end_effector_traj.append(np.copy(self.end_effector))
self._steps += 1
done = self._is_collided
return self._get_obs().copy(), reward, done, info
def reset(self): def reset(self):
if self.random_start:
# Maybe change more than first seed
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_hole() self._generate_hole()
self._set_patches() self._set_patches()
self.reward_function.reset()
self._angle_velocity = self._start_vel return super().reset()
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
self.end_effector_traj = []
return self._get_obs().copy() def _get_reward(self, action: np.ndarray) -> (float, dict):
return self.reward_function.get_reward(self)
def _terminate(self, info):
return info["is_collided"]
def _generate_hole(self): def _generate_hole(self):
if self.initial_width is None: if self.initial_width is None:
@ -144,45 +84,17 @@ class HoleReacherEnv(gym.Env):
self._tmp_depth = depth self._tmp_depth = depth
self._goal = np.hstack([self._tmp_x, -self._tmp_depth]) self._goal = np.hstack([self._tmp_x, -self._tmp_depth])
def _update_joints(self): self._line_ground_left = np.array([-self.n_links, 0, x - width / 2, 0])
""" self._line_ground_right = np.array([x + width / 2, 0, self.n_links, 0])
update _joints to get new end effector position. The other links are only required for rendering. self._line_ground_hole = np.array([x - width / 2, -depth, x + width / 2, -depth])
Returns: self._line_hole_left = np.array([x - width / 2, -depth, x - width / 2, 0])
self._line_hole_right = np.array([x + width / 2, -depth, x + width / 2, 0])
""" self.ground_lines = np.stack((self._line_ground_left,
line_points_in_taskspace = self._get_forward_kinematics(num_points_per_link=20) self._line_ground_right,
self._line_ground_hole,
self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0] self._line_hole_left,
self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1] self._line_hole_right))
self_collision = False
wall_collision = False
if not self.allow_self_collision:
self_collision = check_self_collision(line_points_in_taskspace)
if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision:
self_collision = True
if not self.allow_wall_collision:
wall_collision = self._check_wall_collision(line_points_in_taskspace)
self._is_collided = self_collision or wall_collision
def _get_reward(self, acc: np.ndarray):
reward = 0
# success = False
if self._steps == 199 or self._is_collided:
# return reward only in last time step
# Episode also terminates when colliding, hence return reward
dist = np.linalg.norm(self.end_effector - self._goal)
# success = dist < 0.005 and not self._is_collided
reward = - dist ** 2 - self.collision_penalty * self._is_collided
reward -= 5e-8 * np.sum(acc ** 2)
# info = {"is_success": success}
return reward, {} # info
def _get_obs(self): def _get_obs(self):
theta = self._joint_angles theta = self._joint_angles
@ -194,17 +106,17 @@ class HoleReacherEnv(gym.Env):
# self._tmp_hole_depth, # self._tmp_hole_depth,
self.end_effector - self._goal, self.end_effector - self._goal,
self._steps self._steps
]) ]).astype(np.float32)
def _get_forward_kinematics(self, num_points_per_link=1): def _get_line_points(self, num_points_per_link=1):
theta = self._joint_angles[:, None] theta = self._joint_angles[:, None]
intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1 intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1
accumulated_theta = np.cumsum(theta, axis=0) accumulated_theta = np.cumsum(theta, axis=0)
end_effector = np.zeros(shape=(self.n_links, num_points_per_link, 2)) end_effector = np.zeros(shape=(self.n_links, num_points_per_link, 2))
x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points x = np.cos(accumulated_theta) * self.link_lengths[:, None] * intermediate_points
y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points y = np.sin(accumulated_theta) * self.link_lengths[:, None] * intermediate_points
end_effector[0, :, 0] = x[0, :] end_effector[0, :, 0] = x[0, :]
end_effector[0, :, 1] = y[0, :] end_effector[0, :, 1] = y[0, :]
@ -215,7 +127,12 @@ class HoleReacherEnv(gym.Env):
return np.squeeze(end_effector + self._joints[0, :]) return np.squeeze(end_effector + self._joints[0, :])
def _check_wall_collision(self, line_points): def _check_collisions(self) -> bool:
return self._check_self_collision() or self.check_wall_collision()
def check_wall_collision(self):
line_points = self._get_line_points(num_points_per_link=100)
# all points that are before the hole in x # all points that are before the hole in x
r, c = np.where(line_points[:, :, 0] < (self._tmp_x - self._tmp_width / 2)) r, c = np.where(line_points[:, :, 0] < (self._tmp_x - self._tmp_width / 2))
@ -281,7 +198,7 @@ class HoleReacherEnv(gym.Env):
def _set_patches(self): def _set_patches(self):
if self.fig is not None: if self.fig is not None:
self.fig.gca().patches = [] # self.fig.gca().patches = []
left_block = patches.Rectangle((-self.n_links, -self._tmp_depth), left_block = patches.Rectangle((-self.n_links, -self._tmp_depth),
self.n_links + self._tmp_x - self._tmp_width / 2, self.n_links + self._tmp_x - self._tmp_width / 2,
self._tmp_depth, self._tmp_depth,
@ -300,15 +217,15 @@ class HoleReacherEnv(gym.Env):
self.fig.gca().add_patch(right_block) self.fig.gca().add_patch(right_block)
self.fig.gca().add_patch(hole_floor) self.fig.gca().add_patch(hole_floor)
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
@property if __name__ == "__main__":
def end_effector(self): import time
return self._joints[self.n_links].T env = HoleReacherEnv(5)
env.reset()
def close(self): for i in range(10000):
super().close() ac = env.action_space.sample()
if self.fig is not None: obs, rew, done, info = env.step(ac)
plt.close(self.fig) env.render()
if done:
env.reset()
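The refactored HoleReacherEnv now selects its reward through the new rew_fct argument ("simple" or "vel_acc"). A construction sketch; the module path is inferred from the sibling imports above and may differ:

    # Module path assumed from the package layout visible in the imports above.
    from alr_envs.alr.classic_control.hole_reacher.hole_reacher import HoleReacherEnv

    env = HoleReacherEnv(n_links=5, hole_width=0.25, hole_depth=1, hole_x=2,
                         collision_penalty=100, rew_fct="simple")  # or rew_fct="vel_acc"
    obs = env.reset()
    obs, reward, done, info = env.step(env.action_space.sample())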

View File

@ -0,0 +1,60 @@
import numpy as np
class HolereacherReward:
def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty):
self.collision_penalty = collision_penalty
# collision
self.allow_self_collision = allow_self_collision
self.allow_wall_collision = allow_wall_collision
self.collision_penalty = collision_penalty
self._is_collided = False
self.reward_factors = np.array((-1, -1e-4, -1e-6, -collision_penalty, 0))
def reset(self):
self._is_collided = False
self.collision_dist = 0
def get_reward(self, env):
dist_cost = 0
collision_cost = 0
time_cost = 0
success = False
self_collision = False
wall_collision = False
if not self._is_collided:
if not self.allow_self_collision:
self_collision = env._check_self_collision()
if not self.allow_wall_collision:
wall_collision = env.check_wall_collision()
self._is_collided = self_collision or wall_collision
self.collision_dist = np.linalg.norm(env.end_effector - env._goal)
if env._steps == 199: # or self._is_collided:
# return reward only in last time step
# Episode also terminates when colliding, hence return reward
dist = np.linalg.norm(env.end_effector - env._goal)
success = dist < 0.005 and not self._is_collided
dist_cost = dist ** 2
collision_cost = self._is_collided * self.collision_dist ** 2
time_cost = 199 - env._steps
info = {"is_success": success,
"is_collided": self._is_collided,
"end_effector": np.copy(env.end_effector)}
vel_cost = np.sum(env._angle_velocity ** 2)
acc_cost = np.sum(env._acc ** 2)
reward_features = np.array((dist_cost, vel_cost, acc_cost, collision_cost, time_cost))
reward = np.dot(reward_features, self.reward_factors)
return reward, info
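The reward is a dot product of per-step features with fixed weights, so distance, velocity, acceleration, collision and time costs are traded off in one place. A worked example with hypothetical final-step values and collision_penalty = 100:

    import numpy as np

    collision_penalty = 100
    reward_factors = np.array((-1, -1e-4, -1e-6, -collision_penalty, 0))

    dist_cost = 0.3 ** 2      # squared distance to the goal at step 199
    vel_cost = 4.0            # sum of squared joint velocities (hypothetical)
    acc_cost = 250.0          # sum of squared joint accelerations (hypothetical)
    collision_cost = 0.0      # no collision in this rollout
    time_cost = 0             # 199 - env._steps at the final step

    reward_features = np.array((dist_cost, vel_cost, acc_cost, collision_cost, time_cost))
    reward = np.dot(reward_features, reward_factors)   # ~ -0.0907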

View File

@ -0,0 +1,53 @@
import numpy as np
class HolereacherReward:
def __init__(self, allow_self_collision, allow_wall_collision, collision_penalty):
self.collision_penalty = collision_penalty
# collision
self.allow_self_collision = allow_self_collision
self.allow_wall_collision = allow_wall_collision
self.collision_penalty = collision_penalty
self._is_collided = False
self.reward_factors = np.array((-1, -5e-8, -collision_penalty))
def reset(self):
self._is_collided = False
def get_reward(self, env):
dist_cost = 0
collision_cost = 0
success = False
self_collision = False
wall_collision = False
if not self.allow_self_collision:
self_collision = env._check_self_collision()
if not self.allow_wall_collision:
wall_collision = env.check_wall_collision()
self._is_collided = self_collision or wall_collision
if env._steps == 199 or self._is_collided:
# return reward only in last time step
# Episode also terminates when colliding, hence return reward
dist = np.linalg.norm(env.end_effector - env._goal)
dist_cost = dist ** 2
collision_cost = int(self._is_collided)
success = dist < 0.005 and not self._is_collided
info = {"is_success": success,
"is_collided": self._is_collided,
"end_effector": np.copy(env.end_effector)}
acc_cost = np.sum(env._acc ** 2)
reward_features = np.array((dist_cost, acc_cost, collision_cost))
reward = np.dot(reward_features, self.reward_factors)
return reward, info

View File

@ -18,6 +18,14 @@ class MPWrapper(MPEnvWrapper):
[False] # env steps [False] # env steps
]) ])
# @property
# def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
# return self._joint_angles.copy()
#
# @property
# def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
# return self._angle_velocity.copy()
@property @property
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]: def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.current_pos return self.env.current_pos

View File

@ -1,26 +1,22 @@
from typing import Iterable, Union from typing import Iterable, Union
import gym
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from gym import spaces from gym import spaces
from gym.utils import seeding
from alr_envs.alr.classic_control.base_reacher.base_reacher_torque import BaseReacherTorqueEnv
class SimpleReacherEnv(gym.Env): class SimpleReacherEnv(BaseReacherTorqueEnv):
""" """
Simple Reaching Task without any physics simulation. Simple Reaching Task without any physics simulation.
Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions
towards the end of the trajectory. towards the end of the trajectory.
""" """
def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True): def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True,
super().__init__() allow_self_collision: bool = False,):
self.link_lengths = np.ones(n_links) super().__init__(n_links, random_start, allow_self_collision)
self.n_links = n_links
self._dt = 0.1
self.random_start = random_start
# provided initial parameters # provided initial parameters
self.inital_target = target self.inital_target = target
@ -28,16 +24,10 @@ class SimpleReacherEnv(gym.Env):
# temp container for current env state # temp container for current env state
self._goal = None self._goal = None
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.zeros(self.n_links) self._start_pos = np.zeros(self.n_links)
self._start_vel = np.zeros(self.n_links)
self.max_torque = 1
self.steps_before_reward = 199 self.steps_before_reward = 199
action_bound = np.ones((self.n_links,)) * self.max_torque
state_bound = np.hstack([ state_bound = np.hstack([
[np.pi] * self.n_links, # cos [np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin [np.pi] * self.n_links, # sin
@ -45,84 +35,24 @@ class SimpleReacherEnv(gym.Env):
[np.inf] * 2, # x-y coordinates of target distance [np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps TODO: Maybe [np.inf] # env steps, because reward start after n steps TODO: Maybe
]) ])
self.action_space = spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self) -> Union[float, int]:
return self._dt
# @property # @property
# def start_pos(self): # def start_pos(self):
# return self._start_pos # return self._start_pos
@property
def current_pos(self):
return self._joint_angles
@property
def current_vel(self):
return self._angle_velocity
def step(self, action: np.ndarray):
"""
A single step with action in torque space
"""
# action = self._add_action_noise(action)
ac = np.clip(action, -self.max_torque, self.max_torque)
self._angle_velocity = self._angle_velocity + self.dt * ac
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints()
reward, info = self._get_reward(action)
self._steps += 1
done = False
return self._get_obs().copy(), reward, done, info
def reset(self): def reset(self):
# TODO: maybe do initialisation more random?
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
self._joint_angles = np.hstack([[self.np_random.uniform(-np.pi, np.pi)], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_goal() self._generate_goal()
self._angle_velocity = self._start_vel return super().reset()
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.
Returns:
"""
angles = np.cumsum(self._joint_angles)
x = self.link_lengths * np.vstack([np.cos(angles), np.sin(angles)])
self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
def _get_reward(self, action: np.ndarray): def _get_reward(self, action: np.ndarray):
diff = self.end_effector - self._goal diff = self.end_effector - self._goal
reward_dist = 0 reward_dist = 0
if not self.allow_self_collision:
self._is_collided = self._check_self_collision()
if self._steps >= self.steps_before_reward: if self._steps >= self.steps_before_reward:
reward_dist -= np.linalg.norm(diff) reward_dist -= np.linalg.norm(diff)
# reward_dist = np.exp(-0.1 * diff ** 2).mean() # reward_dist = np.exp(-0.1 * diff ** 2).mean()
@ -132,6 +62,9 @@ class SimpleReacherEnv(gym.Env):
reward = reward_dist - reward_ctrl reward = reward_dist - reward_ctrl
return reward, dict(reward_dist=reward_dist, reward_ctrl=reward_ctrl) return reward, dict(reward_dist=reward_dist, reward_ctrl=reward_ctrl)
def _terminate(self, info):
return False
def _get_obs(self): def _get_obs(self):
theta = self._joint_angles theta = self._joint_angles
return np.hstack([ return np.hstack([
@ -140,7 +73,7 @@ class SimpleReacherEnv(gym.Env):
self._angle_velocity, self._angle_velocity,
self.end_effector - self._goal, self.end_effector - self._goal,
self._steps self._steps
]) ]).astype(np.float32)
def _generate_goal(self): def _generate_goal(self):
@ -155,6 +88,9 @@ class SimpleReacherEnv(gym.Env):
self._goal = goal self._goal = goal
def _check_collisions(self) -> bool:
return self._check_self_collision()
def render(self, mode='human'): # pragma: no cover def render(self, mode='human'): # pragma: no cover
if self.fig is None: if self.fig is None:
# Create base figure once on the beginning. Afterwards only update # Create base figure once on the beginning. Afterwards only update
@ -190,13 +126,14 @@ class SimpleReacherEnv(gym.Env):
self.fig.canvas.draw() self.fig.canvas.draw()
self.fig.canvas.flush_events() self.fig.canvas.flush_events()
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self): if __name__ == "__main__":
del self.fig env = SimpleReacherEnv(5)
env.reset()
for i in range(200):
ac = env.action_space.sample()
obs, rew, done, info = env.step(ac)
@property env.render()
def end_effector(self): if done:
return self._joints[self.n_links].T break

View File

@ -1,3 +1,6 @@
import numpy as np
def ccw(A, B, C): def ccw(A, B, C):
return (C[1] - A[1]) * (B[0] - A[0]) - (B[1] - A[1]) * (C[0] - A[0]) > 1e-12 return (C[1] - A[1]) * (B[0] - A[0]) - (B[1] - A[1]) * (C[0] - A[0]) > 1e-12
@ -10,7 +13,7 @@ def intersect(A, B, C, D):
def check_self_collision(line_points): def check_self_collision(line_points):
"""Checks whether line segments and intersect""" """Checks whether line segments intersect"""
for i, line1 in enumerate(line_points): for i, line1 in enumerate(line_points):
for line2 in line_points[i + 2:, :, :]: for line2 in line_points[i + 2:, :, :]:
if intersect(line1[0], line1[-1], line2[0], line2[-1]): if intersect(line1[0], line1[-1], line2[0], line2[-1]):
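The collision checks above reduce to pairwise segment-intersection tests. A small check of the helper on hypothetical segments, assuming intersect(A, B, C, D) returns True when segment AB crosses segment CD:

    import numpy as np
    from alr_envs.alr.classic_control.utils import intersect

    A, B = np.array([0.0, 0.0]), np.array([1.0, 1.0])
    C, D = np.array([0.0, 1.0]), np.array([1.0, 0.0])
    print(intersect(A, B, C, D))   # expected True: the two diagonals cross

    E, F = np.array([2.0, 2.0]), np.array([3.0, 2.0])
    print(intersect(A, B, E, F))   # expected False: disjoint segments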

View File

@ -6,17 +6,15 @@ import numpy as np
from gym.utils import seeding from gym.utils import seeding
from alr_envs.alr.classic_control.utils import check_self_collision from alr_envs.alr.classic_control.utils import check_self_collision
from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
class ViaPointReacherEnv(gym.Env): class ViaPointReacherEnv(BaseReacherDirectEnv):
def __init__(self, n_links, random_start: bool = False, via_target: Union[None, Iterable] = None, def __init__(self, n_links, random_start: bool = False, via_target: Union[None, Iterable] = None,
target: Union[None, Iterable] = None, allow_self_collision=False, collision_penalty=1000): target: Union[None, Iterable] = None, allow_self_collision=False, collision_penalty=1000):
self.n_links = n_links super().__init__(n_links, random_start, allow_self_collision)
self.link_lengths = np.ones((n_links, 1))
self.random_start = random_start
# provided initial parameters # provided initial parameters
self.intitial_target = target # provided target value self.intitial_target = target # provided target value
@ -27,20 +25,8 @@ class ViaPointReacherEnv(gym.Env):
self._goal = np.array((n_links, 0)) self._goal = np.array((n_links, 0))
# collision # collision
self.allow_self_collision = allow_self_collision
self.collision_penalty = collision_penalty self.collision_penalty = collision_penalty
# state
self._joints = None
self._joint_angles = None
self._angle_velocity = None
self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
self._start_vel = np.zeros(self.n_links)
self.weight_matrix_scale = 1
self._dt = 0.01
action_bound = np.pi * np.ones((self.n_links,))
state_bound = np.hstack([ state_bound = np.hstack([
[np.pi] * self.n_links, # cos [np.pi] * self.n_links, # cos
[np.pi] * self.n_links, # sin [np.pi] * self.n_links, # sin
@ -49,69 +35,15 @@ class ViaPointReacherEnv(gym.Env):
[np.inf] * 2, # x-y coordinates of target distance [np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps [np.inf] # env steps, because reward start after n steps
]) ])
self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# containers for plotting
self.metadata = {'render.modes': ["human", "partial"]}
self.fig = None
self._steps = 0
self.seed()
@property
def dt(self):
return self._dt
# @property # @property
# def start_pos(self): # def start_pos(self):
# return self._start_pos # return self._start_pos
@property
def current_pos(self):
return self._joint_angles.copy()
@property
def current_vel(self):
return self._angle_velocity.copy()
def step(self, action: np.ndarray):
"""
a single step with an action in joint velocity space
"""
vel = action
self._angle_velocity = vel
self._joint_angles = self._joint_angles + self.dt * self._angle_velocity
self._update_joints()
acc = (vel - self._angle_velocity) / self.dt
reward, info = self._get_reward(acc)
info.update({"is_collided": self._is_collided})
self._steps += 1
done = self._is_collided
return self._get_obs().copy(), reward, done, info
def reset(self): def reset(self):
if self.random_start:
# Maybe change more than dirst seed
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy()
else:
self._joint_angles = self._start_pos
self._generate_goal() self._generate_goal()
return super().reset()
self._angle_velocity = self._start_vel
self._joints = np.zeros((self.n_links + 1, 2))
self._update_joints()
self._steps = 0
return self._get_obs().copy()
def _generate_goal(self): def _generate_goal(self):
# TODO: Maybe improve this later, this can yield quite a lot of invalid settings # TODO: Maybe improve this later, this can yield quite a lot of invalid settings
@ -137,29 +69,13 @@ class ViaPointReacherEnv(gym.Env):
self._via_point = via_target self._via_point = via_target
self._goal = goal self._goal = goal
def _update_joints(self):
"""
update _joints to get new end effector position. The other links are only required for rendering.
Returns:
"""
line_points_in_taskspace = self.get_forward_kinematics(num_points_per_link=20)
self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0]
self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1]
self_collision = False
if not self.allow_self_collision:
self_collision = check_self_collision(line_points_in_taskspace)
if np.any(np.abs(self._joint_angles) > np.pi):
self_collision = True
self._is_collided = self_collision
def _get_reward(self, acc): def _get_reward(self, acc):
success = False success = False
reward = -np.inf reward = -np.inf
if not self.allow_self_collision:
self._is_collided = self._check_self_collision()
if not self._is_collided: if not self._is_collided:
dist = np.inf dist = np.inf
# return intermediate reward for via point # return intermediate reward for via point
@ -177,10 +93,15 @@ class ViaPointReacherEnv(gym.Env):
reward -= dist ** 2 reward -= dist ** 2
reward -= 5e-8 * np.sum(acc ** 2) reward -= 5e-8 * np.sum(acc ** 2)
info = {"is_success": success} info = {"is_success": success,
"is_collided": self._is_collided,
"end_effector": np.copy(self.end_effector)}
return reward, info return reward, info
def _terminate(self, info):
return info["is_collided"]
def _get_obs(self): def _get_obs(self):
theta = self._joint_angles theta = self._joint_angles
return np.hstack([ return np.hstack([
@ -190,28 +111,10 @@ class ViaPointReacherEnv(gym.Env):
self.end_effector - self._via_point, self.end_effector - self._via_point,
self.end_effector - self._goal, self.end_effector - self._goal,
self._steps self._steps
]) ]).astype(np.float32)
def get_forward_kinematics(self, num_points_per_link=1): def _check_collisions(self) -> bool:
theta = self._joint_angles[:, None] return self._check_self_collision()
intermediate_points = np.linspace(0, 1, num_points_per_link) if num_points_per_link > 1 else 1
accumulated_theta = np.cumsum(theta, axis=0)
endeffector = np.zeros(shape=(self.n_links, num_points_per_link, 2))
x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points
y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points
endeffector[0, :, 0] = x[0, :]
endeffector[0, :, 1] = y[0, :]
for i in range(1, self.n_links):
endeffector[i, :, 0] = x[i, :] + endeffector[i - 1, -1, 0]
endeffector[i, :, 1] = y[i, :] + endeffector[i - 1, -1, 1]
return np.squeeze(endeffector + self._joints[0, :])
def render(self, mode='human'): def render(self, mode='human'):
goal_pos = self._goal.T goal_pos = self._goal.T
@ -281,14 +184,14 @@ class ViaPointReacherEnv(gym.Env):
plt.pause(0.01) plt.pause(0.01)
def seed(self, seed=None): if __name__ == "__main__":
self.np_random, seed = seeding.np_random(seed) import time
return [seed] env = ViaPointReacherEnv(5)
env.reset()
@property for i in range(10000):
def end_effector(self): ac = env.action_space.sample()
return self._joints[self.n_links].T obs, rew, done, info = env.step(ac)
env.render()
def close(self): if done:
if self.fig is not None: env.reset()
plt.close(self.fig)

View File

@ -87,6 +87,7 @@ class ALRReacherEnv(MujocoEnv, utils.EzPickle):
[self._steps], [self._steps],
]) ])
if __name__ == '__main__': if __name__ == '__main__':
nl = 5 nl = 5
render_mode = "human" # "human" or "partial" or "final" render_mode = "human" # "human" or "partial" or "final"

View File

@ -1,6 +1,6 @@
from . import manipulation, suite from . import manipulation, suite
ALL_DEEPMIND_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "DetPMP": []} ALL_DEEPMIND_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "ProMP": [], "DetPMP": []}
from gym.envs.registration import register from gym.envs.registration import register

View File

@ -15,10 +15,10 @@ def _spec_to_box(spec):
assert s.dtype == np.float64 or s.dtype == np.float32, f"Only float64 and float32 types are allowed, instead {s.dtype} was found" assert s.dtype == np.float64 or s.dtype == np.float32, f"Only float64 and float32 types are allowed, instead {s.dtype} was found"
dim = int(np.prod(s.shape)) dim = int(np.prod(s.shape))
if type(s) == specs.Array: if type(s) == specs.Array:
bound = np.inf * np.ones(dim, dtype=np.float32) bound = np.inf * np.ones(dim, dtype=s.dtype)
return -bound, bound return -bound, bound
elif type(s) == specs.BoundedArray: elif type(s) == specs.BoundedArray:
zeros = np.zeros(dim, dtype=np.float32) zeros = np.zeros(dim, dtype=s.dtype)
return s.minimum + zeros, s.maximum + zeros return s.minimum + zeros, s.maximum + zeros
mins, maxs = [], [] mins, maxs = [], []
@ -29,7 +29,7 @@ def _spec_to_box(spec):
low = np.concatenate(mins, axis=0) low = np.concatenate(mins, axis=0)
high = np.concatenate(maxs, axis=0) high = np.concatenate(maxs, axis=0)
assert low.shape == high.shape assert low.shape == high.shape
return spaces.Box(low, high, dtype=np.float32) return spaces.Box(low, high, dtype=s.dtype)
def _flatten_obs(obs: collections.MutableMapping): def _flatten_obs(obs: collections.MutableMapping):
@ -113,7 +113,7 @@ class DMCWrapper(core.Env):
if self._channels_first: if self._channels_first:
obs = obs.transpose(2, 0, 1).copy() obs = obs.transpose(2, 0, 1).copy()
else: else:
obs = _flatten_obs(time_step.observation) obs = _flatten_obs(time_step.observation).astype(self.observation_space.dtype)
return obs return obs
@property @property
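The wrapper now propagates the dtype of the dm_control spec into the observation Box and casts observations to match, instead of hard-coding float32. A sketch of the resulting behaviour with a hypothetical float64 spec:

    import numpy as np
    from gym import spaces

    spec_dtype = np.float64                       # dtype reported by the dm_control spec
    bound = np.inf * np.ones(4, dtype=spec_dtype)
    observation_space = spaces.Box(-bound, bound, dtype=spec_dtype)

    obs = np.zeros(4, dtype=np.float32)           # raw flattened observation
    obs = obs.astype(observation_space.dtype)     # cast as in the wrapper code above
    assert obs.dtype == spec_dtype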

View File

@ -112,7 +112,7 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True):
# Replace this wrapper with the custom wrapper for your environment by inheriting from the MPEnvWrapper. # Replace this wrapper with the custom wrapper for your environment by inheriting from the MPEnvWrapper.
# You can also add other gym.Wrappers in case they are needed. # You can also add other gym.Wrappers in case they are needed.
wrappers = [alr_envs.classic_control.hole_reacher.MPWrapper] wrappers = [alr_envs.alr.classic_control.hole_reacher.MPWrapper]
mp_kwargs = { mp_kwargs = {
"num_dof": 5, "num_dof": 5,
"num_basis": 5, "num_basis": 5,
@ -147,10 +147,13 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True):
if __name__ == '__main__': if __name__ == '__main__':
render = False render = True
# DMP # DMP
example_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render) example_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render)
# ProMP
example_mp("alr_envs:HoleReacherProMP-v1", seed=10, iterations=1, render=render)
# DetProMP # DetProMP
example_mp("alr_envs:HoleReacherDetPMP-v1", seed=10, iterations=1, render=render) example_mp("alr_envs:HoleReacherDetPMP-v1", seed=10, iterations=1, render=render)

View File

@ -3,7 +3,7 @@ from gym import register
from . import goal_object_change_mp_wrapper, goal_change_mp_wrapper, goal_endeffector_change_mp_wrapper, \ from . import goal_object_change_mp_wrapper, goal_change_mp_wrapper, goal_endeffector_change_mp_wrapper, \
object_change_mp_wrapper object_change_mp_wrapper
ALL_METAWORLD_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "DetPMP": []} ALL_METAWORLD_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "ProMP": [], "DetPMP": []}
# MetaWorld # MetaWorld

View File

@ -3,7 +3,7 @@ from gym.wrappers import FlattenObservation
from . import classic_control, mujoco, robotics from . import classic_control, mujoco, robotics
ALL_GYM_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "DetPMP": []} ALL_GYM_MOTION_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "ProMP": [], "DetPMP": []}
# Short Continuous Mountain Car # Short Continuous Mountain Car
register( register(

View File

@ -7,6 +7,7 @@ from gym.envs.registration import EnvSpec
from mp_env_api import MPEnvWrapper from mp_env_api import MPEnvWrapper
from mp_env_api.mp_wrappers.detpmp_wrapper import DetPMPWrapper from mp_env_api.mp_wrappers.detpmp_wrapper import DetPMPWrapper
from mp_env_api.mp_wrappers.dmp_wrapper import DmpWrapper from mp_env_api.mp_wrappers.dmp_wrapper import DmpWrapper
from mp_env_api.mp_wrappers.promp_wrapper import ProMPWrapper
def make_rank(env_id: str, seed: int, rank: int = 0, return_callable=True, **kwargs): def make_rank(env_id: str, seed: int, rank: int = 0, return_callable=True, **kwargs):
@ -132,6 +133,26 @@ def make_dmp_env(env_id: str, wrappers: Iterable, seed=1, mp_kwargs={}, **kwargs
return DmpWrapper(_env, **mp_kwargs) return DmpWrapper(_env, **mp_kwargs)
def make_promp_env(env_id: str, wrappers: Iterable, seed=1, mp_kwargs={}, **kwargs):
"""
This can also be used standalone for manually building a custom ProMP environment.
Args:
env_id: base_env_name,
wrappers: list of wrappers (at least an MPEnvWrapper),
mp_kwargs: dict of at least {num_dof: int, num_basis: int, width: int}
Returns: ProMP wrapped gym env
"""
_verify_time_limit(mp_kwargs.get("duration", None), kwargs.get("time_limit", None))
_env = _make_wrapped_env(env_id=env_id, wrappers=wrappers, seed=seed, **kwargs)
_verify_dof(_env, mp_kwargs.get("num_dof"))
return ProMPWrapper(_env, **mp_kwargs)
def make_detpmp_env(env_id: str, wrappers: Iterable, seed=1, mp_kwargs={}, **kwargs): def make_detpmp_env(env_id: str, wrappers: Iterable, seed=1, mp_kwargs={}, **kwargs):
""" """
This can also be used standalone for manually building a custom Det ProMP environment. This can also be used standalone for manually building a custom Det ProMP environment.
@ -140,7 +161,7 @@ def make_detpmp_env(env_id: str, wrappers: Iterable, seed=1, mp_kwargs={}, **kwa
wrappers: list of wrappers (at least an MPEnvWrapper), wrappers: list of wrappers (at least an MPEnvWrapper),
mp_kwargs: dict of at least {num_dof: int, num_basis: int, width: int} mp_kwargs: dict of at least {num_dof: int, num_basis: int, width: int}
Returns: DMP wrapped gym env Returns: Det ProMP wrapped gym env
""" """
_verify_time_limit(mp_kwargs.get("duration", None), kwargs.get("time_limit", None)) _verify_time_limit(mp_kwargs.get("duration", None), kwargs.get("time_limit", None))
@ -171,6 +192,26 @@ def make_dmp_env_helper(**kwargs):
mp_kwargs=kwargs.pop("mp_kwargs"), **kwargs) mp_kwargs=kwargs.pop("mp_kwargs"), **kwargs)
def make_promp_env_helper(**kwargs):
"""
Helper function for registering ProMP gym environments.
This can also be used standalone for manually building a custom ProMP environment.
Args:
**kwargs: expects at least the following:
{
"name": base_env_name,
"wrappers": list of wrappers (at least an MPEnvWrapper),
"mp_kwargs": dict of at least {num_dof: int, num_basis: int, width: int}
}
Returns: ProMP wrapped gym env
"""
seed = kwargs.pop("seed", None)
return make_promp_env(env_id=kwargs.pop("name"), wrappers=kwargs.pop("wrappers"), seed=seed,
mp_kwargs=kwargs.pop("mp_kwargs"), **kwargs)
def make_detpmp_env_helper(**kwargs): def make_detpmp_env_helper(**kwargs):
""" """
Helper function for registering ProMP gym environments. Helper function for registering ProMP gym environments.
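The new make_promp_env / make_promp_env_helper mirror the existing DMP and DetPMP helpers. A sketch of building the HoleReacher ProMP variant directly, reusing the mp_kwargs from its registration; the import location of classic_control is assumed from the registration code above:

    from alr_envs.alr import classic_control                     # assumed import location
    from alr_envs.utils.make_env_helpers import make_promp_env

    env = make_promp_env(
        env_id="alr_envs:HoleReacher-v1",
        wrappers=[classic_control.hole_reacher.MPWrapper],
        seed=1,
        mp_kwargs={
            "num_dof": 5,
            "num_basis": 5,
            "duration": 2,
            "policy_type": "velocity",
            "weights_scale": 0.1,
            "zero_start": True,
        },
    )

    obs = env.reset()
    params = env.action_space.sample()   # one ProMP weight vector per rollout
    obs, reward, done, info = env.step(params)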