mp wrapper fixes

Fabian 2022-07-06 09:05:35 +02:00
parent eddef33d9a
commit 6704c9d63a
43 changed files with 302 additions and 608 deletions

View File

@ -118,33 +118,30 @@ for _dims in [5, 7]:
}
)
## Hopper Jump random joints and desired position
register(
id='HopperJumpSparse-v0',
entry_point='alr_envs.alr.mujoco:ALRHopperXYJumpEnv',
entry_point='alr_envs.alr.mujoco:HopperJumpEnv',
max_episode_steps=MAX_EPISODE_STEPS_HOPPERJUMP,
kwargs={
# "max_episode_steps": MAX_EPISODE_STEPS_HOPPERJUMP,
"sparse": True,
# "healthy_reward": 1.0
}
)
## Hopper Jump random joints and desired position step based reward
register(
id='HopperJump-v0',
entry_point='alr_envs.alr.mujoco:ALRHopperXYJumpEnvStepBased',
entry_point='alr_envs.alr.mujoco:HopperJumpEnv',
max_episode_steps=MAX_EPISODE_STEPS_HOPPERJUMP,
kwargs={
# "max_episode_steps": MAX_EPISODE_STEPS_HOPPERJUMP,
"sparse": False,
# "healthy_reward": 1.0
"healthy_reward": 1.0,
"contact_weight": 0.0,
"height_weight": 3.0,
}
)
register(
id='ALRAntJump-v0',
entry_point='alr_envs.alr.mujoco:ALRAntJumpEnv',
entry_point='alr_envs.alr.mujoco:AntJumpEnv',
max_episode_steps=MAX_EPISODE_STEPS_ANTJUMP,
kwargs={
"max_episode_steps": MAX_EPISODE_STEPS_ANTJUMP,
@ -266,7 +263,7 @@ for _v in _versions:
entry_point='alr_envs.utils.make_env_helpers:make_dmp_env_helper',
# max_episode_steps=1,
kwargs={
"name": f"alr_envs:{_v}",
"name": f"{_v}",
"wrappers": [classic_control.simple_reacher.MPWrapper],
"traj_gen_kwargs": {
"num_dof": 2 if "long" not in _v.lower() else 5,
@ -304,7 +301,7 @@ register(
entry_point='alr_envs.utils.make_env_helpers:make_dmp_env_helper',
# max_episode_steps=1,
kwargs={
"name": "alr_envs:ViaPointReacher-v0",
"name": "ViaPointReacher-v0",
"wrappers": [classic_control.viapoint_reacher.MPWrapper],
"traj_gen_kwargs": {
"num_dof": 5,
@ -340,7 +337,7 @@ for _v in _versions:
entry_point='alr_envs.utils.make_env_helpers:make_dmp_env_helper',
# max_episode_steps=1,
kwargs={
"name": f"alr_envs:HoleReacher-{_v}",
"name": f"HoleReacher-{_v}",
"wrappers": [classic_control.hole_reacher.MPWrapper],
"traj_gen_kwargs": {
"num_dof": 5,
@ -362,7 +359,7 @@ for _v in _versions:
kwargs_dict_hole_reacher_promp['wrappers'].append(classic_control.hole_reacher.MPWrapper)
kwargs_dict_hole_reacher_promp['trajectory_generator_kwargs']['weight_scale'] = 2
kwargs_dict_hole_reacher_promp['controller_kwargs']['controller_type'] = 'velocity'
kwargs_dict_hole_reacher_promp['name'] = f"alr_envs:{_v}"
kwargs_dict_hole_reacher_promp['name'] = f"{_v}"
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
@ -370,8 +367,8 @@ for _v in _versions:
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id)
## ALRReacher
_versions = ["ALRReacher-v0", "ALRLongReacher-v0", "ALRReacherSparse-v0", "ALRLongReacherSparse-v0"]
## ReacherNd
_versions = ["Reacher5d-v0", "Reacher7d-v0", "Reacher5dSparse-v0", "Reacher7dSparse-v0"]
for _v in _versions:
_name = _v.split("-")
_env_id = f'{_name[0]}DMP-{_name[1]}'
@ -380,7 +377,7 @@ for _v in _versions:
entry_point='alr_envs.utils.make_env_helpers:make_dmp_env_helper',
# max_episode_steps=1,
kwargs={
"name": f"alr_envs:{_v}",
"name": f"{_v}",
"wrappers": [mujoco.reacher.MPWrapper],
"traj_gen_kwargs": {
"num_dof": 5 if "long" not in _v.lower() else 7,
@ -404,10 +401,10 @@ for _v in _versions:
kwargs_dict_alr_reacher_promp['wrappers'].append(mujoco.reacher.MPWrapper)
kwargs_dict_alr_reacher_promp['controller_kwargs']['p_gains'] = 1
kwargs_dict_alr_reacher_promp['controller_kwargs']['d_gains'] = 0.1
kwargs_dict_alr_reacher_promp['name'] = f"alr_envs:{_v}"
kwargs_dict_alr_reacher_promp['name'] = f"{_v}"
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
entry_point='alr_envs.utils.make_env_helpers:make_bb_env_helper',
kwargs=kwargs_dict_alr_reacher_promp
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id)
@ -425,7 +422,7 @@ for _v in _versions:
kwargs_dict_bp_promp['controller_kwargs']['d_gains'] = np.array([0.02333333, 0.1, 0.0625, 0.08, 0.03, 0.03, 0.0125])
kwargs_dict_bp_promp['basis_generator_kwargs']['num_basis'] = 2
kwargs_dict_bp_promp['basis_generator_kwargs']['num_basis_zero_start'] = 2
kwargs_dict_bp_promp['name'] = f"alr_envs:{_v}"
kwargs_dict_bp_promp['name'] = f"{_v}"
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_bb_env_helper',
@ -445,7 +442,7 @@ for _v in _versions:
kwargs_dict_bp_promp['controller_kwargs']['d_gains'] = np.array([0.02333333, 0.1, 0.0625, 0.08, 0.03, 0.03, 0.0125])
kwargs_dict_bp_promp['basis_generator_kwargs']['num_basis'] = 2
kwargs_dict_bp_promp['basis_generator_kwargs']['num_basis_zero_start'] = 2
kwargs_dict_bp_promp['name'] = f"alr_envs:{_v}"
kwargs_dict_bp_promp['name'] = f"{_v}"
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_mp_env_helper',
@ -465,7 +462,7 @@ for _v in _versions:
_env_id = f'{_name[0]}ProMP-{_name[1]}'
kwargs_dict_ant_jump_promp = deepcopy(DEFAULT_BB_DICT)
kwargs_dict_ant_jump_promp['wrappers'].append(mujoco.ant_jump.MPWrapper)
kwargs_dict_ant_jump_promp['name'] = f"alr_envs:{_v}"
kwargs_dict_ant_jump_promp['name'] = f"{_v}"
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_mp_env_helper',
@ -482,7 +479,7 @@ for _v in _versions:
_env_id = f'{_name[0]}ProMP-{_name[1]}'
kwargs_dict_halfcheetah_jump_promp = deepcopy(DEFAULT_BB_DICT)
kwargs_dict_halfcheetah_jump_promp['wrappers'].append(mujoco.half_cheetah_jump.MPWrapper)
kwargs_dict_halfcheetah_jump_promp['name'] = f"alr_envs:{_v}"
kwargs_dict_halfcheetah_jump_promp['name'] = f"{_v}"
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
@ -502,7 +499,7 @@ for _v in _versions:
_env_id = f'{_name[0]}ProMP-{_name[1]}'
kwargs_dict_hopper_jump_promp = deepcopy(DEFAULT_BB_DICT)
kwargs_dict_hopper_jump_promp['wrappers'].append(mujoco.hopper_jump.MPWrapper)
kwargs_dict_hopper_jump_promp['name'] = f"alr_envs:{_v}"
kwargs_dict_hopper_jump_promp['name'] = f"{_v}"
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_mp_env_helper',
@ -520,7 +517,7 @@ for _v in _versions:
_env_id = f'{_name[0]}ProMP-{_name[1]}'
kwargs_dict_walker2d_jump_promp = deepcopy(DEFAULT_BB_DICT)
kwargs_dict_walker2d_jump_promp['wrappers'].append(mujoco.walker_2d_jump.MPWrapper)
kwargs_dict_walker2d_jump_promp['name'] = f"alr_envs:{_v}"
kwargs_dict_walker2d_jump_promp['name'] = f"{_v}"
register(
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
@ -583,7 +580,7 @@ register(
# CtxtFree are v0, Contextual are v1
register(
id='ALRAntJump-v0',
entry_point='alr_envs.alr.mujoco:ALRAntJumpEnv',
entry_point='alr_envs.alr.mujoco:AntJumpEnv',
max_episode_steps=MAX_EPISODE_STEPS_ANTJUMP,
kwargs={
"max_episode_steps": MAX_EPISODE_STEPS_ANTJUMP,
@ -602,7 +599,7 @@ register(
)
register(
id='ALRHopperJump-v0',
entry_point='alr_envs.alr.mujoco:ALRHopperJumpEnv',
entry_point='alr_envs.alr.mujoco:HopperJumpEnv',
max_episode_steps=MAX_EPISODE_STEPS_HOPPERJUMP,
kwargs={
"max_episode_steps": MAX_EPISODE_STEPS_HOPPERJUMP,
@ -649,7 +646,7 @@ for i in _vs:
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
kwargs={
"name": f"alr_envs:{_env_id.replace('ProMP', '')}",
"name": f"{_env_id.replace('ProMP', '')}",
"wrappers": [mujoco.reacher.MPWrapper],
"mp_kwargs": {
"num_dof": 5,
@ -672,7 +669,7 @@ for i in _vs:
id=_env_id,
entry_point='alr_envs.utils.make_env_helpers:make_promp_env_helper',
kwargs={
"name": f"alr_envs:{_env_id.replace('ProMP', '')}",
"name": f"{_env_id.replace('ProMP', '')}",
"wrappers": [mujoco.reacher.MPWrapper],
"mp_kwargs": {
"num_dof": 5,

View File

@ -1,9 +1,10 @@
from abc import ABC, abstractmethod
from typing import Union
from typing import Union, Tuple, Optional
import gym
import numpy as np
from gym import spaces
from gym.core import ObsType
from gym.utils import seeding
from alr_envs.alr.classic_control.utils import intersect
@ -14,8 +15,7 @@ class BaseReacherEnv(gym.Env, ABC):
Base class for all reaching environments.
"""
def __init__(self, n_links: int, random_start: bool = True,
allow_self_collision: bool = False):
def __init__(self, n_links: int, random_start: bool = True, allow_self_collision: bool = False):
super().__init__()
self.link_lengths = np.ones(n_links)
self.n_links = n_links
@ -70,7 +70,8 @@ class BaseReacherEnv(gym.Env, ABC):
def current_vel(self):
return self._angle_velocity.copy()
def reset(self):
def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]:
# Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start:
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
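Note: the reset signature above (and the same change in the other classic-control and mujoco envs in this commit) migrates to the seeded Gym reset API. A small illustrative sketch, assuming gym >= 0.22, of how the seed reaches the self.np_random generator that first_joint is sampled from; the class below is a stand-in, not code from this repository.

import gym
import numpy as np


class SeededReacherSketch(gym.Env):
    # Minimal stand-in; only demonstrates how `seed` reaches `self.np_random`.
    observation_space = gym.spaces.Box(-np.pi, np.pi, shape=(1,), dtype=np.float64)
    action_space = gym.spaces.Box(-1.0, 1.0, shape=(1,), dtype=np.float64)

    def reset(self, *, seed=None, return_info=False, options=None):
        super().reset(seed=seed)  # (re)seeds self.np_random when a seed is given
        first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
        obs = np.array([first_joint])
        return (obs, {}) if return_info else obs

    def step(self, action):
        return np.zeros(1), 0.0, True, {}


env = SeededReacherSketch()
assert np.allclose(env.reset(seed=42), env.reset(seed=42))  # same seed -> same start state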

View File

@ -1,8 +1,9 @@
from typing import Union
from typing import Union, Optional, Tuple
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym.core import ObsType
from matplotlib import patches
from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
@ -51,7 +52,8 @@ class HoleReacherEnv(BaseReacherDirectEnv):
else:
raise ValueError("Unknown reward function {}".format(rew_fct))
def reset(self):
def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]:
self._generate_hole()
self._set_patches()
self.reward_function.reset()
@ -223,6 +225,7 @@ class HoleReacherEnv(BaseReacherDirectEnv):
if __name__ == "__main__":
import time
env = HoleReacherEnv(5)
env.reset()

View File

@ -7,6 +7,7 @@ from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
@property
def context_mask(self):
return np.hstack([
[self.env.random_start] * self.env.n_links, # cos
@ -25,7 +26,3 @@ class MPWrapper(RawInterfaceWrapper):
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.current_vel
@property
def dt(self) -> Union[float, int]:
return self.env.dt
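Note: after this commit every MP wrapper boils down to the same minimal interface: a context_mask property plus current_pos/current_vel, with the previous dt and goal_pos properties removed (presumably handled by the shared RawInterfaceWrapper base). The sketch below shows that shape; it assumes the alr_envs package from this repository is importable, that the wrapped env exposes n_links, current_pos and current_vel like the classic-control reachers above, and the mask layout is made up for illustration.

from typing import Tuple, Union

import numpy as np

from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper


class MinimalMPWrapper(RawInterfaceWrapper):
    # Illustrative wrapper following the interface used throughout this commit.

    @property
    def context_mask(self) -> np.ndarray:
        # Expose only the goal-related part of the observation as context.
        return np.hstack([
            [False] * 2 * self.env.n_links,  # cos/sin of joint angles
            [False] * self.env.n_links,      # joint velocities
            [True] * 2,                      # goal position
        ])

    @property
    def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
        return self.env.current_pos

    @property
    def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
        return self.env.current_vel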

View File

@ -7,6 +7,7 @@ from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
@property
def context_mask(self):
return np.hstack([
[self.env.random_start] * self.env.n_links, # cos
@ -23,7 +24,3 @@ class MPWrapper(RawInterfaceWrapper):
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.current_vel
@property
def dt(self) -> Union[float, int]:
return self.env.dt

View File

@ -1,8 +1,9 @@
from typing import Iterable, Union
from typing import Iterable, Union, Optional, Tuple
import matplotlib.pyplot as plt
import numpy as np
from gym import spaces
from gym.core import ObsType
from alr_envs.alr.classic_control.base_reacher.base_reacher_torque import BaseReacherTorqueEnv
@ -15,7 +16,7 @@ class SimpleReacherEnv(BaseReacherTorqueEnv):
"""
def __init__(self, n_links: int, target: Union[None, Iterable] = None, random_start: bool = True,
allow_self_collision: bool = False,):
allow_self_collision: bool = False, ):
super().__init__(n_links, random_start, allow_self_collision)
# provided initial parameters
@ -41,7 +42,8 @@ class SimpleReacherEnv(BaseReacherTorqueEnv):
# def start_pos(self):
# return self._start_pos
def reset(self):
def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]:
self._generate_goal()
return super().reset()

View File

@ -7,6 +7,7 @@ from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
@property
def context_mask(self):
return np.hstack([
[self.env.random_start] * self.env.n_links, # cos
@ -24,7 +25,3 @@ class MPWrapper(RawInterfaceWrapper):
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.current_vel
@property
def dt(self) -> Union[float, int]:
return self.env.dt

View File

@ -1,8 +1,9 @@
from typing import Iterable, Union
from typing import Iterable, Union, Tuple, Optional
import gym
import matplotlib.pyplot as plt
import numpy as np
from gym.core import ObsType
from gym.utils import seeding
from alr_envs.alr.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
@ -40,7 +41,8 @@ class ViaPointReacherEnv(BaseReacherDirectEnv):
# def start_pos(self):
# return self._start_pos
def reset(self):
def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]:
self._generate_goal()
return super().reset()
@ -183,8 +185,10 @@ class ViaPointReacherEnv(BaseReacherDirectEnv):
plt.pause(0.01)
if __name__ == "__main__":
import time
env = ViaPointReacherEnv(5)
env.reset()

View File

@ -1,4 +1,4 @@
from .ant_jump.ant_jump import ALRAntJumpEnv
from .ant_jump.ant_jump import AntJumpEnv
from .ball_in_a_cup.ball_in_a_cup import ALRBallInACupEnv
from .ball_in_a_cup.biac_pd import ALRBallInACupPDEnv
from alr_envs.alr.mujoco.beerpong.beerpong import BeerPongEnv

View File

@ -1 +1 @@
from .new_mp_wrapper import MPWrapper
from .mp_wrapper import MPWrapper

View File

@ -1,14 +1,19 @@
from typing import Tuple, Union, Optional
import numpy as np
from gym.core import ObsType
from gym.envs.mujoco.ant_v3 import AntEnv
MAX_EPISODE_STEPS_ANTJUMP = 200
# TODO: This environment was not testet yet. Do the following todos and test it.
# TODO: This environment was not tested yet. Do the following todos and test it.
# TODO: Right now this environment only considers jumping to a specific height, which is not nice. It should be extended
# to the same structure as the Hopper, where the angles are randomized (->contexts) and the agent should jump as high
# as possible, while landing at a specific target position
class ALRAntJumpEnv(AntEnv):
class AntJumpEnv(AntEnv):
"""
Initialization changes to normal Ant:
- healthy_reward: 1.0 -> 0.01 -> 0.0 no healthy reward needed - Paul and Marc
@ -27,17 +32,15 @@ class ALRAntJumpEnv(AntEnv):
contact_force_range=(-1.0, 1.0),
reset_noise_scale=0.1,
exclude_current_positions_from_observation=True,
max_episode_steps=200):
):
self.current_step = 0
self.max_height = 0
self.max_episode_steps = max_episode_steps
self.goal = 0
super().__init__(xml_file, ctrl_cost_weight, contact_cost_weight, healthy_reward, terminate_when_unhealthy,
healthy_z_range, contact_force_range, reset_noise_scale,
exclude_current_positions_from_observation)
def step(self, action):
self.current_step += 1
self.do_simulation(action, self.frame_skip)
@ -54,10 +57,10 @@ class ALRAntJumpEnv(AntEnv):
done = height < 0.3 # fall over -> is the 0.3 value from healthy_z_range? TODO change 0.3 to the value of healthy z angle
if self.current_step == self.max_episode_steps or done:
if self.current_step == MAX_EPISODE_STEPS_ANTJUMP or done:
# -10 for scaling the value of the distance between the max_height and the goal height; only used when context is enabled
# height_reward = -10 * (np.linalg.norm(self.max_height - self.goal))
height_reward = -10*np.linalg.norm(self.max_height - self.goal)
height_reward = -10 * np.linalg.norm(self.max_height - self.goal)
# no healthy reward when using context, because we optimize a negative value
healthy_reward = 0
@ -77,7 +80,8 @@ class ALRAntJumpEnv(AntEnv):
def _get_obs(self):
return np.append(super()._get_obs(), self.goal)
def reset(self):
def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]:
self.current_step = 0
self.max_height = 0
self.goal = np.random.uniform(1.0, 2.5,
@ -96,19 +100,3 @@ class ALRAntJumpEnv(AntEnv):
observation = self._get_obs()
return observation
if __name__ == '__main__':
render_mode = "human" # "human" or "partial" or "final"
env = ALRAntJumpEnv()
obs = env.reset()
for i in range(2000):
# test with random actions
ac = env.action_space.sample()
obs, rew, d, info = env.step(ac)
if i % 10 == 0:
env.render(mode=render_mode)
if d:
env.reset()
env.close()

View File

@ -1,4 +1,4 @@
from typing import Tuple, Union
from typing import Union, Tuple
import numpy as np
@ -8,7 +8,7 @@ from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
@property
def context_mask(self) -> np.ndarray:
def context_mask(self):
return np.hstack([
[False] * 111, # ant has 111 dimensional observation space !!
[True] # goal height
@ -21,11 +21,3 @@ class MPWrapper(RawInterfaceWrapper):
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[6:14].copy()
@property
def goal_pos(self) -> Union[float, int, np.ndarray, Tuple]:
raise ValueError("Goal position is not available and has to be learnt based on the environment.")
@property
def dt(self) -> Union[float, int]:
return self.env.dt

View File

@ -1,22 +0,0 @@
from alr_envs.black_box.black_box_wrapper import BlackBoxWrapper
from typing import Union, Tuple
import numpy as np
from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
def get_context_mask(self):
return np.hstack([
[False] * 111, # ant has 111 dimensional observation space !!
[True] # goal height
])
@property
def current_pos(self) -> Union[float, int, np.ndarray]:
return self.env.sim.data.qpos[7:15].copy()
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[6:14].copy()

View File

@ -1,5 +1,5 @@
import numpy as np
from alr_envs.alr.mujoco import alr_reward_fct
from alr_envs.alr.mujoco.ball_in_a_cup import alr_reward_fct
class BallInACupReward(alr_reward_fct.AlrReward):

View File

@ -1,5 +1,5 @@
import numpy as np
from alr_envs.alr.mujoco import alr_reward_fct
from alr_envs.alr.mujoco.ball_in_a_cup import alr_reward_fct
class BallInACupReward(alr_reward_fct.AlrReward):

View File

@ -6,17 +6,6 @@ import mujoco_py.builder
import numpy as np
from gym import utils
from mp_env_api.mp_wrappers.detpmp_wrapper import DetPMPWrapper
from mp_env_api.utils.policies import PDControllerExtend
def make_detpmp_env(**kwargs):
name = kwargs.pop("name")
_env = gym.make(name)
policy = PDControllerExtend(_env, p_gains=kwargs.pop('p_gains'), d_gains=kwargs.pop('d_gains'))
kwargs['policy_type'] = policy
return DetPMPWrapper(_env, **kwargs)
class ALRBallInACupPDEnv(mujoco_env.MujocoEnv, utils.EzPickle):
def __init__(self, frame_skip=4, apply_gravity_comp=True, simplified: bool = False,

View File

@ -1 +1 @@
from .new_mp_wrapper import MPWrapper
from .mp_wrapper import MPWrapper

View File

@ -1,12 +1,11 @@
import os
from typing import Optional
import mujoco_py.builder
import numpy as np
from gym import utils
from gym.envs.mujoco import MujocoEnv
from alr_envs.alr.mujoco.beerpong.deprecated.beerpong_reward_staged import BeerPongReward
# XML Variables
ROBOT_COLLISION_OBJ = ["wrist_palm_link_convex_geom",
"wrist_pitch_link_convex_decomposition_p1_geom",
@ -76,7 +75,7 @@ class BeerPongEnv(MujocoEnv, utils.EzPickle):
def start_vel(self):
return self._start_vel
def reset(self):
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None):
self.dists = []
self.dists_final = []
self.action_costs = []

View File

@ -1,4 +1,4 @@
from typing import Tuple, Union
from typing import Union, Tuple
import numpy as np
@ -7,8 +7,7 @@ from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
@property
def context_mask(self) -> np.ndarray:
def get_context_mask(self):
return np.hstack([
[False] * 7, # cos
[False] * 7, # sin
@ -19,22 +18,25 @@ class MPWrapper(RawInterfaceWrapper):
[False] # env steps
])
@property
def start_pos(self):
return self._start_pos
@property
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
return self.sim.data.qpos[0:7].copy()
return self.env.sim.data.qpos[0:7].copy()
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.sim.data.qvel[0:7].copy()
return self.env.sim.data.qvel[0:7].copy()
@property
def goal_pos(self):
raise ValueError("Goal position is not available and has to be learnt based on the environment.")
# TODO: Fix this
def _episode_callback(self, action: np.ndarray, mp) -> Tuple[np.ndarray, Union[np.ndarray, None]]:
if mp.learn_tau:
self.env.env.release_step = action[0] / self.env.dt # Tau value
return action, None
else:
return action, None
@property
def dt(self) -> Union[float, int]:
return self.env.dt
def set_context(self, context):
xyz = np.zeros(3)
xyz[:2] = context
xyz[-1] = 0.840
self.env.env.model.body_pos[self.env.env.cup_table_id] = xyz
return self.get_observation_from_step(self.env.env._get_obs())

View File

@ -1,41 +0,0 @@
from typing import Union, Tuple
import numpy as np
from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
@property
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qpos[0:7].copy()
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[0:7].copy()
def get_context_mask(self):
return np.hstack([
[False] * 7, # cos
[False] * 7, # sin
[False] * 7, # joint velocities
[False] * 3, # cup_goal_diff_final
[False] * 3, # cup_goal_diff_top
[True] * 2, # xy position of cup
[False] # env steps
])
# TODO: Fix this
def _episode_callback(self, action: np.ndarray, mp) -> Tuple[np.ndarray, Union[np.ndarray, None]]:
if mp.learn_tau:
self.env.env.release_step = action[0] / self.env.dt # Tau value
return action, None
else:
return action, None
def set_context(self, context):
xyz = np.zeros(3)
xyz[:2] = context
xyz[-1] = 0.840
self.env.env.model.body_pos[self.env.env.cup_table_id] = xyz
return self.get_observation_from_step(self.env.env._get_obs())

View File

@ -1 +1 @@
from .new_mp_wrapper import MPWrapper
from .mp_wrapper import MPWrapper

View File

@ -1,4 +1,7 @@
import os
from typing import Tuple, Union, Optional
from gym.core import ObsType
from gym.envs.mujoco.half_cheetah_v3 import HalfCheetahEnv
import numpy as np
@ -20,7 +23,7 @@ class ALRHalfCheetahJumpEnv(HalfCheetahEnv):
max_episode_steps=100):
self.current_step = 0
self.max_height = 0
self.max_episode_steps = max_episode_steps
# self.max_episode_steps = max_episode_steps
self.goal = 0
self.context = context
xml_file = os.path.join(os.path.dirname(__file__), "assets", xml_file)
@ -37,14 +40,14 @@ class ALRHalfCheetahJumpEnv(HalfCheetahEnv):
## Didn't use fell_over, because base env also has no done condition - Paul and Marc
# fell_over = abs(self.sim.data.qpos[2]) > 2.5 # how to figure out if the cheetah fell over? -> 2.5 ok?
# TODO: Should a fall over be checked herE?
# TODO: Should a fall over be checked here?
done = False
ctrl_cost = self.control_cost(action)
costs = ctrl_cost
if self.current_step == self.max_episode_steps:
height_goal_distance = -10*np.linalg.norm(self.max_height - self.goal) + 1e-8 if self.context \
if self.current_step == MAX_EPISODE_STEPS_HALFCHEETAHJUMP:
height_goal_distance = -10 * np.linalg.norm(self.max_height - self.goal) + 1e-8 if self.context \
else self.max_height
rewards = self._forward_reward_weight * height_goal_distance
else:
@ -62,7 +65,8 @@ class ALRHalfCheetahJumpEnv(HalfCheetahEnv):
def _get_obs(self):
return np.append(super()._get_obs(), self.goal)
def reset(self):
def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]:
self.max_height = 0
self.current_step = 0
self.goal = np.random.uniform(1.1, 1.6, 1) # 1.1 1.6
@ -80,21 +84,3 @@ class ALRHalfCheetahJumpEnv(HalfCheetahEnv):
observation = self._get_obs()
return observation
if __name__ == '__main__':
render_mode = "human" # "human" or "partial" or "final"
env = ALRHalfCheetahJumpEnv()
obs = env.reset()
for i in range(2000):
# objective.load_result("/tmp/cma")
# test with random actions
ac = env.action_space.sample()
obs, rew, d, info = env.step(ac)
if i % 10 == 0:
env.render(mode=render_mode)
if d:
print('After ', i, ' steps, done: ', d)
env.reset()
env.close()

View File

@ -20,11 +20,3 @@ class MPWrapper(RawInterfaceWrapper):
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[3:9].copy()
@property
def goal_pos(self) -> Union[float, int, np.ndarray, Tuple]:
raise ValueError("Goal position is not available and has to be learnt based on the environment.")
@property
def dt(self) -> Union[float, int]:
return self.env.dt

View File

@ -1,22 +0,0 @@
from typing import Tuple, Union
import numpy as np
from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
def context_mask(self):
return np.hstack([
[False] * 17,
[True] # goal height
])
@property
def current_pos(self) -> Union[float, int, np.ndarray]:
return self.env.sim.data.qpos[3:9].copy()
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[3:9].copy()

View File

@ -1 +1,2 @@
from .mp_wrapper import MPWrapper

View File

@ -1,3 +1,4 @@
import copy
from typing import Optional
from gym.envs.mujoco.hopper_v3 import HopperEnv
@ -7,10 +8,11 @@ import os
MAX_EPISODE_STEPS_HOPPERJUMP = 250
class ALRHopperJumpEnv(HopperEnv):
class HopperJumpEnv(HopperEnv):
"""
Initialization changes to normal Hopper:
- terminate_when_unhealthy: True -> False
- healthy_reward: 1.0 -> 2.0
- healthy_z_range: (0.7, float('inf')) -> (0.5, float('inf'))
- healthy_angle_range: (-0.2, 0.2) -> (-float('inf'), float('inf'))
- exclude_current_positions_from_observation: True -> False
@ -21,24 +23,28 @@ class ALRHopperJumpEnv(HopperEnv):
xml_file='hopper_jump.xml',
forward_reward_weight=1.0,
ctrl_cost_weight=1e-3,
healthy_reward=1.0,
penalty=0.0,
healthy_reward=2.0, # 1 step
contact_weight=2.0, # 0 step
height_weight=10.0, # 3 step
dist_weight=3.0, # 3 step
terminate_when_unhealthy=False,
healthy_state_range=(-100.0, 100.0),
healthy_z_range=(0.5, float('inf')),
healthy_angle_range=(-float('inf'), float('inf')),
reset_noise_scale=5e-3,
exclude_current_positions_from_observation=False,
sparse=False,
):
self._steps = 0
self.sparse = sparse
self._height_weight = height_weight
self._dist_weight = dist_weight
self._contact_weight = contact_weight
self.max_height = 0
# self.penalty = penalty
self.goal = 0
self._floor_geom_id = None
self._foot_geom_id = None
self._steps = 0
self.contact_with_floor = False
self.init_floor_contact = False
self.has_left_floor = False
@ -49,12 +55,12 @@ class ALRHopperJumpEnv(HopperEnv):
healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale,
exclude_current_positions_from_observation)
# increase initial height
self.init_qpos[1] = 1.5
def step(self, action):
self._steps += 1
self._floor_geom_id = self.model.geom_name2id('floor')
self._foot_geom_id = self.model.geom_name2id('foot_geom')
self.do_simulation(action, self.frame_skip)
height_after = self.get_body_com("torso")[2]
@ -73,18 +79,19 @@ class ALRHopperJumpEnv(HopperEnv):
ctrl_cost = self.control_cost(action)
costs = ctrl_cost
done = False
goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
if self.contact_dist is None and self.contact_with_floor:
self.contact_dist = goal_dist
rewards = 0
if self._steps >= MAX_EPISODE_STEPS_HOPPERJUMP:
# healthy_reward = 0 if self.context else self.healthy_reward * self._steps
healthy_reward = self.healthy_reward * 2 # * self._steps
contact_dist = self.contact_dist if self.contact_dist is not None else 5
dist_reward = self._forward_reward_weight * (-3 * goal_dist + 10 * self.max_height - 2 * contact_dist)
rewards = dist_reward + healthy_reward
if not self.sparse or (self.sparse and self._steps >= MAX_EPISODE_STEPS_HOPPERJUMP):
healthy_reward = self.healthy_reward
distance_reward = goal_dist * self._dist_weight
height_reward = (self.max_height if self.sparse else self.get_body_com("torso")[2]) * self._height_weight
contact_reward = (self.contact_dist or 5) * self._contact_weight
# dist_reward = self._forward_reward_weight * (-3 * goal_dist + 10 * self.max_height - 2 * contact_dist)
rewards = self._forward_reward_weight * (distance_reward + height_reward + contact_reward + healthy_reward)
observation = self._get_obs()
reward = rewards - costs
@ -97,24 +104,40 @@ class ALRHopperJumpEnv(HopperEnv):
height_rew=self.max_height,
healthy_reward=self.healthy_reward * 2,
healthy=self.is_healthy,
contact_dist=self.contact_dist if self.contact_dist is not None else 0
contact_dist=self.contact_dist or 0
)
return observation, reward, done, info
def _get_obs(self):
return np.append(super()._get_obs(), self.goal)
goal_dist = self.data.get_site_xpos('foot_site') - np.array([self.goal, 0, 0])
return np.concatenate((super(HopperJumpEnv, self)._get_obs(), goal_dist.copy(), self.goal.copy()))
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None, ):
self.goal = self.np_random.uniform(1.4, 2.16, 1)[0] # 1.3 2.3
def reset_model(self):
super(HopperJumpEnv, self).reset_model()
self.goal = self.np_random.uniform(0.3, 1.35, 1)[0]
self.sim.model.body_pos[self.sim.model.body_name2id('goal_site_body')] = np.array([self.goal, 0, 0])
self.max_height = 0
self._steps = 0
return super().reset()
# overwrite reset_model to make it deterministic
def reset_model(self):
noise_low = -np.zeros(self.model.nq)
noise_low[3] = -0.5
noise_low[4] = -0.2
noise_low[5] = 0
qpos = self.init_qpos # + self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nq)
qvel = self.init_qvel # + self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nv)
noise_high = np.zeros(self.model.nq)
noise_high[3] = 0
noise_high[4] = 0
noise_high[5] = 0.785
qpos = (
self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nq) +
self.init_qpos
)
qvel = (
# self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nv) +
self.init_qvel
)
self.set_state(qpos, qvel)
@ -123,6 +146,7 @@ class ALRHopperJumpEnv(HopperEnv):
self.contact_with_floor = False
self.init_floor_contact = False
self.contact_dist = None
return observation
def _is_floor_foot_contact(self):
@ -137,184 +161,75 @@ class ALRHopperJumpEnv(HopperEnv):
return False
class ALRHopperXYJumpEnv(ALRHopperJumpEnv):
class HopperJumpStepEnv(HopperJumpEnv):
def __init__(self,
xml_file='hopper_jump.xml',
forward_reward_weight=1.0,
ctrl_cost_weight=1e-3,
healthy_reward=1.0,
height_weight=3,
dist_weight=3,
terminate_when_unhealthy=False,
healthy_state_range=(-100.0, 100.0),
healthy_z_range=(0.5, float('inf')),
healthy_angle_range=(-float('inf'), float('inf')),
reset_noise_scale=5e-3,
exclude_current_positions_from_observation=False
):
self._height_weight = height_weight
self._dist_weight = dist_weight
super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, terminate_when_unhealthy,
healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale,
exclude_current_positions_from_observation)
def step(self, action):
self._floor_geom_id = self.model.geom_name2id('floor')
self._foot_geom_id = self.model.geom_name2id('foot_geom')
self._steps += 1
self.do_simulation(action, self.frame_skip)
height_after = self.get_body_com("torso")[2]
site_pos_after = self.sim.data.site_xpos[self.model.site_name2id('foot_site')].copy()
site_pos_after = self.data.get_site_xpos('foot_site')
self.max_height = max(height_after, self.max_height)
# floor_contact = self._contact_checker(self._floor_geom_id, self._foot_geom_id) if not self.contact_with_floor else False
# self.init_floor_contact = floor_contact if not self.init_floor_contact else self.init_floor_contact
# self.has_left_floor = not floor_contact if self.init_floor_contact and not self.has_left_floor else self.has_left_floor
# self.contact_with_floor = floor_contact if not self.contact_with_floor and self.has_left_floor else self.contact_with_floor
floor_contact = self._is_floor_foot_contact(self._floor_geom_id,
self._foot_geom_id) if not self.contact_with_floor else False
if not self.init_floor_contact:
self.init_floor_contact = floor_contact
if self.init_floor_contact and not self.has_left_floor:
self.has_left_floor = not floor_contact
if not self.contact_with_floor and self.has_left_floor:
self.contact_with_floor = floor_contact
if self.contact_dist is None and self.contact_with_floor:
self.contact_dist = np.linalg.norm(self.sim.data.site_xpos[self.model.site_name2id('foot_site')]
- np.array([self.goal, 0, 0]))
ctrl_cost = self.control_cost(action)
healthy_reward = self.healthy_reward
height_reward = self._height_weight * height_after
goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
goal_dist_reward = -self._dist_weight * goal_dist
dist_reward = self._forward_reward_weight * (goal_dist_reward + height_reward)
rewards = dist_reward + healthy_reward
costs = ctrl_cost
done = False
goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
rewards = 0
if self._steps >= self.max_episode_steps:
# healthy_reward = 0 if self.context else self.healthy_reward * self._steps
healthy_reward = self.healthy_reward * 2 # * self._steps
contact_dist = self.contact_dist if self.contact_dist is not None else 5
dist_reward = self._forward_reward_weight * (-3 * goal_dist + 10 * self.max_height - 2 * contact_dist)
rewards = dist_reward + healthy_reward
# This is only for logging the distance to goal when first having the contact
has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
if not self.init_floor_contact:
self.init_floor_contact = has_floor_contact
if self.init_floor_contact and not self.has_left_floor:
self.has_left_floor = not has_floor_contact
if not self.contact_with_floor and self.has_left_floor:
self.contact_with_floor = has_floor_contact
if self.contact_dist is None and self.contact_with_floor:
self.contact_dist = goal_dist
##############################################################
observation = self._get_obs()
reward = rewards - costs
info = {
'height': height_after,
'x_pos': site_pos_after,
'max_height': self.max_height,
'goal': self.goal,
'max_height': copy.copy(self.max_height),
'goal': copy.copy(self.goal),
'goal_dist': goal_dist,
'height_rew': self.max_height,
'healthy_reward': self.healthy_reward * 2,
'healthy': self.is_healthy,
'contact_dist': self.contact_dist if self.contact_dist is not None else 0
}
return observation, reward, done, info
def reset_model(self):
self.init_qpos[1] = 1.5
self._floor_geom_id = self.model.geom_name2id('floor')
self._foot_geom_id = self.model.geom_name2id('foot_geom')
noise_low = -np.zeros(self.model.nq)
noise_low[3] = -0.5
noise_low[4] = -0.2
noise_low[5] = 0
noise_high = np.zeros(self.model.nq)
noise_high[3] = 0
noise_high[4] = 0
noise_high[5] = 0.785
rnd_vec = self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nq)
qpos = self.init_qpos + rnd_vec
qvel = self.init_qvel
self.set_state(qpos, qvel)
observation = self._get_obs()
self.has_left_floor = False
self.contact_with_floor = False
self.init_floor_contact = False
self.contact_dist = None
return observation
def reset(self):
super().reset()
self.goal = self.np_random.uniform(0.3, 1.35, 1)[0]
self.sim.model.body_pos[self.sim.model.body_name2id('goal_site_body')] = np.array([self.goal, 0, 0])
return self.reset_model()
def _get_obs(self):
goal_diff = self.sim.data.site_xpos[self.model.site_name2id('foot_site')].copy() \
- np.array([self.goal, 0, 0])
return np.concatenate((super(ALRHopperXYJumpEnv, self)._get_obs(), goal_diff))
def set_context(self, context):
# context is 4 dimensional
qpos = self.init_qpos
qvel = self.init_qvel
qpos[-3:] = context[:3]
self.goal = context[-1]
self.set_state(qpos, qvel)
self.sim.model.body_pos[self.sim.model.body_name2id('goal_site_body')] = np.array([self.goal, 0, 0])
return self._get_obs()
class ALRHopperXYJumpEnvStepBased(ALRHopperXYJumpEnv):
def __init__(
self,
xml_file='hopper_jump.xml',
forward_reward_weight=1.0,
ctrl_cost_weight=1e-3,
healthy_reward=0.0,
penalty=0.0,
context=True,
terminate_when_unhealthy=False,
healthy_state_range=(-100.0, 100.0),
healthy_z_range=(0.5, float('inf')),
healthy_angle_range=(-float('inf'), float('inf')),
reset_noise_scale=5e-3,
exclude_current_positions_from_observation=False,
max_episode_steps=250,
height_scale=10,
dist_scale=3,
healthy_scale=2
):
self.height_scale = height_scale
self.dist_scale = dist_scale
self.healthy_scale = healthy_scale
super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, penalty, context,
terminate_when_unhealthy, healthy_state_range, healthy_z_range, healthy_angle_range,
reset_noise_scale, exclude_current_positions_from_observation, max_episode_steps)
def step(self, action):
self._floor_geom_id = self.model.geom_name2id('floor')
self._foot_geom_id = self.model.geom_name2id('foot_geom')
self._steps += 1
self.do_simulation(action, self.frame_skip)
height_after = self.get_body_com("torso")[2]
site_pos_after = self.sim.data.site_xpos[self.model.site_name2id('foot_site')].copy()
self.max_height = max(height_after, self.max_height)
ctrl_cost = self.control_cost(action)
healthy_reward = self.healthy_reward * self.healthy_scale
height_reward = self.height_scale * height_after
goal_dist = np.atleast_1d(np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0], dtype=object)))[0]
goal_dist_reward = -self.dist_scale * goal_dist
dist_reward = self._forward_reward_weight * (goal_dist_reward + height_reward)
reward = -ctrl_cost + healthy_reward + dist_reward
done = False
observation = self._get_obs()
###########################################################
# This is only for logging the distance to goal when first having the contact
##########################################################
floor_contact = self._is_floor_foot_contact(self._floor_geom_id,
self._foot_geom_id) if not self.contact_with_floor else False
if not self.init_floor_contact:
self.init_floor_contact = floor_contact
if self.init_floor_contact and not self.has_left_floor:
self.has_left_floor = not floor_contact
if not self.contact_with_floor and self.has_left_floor:
self.contact_with_floor = floor_contact
if self.contact_dist is None and self.contact_with_floor:
self.contact_dist = np.linalg.norm(self.sim.data.site_xpos[self.model.site_name2id('foot_site')]
- np.array([self.goal, 0, 0]))
info = {
'height': height_after,
'x_pos': site_pos_after,
'max_height': self.max_height,
'goal': self.goal,
'goal_dist': goal_dist,
'height_rew': self.max_height,
'healthy_reward': self.healthy_reward * self.healthy_reward,
'healthy': self.is_healthy,
'contact_dist': self.contact_dist if self.contact_dist is not None else 0
'height_rew': height_reward,
'healthy_reward': healthy_reward,
'healthy': copy.copy(self.is_healthy),
'contact_dist': copy.copy(self.contact_dist) or 0
}
return observation, reward, done, info

View File

@ -8,6 +8,7 @@ from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
# Random x goal + random init pos
@property
def context_mask(self):
return np.hstack([
[False] * (2 + int(not self.exclude_current_positions_from_observation)), # position

View File

@ -6,8 +6,9 @@ from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
@property
def context_mask(self) -> np.ndarray:
def context_mask(self):
return np.hstack([
[False] * 17,
[True] # goal pos
@ -15,16 +16,8 @@ class MPWrapper(RawInterfaceWrapper):
@property
def current_pos(self) -> Union[float, int, np.ndarray]:
return self.env.sim.data.qpos[3:6].copy()
return self.env.data.qpos[3:6].copy()
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[3:6].copy()
@property
def goal_pos(self) -> Union[float, int, np.ndarray, Tuple]:
raise ValueError("Goal position is not available and has to be learnt based on the environment.")
@property
def dt(self) -> Union[float, int]:
return self.env.dt
return self.env.data.qvel[3:6].copy()

View File

@ -1,25 +0,0 @@
from typing import Tuple, Union
import numpy as np
from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
def context_mask(self):
return np.hstack([
[False] * 17,
[True] # goal pos
])
@property
def current_pos(self) -> Union[float, int, np.ndarray]:
return self.env.sim.data.qpos[3:6].copy()
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[3:6].copy()
@property
def dt(self) -> Union[float, int]:
return self.env.dt

View File

@ -15,14 +15,13 @@ class MPWrapper(RawInterfaceWrapper):
[True] * 2, # goal position
[False] * self.env.n_links, # angular velocity
[False] * 3, # goal distance
# self.get_body_com("target"), # only return target to make problem harder
[False], # step
# [False], # step
])
@property
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qpos.flat[:self.env.n_links]
return self.env.data.qpos.flat[:self.env.n_links]
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel.flat[:self.env.n_links]
return self.env.data.qvel.flat[:self.env.n_links]

View File

@ -94,12 +94,12 @@ class ReacherEnv(MujocoEnv, utils.EzPickle):
return self._get_obs()
def _get_obs(self):
theta = self.sim.data.qpos.flat[:self.n_links]
theta = self.data.qpos.flat[:self.n_links]
target = self.get_body_com("target")
return np.concatenate([
np.cos(theta),
np.sin(theta),
target[:2], # x-y of goal position
self.sim.data.qvel.flat[:self.n_links], # angular velocity
self.data.qvel.flat[:self.n_links], # angular velocity
self.get_body_com("fingertip") - target, # goal distance
])

View File

@ -1 +1 @@
from .new_mp_wrapper import MPWrapper
from .mp_wrapper import MPWrapper

View File

@ -6,8 +6,9 @@ from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
@property
def context_mask(self) -> np.ndarray:
def context_mask(self):
return np.hstack([
[False] * 17,
[True] # goal pos
@ -15,16 +16,8 @@ class MPWrapper(RawInterfaceWrapper):
@property
def current_pos(self) -> Union[float, int, np.ndarray]:
return self.env.sim.data.qpos[3:9].copy()
return self.env.data.qpos[3:9].copy()
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[3:9].copy()
@property
def goal_pos(self) -> Union[float, int, np.ndarray, Tuple]:
raise ValueError("Goal position is not available and has to be learnt based on the environment.")
@property
def dt(self) -> Union[float, int]:
return self.env.dt
return self.env.data.qvel[3:9].copy()

View File

@ -1,26 +0,0 @@
from typing import Tuple, Union
import numpy as np
from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
class MPWrapper(RawInterfaceWrapper):
def context_mask(self):
return np.hstack([
[False] * 17,
[True] # goal pos
])
@property
def current_pos(self) -> Union[float, int, np.ndarray]:
return self.env.sim.data.qpos[3:9].copy()
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[3:9].copy()
@property
def goal_pos(self) -> Union[float, int, np.ndarray, Tuple]:
raise ValueError("Goal position is not available and has to be learnt based on the environment.")

View File

@ -1,4 +1,4 @@
from typing import Tuple, Union
from typing import Tuple, Union, Optional
import gym
import numpy as np
@ -19,7 +19,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
duration: float,
verbose: int = 1,
learn_sub_trajectories: bool = False,
replanning_schedule: Union[None, callable] = None,
replanning_schedule: Optional[callable] = None,
reward_aggregation: callable = np.sum
):
"""
@ -41,7 +41,8 @@ class BlackBoxWrapper(gym.ObservationWrapper):
self.duration = duration
self.learn_sub_trajectories = learn_sub_trajectories
self.replanning_schedule = replanning_schedule
self.do_replanning = replanning_schedule is not None
self.replanning_schedule = replanning_schedule or (lambda *x: False)
self.current_traj_steps = 0
# trajectory generation
@ -55,12 +56,10 @@ class BlackBoxWrapper(gym.ObservationWrapper):
self.reward_aggregation = reward_aggregation
# spaces
self.return_context_observation = not (self.learn_sub_trajectories or replanning_schedule)
self.traj_gen_action_space = self.get_traj_gen_action_space()
self.action_space = self.get_action_space()
self.observation_space = spaces.Box(low=self.env.observation_space.low[self.env.context_mask],
high=self.env.observation_space.high[self.env.context_mask],
dtype=self.env.observation_space.dtype)
self.return_context_observation = not (learn_sub_trajectories or self.do_replanning)
self.traj_gen_action_space = self._get_traj_gen_action_space()
self.action_space = self._get_action_space()
self.observation_space = self._get_observation_space()
# rendering
self.render_kwargs = {}
@ -76,23 +75,24 @@ class BlackBoxWrapper(gym.ObservationWrapper):
# TODO: Bruce said DMP, ProMP, ProDMP can have 0 bc_time for sequencing
# TODO Check with Bruce for replanning
self.traj_gen.set_boundary_conditions(
bc_time=np.zeros((1,)) if not self.replanning_schedule else self.current_traj_steps * self.dt,
bc_time=np.zeros((1,)) if not self.do_replanning else np.array([self.current_traj_steps * self.dt]),
bc_pos=self.current_pos, bc_vel=self.current_vel)
# TODO: is this correct for replanning? Do we need to adjust anything here?
self.traj_gen.set_duration(None if self.learn_sub_trajectories else np.array([self.duration]), np.array([self.dt]))
self.traj_gen.set_duration(None if self.learn_sub_trajectories else np.array([self.duration]),
np.array([self.dt]))
traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True)
trajectory_tensor, velocity_tensor = traj_dict['pos'], traj_dict['vel']
return get_numpy(trajectory_tensor), get_numpy(velocity_tensor)
def get_traj_gen_action_space(self):
def _get_traj_gen_action_space(self):
"""This function can be used to set up an individual space for the parameters of the traj_gen."""
min_action_bounds, max_action_bounds = self.traj_gen.get_param_bounds()
mp_action_space = gym.spaces.Box(low=min_action_bounds.numpy(), high=max_action_bounds.numpy(),
dtype=np.float32)
return mp_action_space
action_space = gym.spaces.Box(low=min_action_bounds.numpy(), high=max_action_bounds.numpy(),
dtype=self.env.action_space.dtype)
return action_space
def get_action_space(self):
def _get_action_space(self):
"""
This function can be used to modify the action space for considering actions which are not learned via motion
primitives. E.g. ball releasing time for the beer pong task. By default, it is the parameter space of the
@ -102,12 +102,21 @@ class BlackBoxWrapper(gym.ObservationWrapper):
try:
return self.traj_gen_action_space
except AttributeError:
return self.get_traj_gen_action_space()
return self._get_traj_gen_action_space()
def _get_observation_space(self):
mask = self.env.context_mask
if not self.return_context_observation:
# return full observation
mask = np.ones_like(mask, dtype=bool)
min_obs_bound = self.env.observation_space.low[mask]
max_obs_bound = self.env.observation_space.high[mask]
return spaces.Box(low=min_obs_bound, high=max_obs_bound, dtype=self.env.observation_space.dtype)
def step(self, action: np.ndarray):
""" This function generates a trajectory based on a MP and then does the usual loop over reset and step"""
# agent to learn when to release the ball
# TODO remove this part, right now only needed for beer pong
mp_params, env_spec_params = self.env._episode_callback(action, self.traj_gen)
trajectory, velocity = self.get_trajectory(mp_params)
@ -121,10 +130,8 @@ class BlackBoxWrapper(gym.ObservationWrapper):
infos = dict()
done = False
for t, pos_vel in enumerate(zip(trajectory, velocity)):
step_action = self.tracking_controller.get_action(pos_vel[0], pos_vel[1], self.current_pos,
self.current_vel)
step_action = self._step_callback(t, env_spec_params, step_action) # include possible callback info
for t, (pos, vel) in enumerate(zip(trajectory, velocity)):
step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel)
c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high)
# print('step/clipped action ratio: ', step_action/c_action)
obs, c_reward, done, info = self.env.step(c_action)
@ -142,10 +149,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
if self.render_kwargs:
self.render(**self.render_kwargs)
if done:
break
if self.replanning_schedule and self.replanning_schedule(self.current_pos, self.current_vel, obs, c_action,
if done or self.replanning_schedule(self.current_pos, self.current_vel, obs, c_action,
t + 1 + self.current_traj_steps):
break
@ -153,25 +157,26 @@ class BlackBoxWrapper(gym.ObservationWrapper):
self.current_traj_steps += t + 1
if self.verbose >= 2:
infos['trajectory'] = trajectory
infos['positions'] = trajectory
infos['velocities'] = velocity
infos['step_actions'] = actions[:t + 1]
infos['step_observations'] = observations[:t + 1]
infos['step_rewards'] = rewards[:t + 1]
infos['trajectory_length'] = t + 1
trajectory_return = self.reward_aggregation(rewards[:t + 1])
return obs, trajectory_return, done, infos
return self.observation(obs), trajectory_return, done, infos
def render(self, **kwargs):
"""Only set render options here, such that they can be used during the rollout.
This only needs to be called once"""
self.render_kwargs = kwargs
self.render_kwargs = kwargs or self.render_kwargs
# self.env.render(mode=self.render_mode, **self.render_kwargs)
self.env.render(**kwargs)
self.env.render(**self.render_kwargs)
def reset(self, **kwargs):
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None):
self.current_traj_steps = 0
super(BlackBoxWrapper, self).reset(**kwargs)
return super(BlackBoxWrapper, self).reset(seed=seed, return_info=return_info, options=options)
def plot_trajs(self, des_trajs, des_vels):
import matplotlib.pyplot as plt
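Note: the new _get_observation_space above narrows the observation space to the context dimensions selected by context_mask, unless sub-trajectories or replanning are learned. A standalone sketch of that masking logic; the function name, the example space and the mask are made up for illustration.

import numpy as np
from gym import spaces


def masked_observation_space(observation_space: spaces.Box, context_mask: np.ndarray,
                             return_context_observation: bool) -> spaces.Box:
    # Reduce the space to the context entries, or keep the full space for replanning.
    mask = context_mask if return_context_observation else np.ones_like(context_mask, dtype=bool)
    return spaces.Box(low=observation_space.low[mask], high=observation_space.high[mask],
                      dtype=observation_space.dtype)


full_space = spaces.Box(low=-np.inf, high=np.inf, shape=(5,), dtype=np.float64)
mask = np.array([False, False, False, True, True])  # e.g. only the goal position is context
print(masked_observation_space(full_space, mask, return_context_observation=True).shape)  # (2,)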

View File

@ -57,7 +57,8 @@ class RawInterfaceWrapper(gym.Wrapper):
# return bool(self.replanning_model(s))
return False
def _episode_callback(self, action: np.ndarray, traj_gen: MPInterface) -> Tuple[np.ndarray, Union[np.ndarray, None]]:
def _episode_callback(self, action: np.ndarray, traj_gen: MPInterface) -> Tuple[
np.ndarray, Union[np.ndarray, None]]:
"""
Used to extract the parameters for the motion primitive and other parameters from an action array which might
include other actions like ball releasing time for the beer pong environment.
@ -70,20 +71,3 @@ class RawInterfaceWrapper(gym.Wrapper):
Tuple: mp_arguments and other arguments
"""
return action, None
def _step_callback(self, t: int, env_spec_params: Union[np.ndarray, None], step_action: np.ndarray) -> Union[
np.ndarray]:
"""
This function can be used to modify the step_action with additional parameters e.g. releasing the ball in the
Beerpong env. The parameters used should not be part of the motion primitive parameters.
Returns step_action by default, can be overwritten in individual mp_wrappers.
Args:
t: the current time step of the episode
env_spec_params: the environment specific parameter, as defined in function _episode_callback
(e.g. ball release time in Beer Pong)
step_action: the current step-based action
Returns:
modified step action
"""
return step_action

View File

@ -87,10 +87,10 @@ if __name__ == '__main__':
render = True
# Basic gym task
example_general("Pendulum-v0", seed=10, iterations=200, render=render)
example_general("Reacher5d-v0", seed=10, iterations=200, render=render)
# # Basis task from framework
example_general("alr_envs:HoleReacher-v0", seed=10, iterations=200, render=render)
example_general("Reacher-v0", seed=10, iterations=200, render=render)
# # OpenAI Mujoco task
example_general("HalfCheetah-v2", seed=10, render=render)

View File

@ -45,7 +45,7 @@ def example_mp(env_name="alr_envs:HoleReacherDMP-v1", seed=1, iterations=1, rend
obs = env.reset()
def example_custom_mp(env_name="alr_envs:HoleReacherDMP-v1", seed=1, iterations=1, render=True):
def example_custom_mp(env_name="Reacher5dProMP-v0", seed=1, iterations=1, render=True):
"""
Example for running a motion primitive based environment, which is already registered
Args:
@ -57,32 +57,12 @@ def example_custom_mp(env_name="alr_envs:HoleReacherDMP-v1", seed=1, iterations=
Returns:
"""
# Changing the traj_gen_kwargs is possible by providing them to gym.
# E.g. here by providing way to many basis functions
# mp_dict = alr_envs.from_default_config('ALRReacher-v0', {'basis_generator_kwargs': {'num_basis': 10}})
# mp_dict.update({'basis_generator_kwargs': {'num_basis': 10}})
# Changing the arguments of the black box env is possible by providing them to gym as with all kwargs.
# E.g. here for way too many basis functions
env = alr_envs.make(env_name, seed, basis_generator_kwargs={'num_basis': 1000})
# mp_dict.update({'black_box_kwargs': {'learn_sub_trajectories': True}})
# mp_dict.update({'black_box_kwargs': {'do_replanning': lambda pos, vel, t: lambda t: t % 100}})
# default env with promp and no learn_sub_trajectories and replanning
# env = alr_envs.make('ALRReacherProMP-v0', 1, n_links=7)
env = alr_envs.make('ALRReacherProMP-v0', 1, basis_generator_kwargs={'num_basis': 10}, n_links=7)
# env = alr_envs.make('ALRReacher-v0', seed=1, bb_kwargs=mp_dict, n_links=1)
# env = alr_envs.make_bb('ALRReacher-v0', **mp_dict)
mp_kwargs = {
"num_dof": 5,
"num_basis": 1000,
"duration": 2,
"learn_goal": True,
"alpha_phase": 2,
"bandwidth_factor": 2,
"policy_type": "velocity",
"weights_scale": 50,
"goal_scale": 0.1
}
env = alr_envs.make(env_name, seed, mp_kwargs=mp_kwargs)
# This time rendering every trajectory
if render:
env.render(mode="human")
@ -100,6 +80,7 @@ def example_custom_mp(env_name="alr_envs:HoleReacherDMP-v1", seed=1, iterations=
print(rewards)
rewards = 0
obs = env.reset()
print(obs)
def example_fully_custom_mp(seed=1, iterations=1, render=True):
@ -169,7 +150,7 @@ if __name__ == '__main__':
# example_mp("alr_envs:HoleReacherDetPMP-v1", seed=10, iterations=1, render=render)
# Altered basis functions
example_custom_mp("alr_envs:HoleReacherDMP-v1", seed=10, iterations=1, render=render)
example_custom_mp("Reacher5dProMP-v0", seed=10, iterations=10, render=render)
# Custom MP
example_fully_custom_mp(seed=10, iterations=1, render=render)
# example_fully_custom_mp(seed=10, iterations=1, render=render)

View File

@ -43,10 +43,10 @@ def make_rank(env_id: str, seed: int, rank: int = 0, return_callable=True, **kwa
def make(env_id, seed, **kwargs):
# spec = registry.get(env_id) # TODO: This doesn't work with gym ==0.21.0
spec = registry.spec(env_id)
# TODO: This doesn't work with gym ==0.21.0
# This access is required to allow for nested dict updates
all_kwargs = deepcopy(spec._kwargs)
spec = registry.get(env_id)
all_kwargs = deepcopy(spec.kwargs)
nested_update(all_kwargs, kwargs)
return _make(env_id, seed, **all_kwargs)
@ -148,7 +148,7 @@ def make_bb(
phase_kwargs: kwargs for the phase generator
controller_kwargs: kwargs for the tracking controller
env_id: base_env_name,
wrappers: list of wrappers (at least an BlackBoxWrapper),
wrappers: list of wrappers (at least an RawInterfaceWrapper),
seed: seed of environment
traj_gen_kwargs: dict of at least {num_dof: int, num_basis: int} for DMP

View File

@ -1,4 +1,4 @@
from collections import Mapping, MutableMapping
from collections.abc import Mapping, MutableMapping
import numpy as np
import torch as ch
@ -46,4 +46,5 @@ def nested_update(base: MutableMapping, update):
"""
for k, v in update.items():
base[k] = nested_update(base.get(k, {}), v) if isinstance(v, Mapping) else v
return base
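Note: the helper above is fixed to import from collections.abc (the aliases in collections were removed in Python 3.10) and to return base so that the recursive calls compose. A small usage sketch with made-up keys:

from collections.abc import Mapping, MutableMapping


def nested_update(base: MutableMapping, update: Mapping) -> MutableMapping:
    # Mirrors the patched helper: recursively merge `update` into `base`.
    for k, v in update.items():
        base[k] = nested_update(base.get(k, {}), v) if isinstance(v, Mapping) else v
    return base


defaults = {"basis_generator_kwargs": {"num_basis": 5, "basis_bandwidth_factor": 3}}
overrides = {"basis_generator_kwargs": {"num_basis": 10}}
print(nested_update(defaults, overrides))
# -> {'basis_generator_kwargs': {'num_basis': 10, 'basis_bandwidth_factor': 3}}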

View File

@ -1,24 +1,35 @@
import itertools
from setuptools import setup
# Environment-specific dependencies for dmc and metaworld
extras = {
"dmc": ["dm_control"],
"meta": ["mujoco_py<2.2,>=2.1, git+https://github.com/rlworkgroup/metaworld.git@master#egg=metaworld"],
"mujoco": ["mujoco==2.2.0", "imageio>=2.14.1"],
}
# All dependencies
all_groups = set(extras.keys())
extras["all"] = list(set(itertools.chain.from_iterable(map(lambda group: extras[group], all_groups))))
setup(
name='alr_envs',
author='Fabian Otto, Onur Celik, Marcel Sandermann, Maximilian Huettenrauch',
name='simple_gym',
version='0.0.1',
packages=['alr_envs', 'alr_envs.alr', 'alr_envs.open_ai', 'alr_envs.dmc', 'alr_envs.meta', 'alr_envs.utils'],
install_requires=[
'gym',
'PyQt5',
#'matplotlib',
#'mp_env_api @ git+https://github.com/ALRhub/motion_primitive_env_api.git',
# 'mp_env_api @ git+ssh://git@github.com/ALRhub/motion_primitive_env_api.git',
# 'matplotlib',
# 'mp_env_api @ git+https://github.com/ALRhub/motion_primitive_env_api.git',
# 'mp_env_api @ git+ssh://git@github.com/ALRhub/motion_primitive_env_api.git',
'mujoco-py<2.1,>=2.0',
'dm_control',
'metaworld @ git+https://github.com/rlworkgroup/metaworld.git@master#egg=metaworld',
],
url='https://github.com/ALRhub/alr_envs/',
license='MIT',
author='Fabian Otto, Marcel Sandermann, Maximilian Huettenrauch',
# license='AGPL-3.0 license',
author_email='',
description='Custom Gym environments for various (robotics) tasks. integration of DMC environments into the'
'gym interface, and support for using motion primitives with gym environments.'
description='Simple Gym: Aggregating interface for various RL environments with support for Black Box approaches.'
)