Merge branch 'tt_cluster_debug' into 55-table-tennis-dev

# Conflicts:
#	fancy_gym/black_box/black_box_wrapper.py
#	fancy_gym/envs/mujoco/box_pushing/box_pushing_env.py
#	fancy_gym/envs/mujoco/table_tennis/mp_wrapper.py
Hongyi Zhou 2022-12-01 14:25:22 +01:00
commit a1d660d7ae
15 changed files with 476 additions and 350 deletions

View File

@@ -2,7 +2,6 @@ from typing import Tuple, Optional, Callable
 import gym
 import numpy as np
-import torch
 from gym import spaces
 from mp_pytorch.mp.mp_interfaces import MPInterface
@@ -23,8 +22,8 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 replanning_schedule: Optional[
 Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray, int], bool]] = None,
 reward_aggregation: Callable[[np.ndarray], float] = np.sum,
-max_planning_times = None,
-desired_conditioning: bool = False
+max_planning_times: int = np.inf,
+condition_on_desired: bool = False
 ):
 """
 gym.Wrapper for leveraging a black box approach with a trajectory generator.
@@ -59,21 +58,11 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 # reward computation
 self.reward_aggregation = reward_aggregation
-# self.traj_gen.basis_gn.show_basis(plot=True)
 # spaces
 self.return_context_observation = not (learn_sub_trajectories or self.do_replanning)
-# self.return_context_observation = True
 self.traj_gen_action_space = self._get_traj_gen_action_space()
 self.action_space = self._get_action_space()
-# no goal learning
-# tricky_action_upperbound = [np.inf] * (self.traj_gen_action_space.shape[0] - 7)
-# tricky_action_lowerbound = [-np.inf] * (self.traj_gen_action_space.shape[0] - 7)
-# self.action_space = spaces.Box(np.array(tricky_action_lowerbound), np.array(tricky_action_upperbound), dtype=np.float32)
-self.action_space.low[0] = 0.8
-self.action_space.high[0] = 1.5
-self.action_space.low[1] = 0.05
-self.action_space.high[1] = 0.15
 self.observation_space = self._get_observation_space()
 # rendering
@@ -81,14 +70,12 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 self.verbose = verbose
 # condition value
-self.desired_conditioning = False
+self.condition_on_desired = condition_on_desired
 self.condition_pos = None
 self.condition_vel = None
 self.max_planning_times = max_planning_times
-self.plan_counts = 0
-self.tau_first_prediction = None
+self.plan_steps = 0
 def observation(self, observation):
 # return context space if we are
@@ -98,9 +85,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 return observation.astype(self.observation_space.dtype)
 def get_trajectory(self, action: np.ndarray) -> Tuple:
-# duration = self.duration
-# duration = self.duration - self.current_traj_steps * self.dt
-duration = action[0]
+duration = self.duration
 if self.learn_sub_trajectories:
 duration = None
 # reset with every new call as we need to set all arguments, such as tau, delay, again.
@@ -112,19 +97,11 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 bc_time = np.array(0 if not self.do_replanning else self.current_traj_steps * self.dt)
 # TODO we could think about initializing with the previous desired value in order to have a smooth transition
 # at least from the planning point of view.
-# self.traj_gen.set_boundary_conditions(bc_time, self.current_pos, self.current_vel)
-if self.current_traj_steps == 0:
-self.condition_pos = self.current_pos
-self.condition_vel = self.current_vel
-# bc_time = torch.as_tensor(bc_time, dtype=torch.float32)
-# self.condition_pos = torch.as_tensor(self.condition_pos, dtype=torch.float32)
-# self.condition_vel = torch.as_tensor(self.condition_vel, dtype=torch.float32)
-print("bc_time", bc_time)
-print("self.condition_pos", self.condition_pos)
-print("self.condition_vel", self.condition_vel)
-self.traj_gen.set_boundary_conditions(bc_time, self.condition_pos, self.condition_vel)
-# self.traj_gen.set_duration(duration, self.dt)
+condition_pos = self.condition_pos if self.condition_pos is not None else self.current_pos
+condition_vel = self.condition_vel if self.condition_vel is not None else self.current_vel
+self.traj_gen.set_boundary_conditions(bc_time, condition_pos, condition_vel)
 self.traj_gen.set_duration(duration, self.dt)
 # traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True)
 position = get_numpy(self.traj_gen.get_traj_pos())
@@ -167,18 +144,10 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 def step(self, action: np.ndarray):
 """ This function generates a trajectory based on a MP and then does the usual loop over reset and step"""
-# time_is_valid = self.env.check_time_validity(action)
-#
-# if time_valid:
-# TODO remove this part, right now only needed for beer pong
-# mp_params, env_spec_params, proceed = self.env.episode_callback(action, self.traj_gen)
 position, velocity = self.get_trajectory(action)
-print("position", position)
-print("velocity", velocity)
-traj_is_valid = self.env.episode_callback(action, position, velocity)
+position, velocity = self.env.set_episode_arguments(action, position, velocity)
+traj_is_valid, position, velocity = self.env.preprocessing_and_validity_callback(action, position, velocity)
 trajectory_length = len(position)
 rewards = np.zeros(shape=(trajectory_length,))
@@ -190,14 +159,12 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 infos = dict()
 done = False
-if self.verbose >= 2:
-desired_pos_traj = []
-desired_vel_traj = []
-pos_traj = []
-vel_traj = []
-if traj_is_valid:
-self.plan_counts += 1
+if traj_is_valid is False:
+obs, trajectory_return, done, infos = self.env.invalid_traj_callback(action, position, velocity,
+self.return_context_observation)
+return self.observation(obs), trajectory_return, done, infos
+else:
+self.plan_steps += 1
 for t, (pos, vel) in enumerate(zip(position, velocity)):
 step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel)
 c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high)
@@ -213,24 +180,15 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 elems[t] = v
 infos[k] = elems
-if self.verbose >= 2:
-desired_pos_traj.append(pos)
-desired_vel_traj.append(vel)
-pos_traj.append(self.current_pos)
-vel_traj.append(self.current_vel)
 if self.render_kwargs:
 self.env.render(**self.render_kwargs)
 if done or (self.replanning_schedule(self.current_pos, self.current_vel, obs, c_action,
 t + 1 + self.current_traj_steps)
-and self.max_planning_times is not None and self.plan_counts < self.max_planning_times):
-# if self.max_planning_times is not None and self.plan_counts >= self.max_planning_times:
-# continue
-self.condition_pos = pos if self.desired_conditioning else self.current_pos
-self.condition_vel = vel if self.desired_conditioning else self.current_vel
+and self.plan_steps < self.max_planning_times):
+self.condition_pos = pos if self.condition_on_desired else None
+self.condition_vel = vel if self.condition_on_desired else None
 break
@@ -238,24 +196,15 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 self.current_traj_steps += t + 1
 if self.verbose >= 2:
-infos['desired_pos'] = position[:t+1]
-infos['desired_vel'] = velocity[:t+1]
-infos['current_pos'] = self.current_pos
-infos['current_vel'] = self.current_vel
+infos['positions'] = position
+infos['velocities'] = velocity
 infos['step_actions'] = actions[:t + 1]
 infos['step_observations'] = observations[:t + 1]
 infos['step_rewards'] = rewards[:t + 1]
-infos['desired_pos_traj'] = np.array(desired_pos_traj)
-infos['desired_vel_traj'] = np.array(desired_vel_traj)
-infos['pos_traj'] = np.array(pos_traj)
-infos['vel_traj'] = np.array(vel_traj)
 infos['trajectory_length'] = t + 1
 trajectory_return = self.reward_aggregation(rewards[:t + 1])
 return self.observation(obs), trajectory_return, done, infos
-else:
-obs, trajectory_return, done, infos = self.env.invalid_traj_callback(action, position, velocity)
-return self.observation(obs), trajectory_return, done, infos
 def render(self, **kwargs):
 """Only set render options here, such that they can be used during the rollout.
@@ -264,7 +213,8 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None):
 self.current_traj_steps = 0
-self.plan_counts = 0
-self.tau_first_prediction = None
+self.plan_steps = 0
 self.traj_gen.reset()
+self.condition_vel = None
+self.condition_pos = None
 return super(BlackBoxWrapper, self).reset()
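The core behavioural change in this wrapper is how the boundary condition for the next plan is chosen once a replanning step is triggered: with condition_on_desired the stored desired state seeds the next segment, otherwise the stored condition stays None and get_trajectory falls back to the measured state. A minimal standalone sketch of that rule (plain Python, not the wrapper's actual API):

import numpy as np

def next_boundary_condition(condition_on_desired, desired, measured):
    # Replanning trigger: keep the desired state only when conditioning on it ...
    stored = desired if condition_on_desired else None
    # ... and fall back to the measured state when nothing was stored.
    return stored if stored is not None else measured

desired, measured = np.ones(7), 0.9 * np.ones(7)
print(next_boundary_condition(True, desired, measured))   # -> desired state
print(next_boundary_condition(False, desired, measured))  # -> measured state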

View File

@@ -52,6 +52,19 @@ class RawInterfaceWrapper(gym.Wrapper):
 """
 return self.env.dt
+def preprocessing_and_validity_callback(self, action: np.ndarray, pos_traj: np.ndarray, vel_traj: np.ndarray) \
+-> Tuple[bool, np.ndarray, np.ndarray]:
+"""
+Used to preprocess the action and check if the desired trajectory is valid.
+"""
+return True, pos_traj, vel_traj
+def set_episode_arguments(self, action, pos_traj, vel_traj):
+"""
+Used to set the arguments for env that valid for the whole episode
+"""
+return pos_traj, vel_traj
 def episode_callback(self, action: np.ndarray, pos_traj: np.ndarray, vel_traj: np.array) -> Tuple[bool]:
 """
 Used to extract the parameters for the movement primitive and other parameters from an action array which might
@@ -68,7 +81,6 @@ class RawInterfaceWrapper(gym.Wrapper):
 def invalid_traj_callback(self, action: np.ndarray, pos_traj: np.ndarray, vel_traj: np.ndarray) -> Tuple[np.ndarray, float, bool, dict]:
 """
-Used to return a fake return from the environment if the desired trajectory is invalid.
+Used to return a artificial return from the env if the desired trajectory is invalid.
 """
-obs = np.zeros(1)
-return obs, 0, True, {}
+return np.zeros(1), 0, True, {}
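Task wrappers override these hooks to validate a planned trajectory before it is executed and to shape the penalty when it is rejected. A hypothetical subclass as a sketch (only the two new hooks are shown; the remaining abstract members such as context_mask, current_pos and current_vel are omitted, the joint limits are made up, and the four-argument invalid_traj_callback mirrors how BlackBoxWrapper.step invokes it above):

import numpy as np
from typing import Tuple
from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper

class ClampedJointWrapper(RawInterfaceWrapper):
    # hypothetical joint limits, purely illustrative
    q_low, q_high = -2.0, 2.0

    def preprocessing_and_validity_callback(self, action, pos_traj, vel_traj) \
            -> Tuple[bool, np.ndarray, np.ndarray]:
        # reject the plan if any desired joint position leaves the limits
        valid = bool(np.all(pos_traj <= self.q_high) and np.all(pos_traj >= self.q_low))
        return valid, pos_traj, vel_traj

    def invalid_traj_callback(self, action, pos_traj, vel_traj, return_contextual_obs):
        # penalize by the mean violation and terminate the (virtual) episode
        overshoot = np.maximum(pos_traj - self.q_high, 0) + np.maximum(self.q_low - pos_traj, 0)
        return np.zeros(1), -float(np.mean(overshoot)), True, {}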

View File

@@ -18,6 +18,8 @@ from .mujoco.reacher.reacher import ReacherEnv, MAX_EPISODE_STEPS_REACHER
 from .mujoco.walker_2d_jump.walker_2d_jump import MAX_EPISODE_STEPS_WALKERJUMP
 from .mujoco.box_pushing.box_pushing_env import BoxPushingDense, BoxPushingTemporalSparse, \
 BoxPushingTemporalSpatialSparse, MAX_EPISODE_STEPS_BOX_PUSHING
+from .mujoco.table_tennis.table_tennis_env import TableTennisEnv, TableTennisWind, TableTennisGoalSwitching, \
+MAX_EPISODE_STEPS_TABLE_TENNIS
 ALL_FANCY_MOVEMENT_PRIMITIVE_ENVIRONMENTS = {"DMP": [], "ProMP": [], "ProDMP": []}
@@ -28,9 +30,7 @@ DEFAULT_BB_DICT_ProMP = {
 'trajectory_generator_type': 'promp'
 },
 "phase_generator_kwargs": {
-'phase_generator_type': 'linear',
-'learn_tau': False,
-'learn_delay': False,
+'phase_generator_type': 'linear'
 },
 "controller_kwargs": {
 'controller_type': 'motor',
@@ -77,8 +77,6 @@ DEFAULT_BB_DICT_ProDMP = {
 },
 "phase_generator_kwargs": {
 'phase_generator_type': 'exp',
-'learn_delay': False,
-'learn_tau': False,
 },
 "controller_kwargs": {
 'controller_type': 'motor',
@@ -91,9 +89,6 @@ DEFAULT_BB_DICT_ProDMP = {
 'num_basis': 5,
 },
 "black_box_kwargs": {
-'replanning_schedule': None,
-'max_planning_times': None,
-'verbose': 2
 }
 }
@@ -255,15 +250,28 @@ for ctxt_dim in [2, 4]:
 register(
 id='TableTennis{}D-v0'.format(ctxt_dim),
 entry_point='fancy_gym.envs.mujoco:TableTennisEnv',
-max_episode_steps=350,
+max_episode_steps=MAX_EPISODE_STEPS_TABLE_TENNIS,
 kwargs={
 "ctxt_dim": ctxt_dim,
 'frame_skip': 4,
-'enable_wind': False,
-'enable_switching_goal': False,
+'goal_switching_step': None,
+'enable_artificial_wind': False,
 }
 )
+register(
+id='TableTennisWind-v0',
+entry_point='fancy_gym.envs.mujoco:TableTennisWind',
+max_episode_steps=MAX_EPISODE_STEPS_TABLE_TENNIS,
+)
+register(
+id='TableTennisGoalSwitching-v0',
+entry_point='fancy_gym.envs.mujoco:TableTennisGoalSwitching',
+max_episode_steps=MAX_EPISODE_STEPS_TABLE_TENNIS,
+)
 # movement Primitive Environments
 ## Simple Reacher
@@ -510,24 +518,22 @@ for _v in _versions:
 for _v in _versions:
 _name = _v.split("-")
-_env_id = f'{_name[0]}ProDMP-{_name[1]}'
+_env_id = f'{_name[0]}ReplanProDMP-{_name[1]}'
 kwargs_dict_box_pushing_prodmp = deepcopy(DEFAULT_BB_DICT_ProDMP)
 kwargs_dict_box_pushing_prodmp['wrappers'].append(mujoco.box_pushing.MPWrapper)
 kwargs_dict_box_pushing_prodmp['name'] = _v
 kwargs_dict_box_pushing_prodmp['controller_kwargs']['p_gains'] = 0.01 * np.array([120., 120., 120., 120., 50., 30., 10.])
 kwargs_dict_box_pushing_prodmp['controller_kwargs']['d_gains'] = 0.01 * np.array([10., 10., 10., 10., 6., 5., 3.])
-# kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['weights_scale'] = np.array([3.4944e+01, 4.3734e+01, 9.6711e+01, 2.4429e+02, 5.8272e+02])
-# kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['goal_scale'] = 3.1264e-01
 kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['weights_scale'] = 0.3
 kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['goal_scale'] = 0.3
 kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['auto_scale_basis'] = True
 kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['goal_offset'] = 1.0
 kwargs_dict_box_pushing_prodmp['basis_generator_kwargs']['num_basis'] = 4
-kwargs_dict_box_pushing_prodmp['basis_generator_kwargs']['alpha'] = 10.
-kwargs_dict_box_pushing_prodmp['basis_generator_kwargs']['basis_bandwidth_factor'] = 3 # 3.5, 4 to try
+kwargs_dict_box_pushing_prodmp['basis_generator_kwargs']['basis_bandwidth_factor'] = 3
 kwargs_dict_box_pushing_prodmp['phase_generator_kwargs']['alpha_phase'] = 3
 kwargs_dict_box_pushing_prodmp['black_box_kwargs']['max_planning_times'] = 4
 kwargs_dict_box_pushing_prodmp['black_box_kwargs']['replanning_schedule'] = lambda pos, vel, obs, action, t : t % 25 == 0
+kwargs_dict_box_pushing_prodmp['black_box_kwargs']['condition_on_desired'] = True
 register(
 id=_env_id,
 entry_point='fancy_gym.utils.make_env_helpers:make_bb_env_helper',
@@ -536,21 +542,25 @@ for _v in _versions:
 ALL_FANCY_MOVEMENT_PRIMITIVE_ENVIRONMENTS["ProDMP"].append(_env_id)
 ## Table Tennis
-_versions = ['TableTennis2D-v0', 'TableTennis4D-v0']
+_versions = ['TableTennis2D-v0', 'TableTennis4D-v0', 'TableTennisWind-v0', 'TableTennisGoalSwitching-v0']
 for _v in _versions:
 _name = _v.split("-")
 _env_id = f'{_name[0]}ProMP-{_name[1]}'
 kwargs_dict_tt_promp = deepcopy(DEFAULT_BB_DICT_ProMP)
-kwargs_dict_tt_promp['wrappers'].append(mujoco.table_tennis.MPWrapper)
+if _v == 'TableTennisWind-v0':
+kwargs_dict_tt_promp['wrappers'].append(mujoco.table_tennis.TTVelObs_MPWrapper)
+else:
+kwargs_dict_tt_promp['wrappers'].append(mujoco.table_tennis.TT_MPWrapper)
 kwargs_dict_tt_promp['name'] = _v
 kwargs_dict_tt_promp['controller_kwargs']['p_gains'] = 0.5 * np.array([1.0, 4.0, 2.0, 4.0, 1.0, 4.0, 1.0])
 kwargs_dict_tt_promp['controller_kwargs']['d_gains'] = 0.5 * np.array([0.1, 0.4, 0.2, 0.4, 0.1, 0.4, 0.1])
 kwargs_dict_tt_promp['phase_generator_kwargs']['learn_tau'] = True
 kwargs_dict_tt_promp['phase_generator_kwargs']['learn_delay'] = True
+kwargs_dict_tt_promp['phase_generator_kwargs']['tau_bound'] = [0.8, 1.5]
+kwargs_dict_tt_promp['phase_generator_kwargs']['delay_bound'] = [0.05, 0.15]
 kwargs_dict_tt_promp['basis_generator_kwargs']['num_basis'] = 3
 kwargs_dict_tt_promp['basis_generator_kwargs']['num_basis_zero_start'] = 2
 kwargs_dict_tt_promp['basis_generator_kwargs']['num_basis_zero_goal'] = 1
-kwargs_dict_tt_promp['black_box_kwargs']['duration'] = 2.
 kwargs_dict_tt_promp['black_box_kwargs']['verbose'] = 2
 register(
 id=_env_id,
@@ -561,22 +571,24 @@ for _v in _versions:
 for _v in _versions:
 _name = _v.split("-")
-_env_id = f'{_name[0]}ProDMP-{_name[1]}'
+_env_id = f'{_name[0]}ReplanProDMP-{_name[1]}'
 kwargs_dict_tt_prodmp = deepcopy(DEFAULT_BB_DICT_ProDMP)
-kwargs_dict_tt_prodmp['wrappers'].append(mujoco.table_tennis.MPWrapper)
+if _v == 'TableTennisWind-v0':
+kwargs_dict_tt_prodmp['wrappers'].append(mujoco.table_tennis.TTVelObs_MPWrapper)
+else:
+kwargs_dict_tt_prodmp['wrappers'].append(mujoco.table_tennis.TT_MPWrapper)
 kwargs_dict_tt_prodmp['name'] = _v
 kwargs_dict_tt_prodmp['controller_kwargs']['p_gains'] = 0.5 * np.array([1.0, 4.0, 2.0, 4.0, 1.0, 4.0, 1.0])
 kwargs_dict_tt_prodmp['controller_kwargs']['d_gains'] = 0.5 * np.array([0.1, 0.4, 0.2, 0.4, 0.1, 0.4, 0.1])
-kwargs_dict_tt_prodmp['trajectory_generator_kwargs']['weights_scale'] = 1.0
-kwargs_dict_tt_prodmp['trajectory_generator_kwargs']['goal_scale'] = 1.0
-kwargs_dict_tt_prodmp['trajectory_generator_kwargs']['auto_scale_basis'] = True
+kwargs_dict_tt_prodmp['trajectory_generator_kwargs']['auto_scale_basis'] = False
 kwargs_dict_tt_prodmp['trajectory_generator_kwargs']['goal_offset'] = 1.0
+kwargs_dict_tt_prodmp['phase_generator_kwargs']['tau_bound'] = [0.8, 1.5]
+kwargs_dict_tt_prodmp['phase_generator_kwargs']['delay_bound'] = [0.05, 0.15]
 kwargs_dict_tt_prodmp['phase_generator_kwargs']['learn_tau'] = True
 kwargs_dict_tt_prodmp['phase_generator_kwargs']['learn_delay'] = True
 kwargs_dict_tt_prodmp['basis_generator_kwargs']['num_basis'] = 2
 kwargs_dict_tt_prodmp['basis_generator_kwargs']['alpha'] = 25.
-kwargs_dict_tt_prodmp['basis_generator_kwargs']['basis_bandwidth_factor'] = 3 # 3.5, 4 to try
+kwargs_dict_tt_prodmp['basis_generator_kwargs']['basis_bandwidth_factor'] = 3
+kwargs_dict_tt_prodmp['basis_generator_kwargs']['pre_compute_length_factor'] = 5
 kwargs_dict_tt_prodmp['phase_generator_kwargs']['alpha_phase'] = 3
 kwargs_dict_tt_prodmp['black_box_kwargs']['max_planning_times'] = 3
 kwargs_dict_tt_prodmp['black_box_kwargs']['replanning_schedule'] = lambda pos, vel, obs, action, t : t % 50 == 0
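With these registrations in place, the replanning variants are constructed like any other id; a minimal rollout sketch (the ids and the make call mirror the repository's own examples):

import fancy_gym

env = fancy_gym.make('TableTennis4DReplanProDMP-v0', seed=1)
obs = env.reset()
for _ in range(10):
    # one step consumes one (sub-)trajectory; replanning triggers every 50 raw steps
    obs, reward, done, info = env.step(env.action_space.sample())
    if done:
        obs = env.reset()
env.close()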

View File

@@ -8,4 +8,4 @@ from .hopper_throw.hopper_throw_in_basket import HopperThrowInBasketEnv
 from .reacher.reacher import ReacherEnv
 from .walker_2d_jump.walker_2d_jump import Walker2dJumpEnv
 from .box_pushing.box_pushing_env import BoxPushingDense, BoxPushingTemporalSparse, BoxPushingTemporalSpatialSparse
-from .table_tennis.table_tennis_env import TableTennisEnv
+from .table_tennis.table_tennis_env import TableTennisEnv, TableTennisWind, TableTennisGoalSwitching

View File

@@ -71,9 +71,6 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
 qpos = self.data.qpos[:7].copy()
 qvel = self.data.qvel[:7].copy()
-if (self._steps + 1) % 10 == 0:
-self.calculateOfflineIK(np.array([0.4, 0.3, 0.14]), np.array([0, 1, 0, 0]))
 if not unstable_simulation:
 reward = self._get_reward(episode_end, box_pos, box_quat, target_pos, target_quat,
 rod_tip_pos, rod_quat, qpos, qvel, action)
@@ -96,9 +93,7 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
 def reset_model(self):
 # rest box to initial position
 self.set_state(self.init_qpos_box_pushing, self.init_qvel_box_pushing)
-random_init_pos = self.sample_context()
-# box_init_pos = np.array([0.4, 0.3, -0.01, 0.0, 0.0, 0.0, 1.0])
-box_init_pos = random_init_pos
+box_init_pos = np.array([0.4, 0.3, -0.01, 0.0, 0.0, 0.0, 1.0])
 self.data.joint("box_joint").qpos = box_init_pos
 # set target position
@@ -224,10 +219,8 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
 q_old = q
 q = q + dt * qd_d
 q = np.clip(q, q_min, q_max)
 self.data.qpos[:7] = q
 mujoco.mj_forward(self.model, self.data)
 current_cart_pos = self.data.body("tcp").xpos.copy()
 current_cart_quat = self.data.body("tcp").xquat.copy()
@@ -239,35 +232,27 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
 err = np.hstack((cart_pos_error, cart_quat_error))
 err_norm = np.sum(cart_pos_error**2) + np.sum((current_cart_quat - desired_cart_quat)**2)
 if err_norm > old_err_norm:
-# old_err_norm = err_norm
 q = q_old
 dt = 0.7 * dt
 continue
 else:
 dt = 1.025 * dt
-i += 1
 if err_norm < eps:
-print("IK converged in {} iterations".format(i))
 break
 if i > IT_MAX:
-print("IK did not converge in {} iterations".format(i))
 break
 old_err_norm = err_norm
 ### get Jacobian by mujoco
 self.data.qpos[:7] = q
 mujoco.mj_forward(self.model, self.data)
 jacp = self.get_body_jacp("tcp")[:, :7].copy()
 jacr = self.get_body_jacr("tcp")[:, :7].copy()
 J = np.concatenate((jacp, jacr), axis=0)
 Jw = J.dot(w)
@@ -291,7 +276,7 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
 qd_d = w.dot(J.transpose()).dot(qd_d) + qd_null
-# i += 1
+i += 1
 return q
@@ -375,14 +360,3 @@ class BoxPushingTemporalSpatialSparse(BoxPushingEnvBase):
 reward += box_goal_pos_dist_reward + box_goal_rot_dist_reward
 return reward
-if __name__=="__main__":
-env = BoxPushingTemporalSpatialSparse(frame_skip=10)
-env.reset()
-for i in range(100):
-env.reset()
-for _ in range(100):
-env.render("human")
-action = env.action_space.sample()
-obs, reward, done, info = env.step(action)
-# print("info: {}".format(info))

View File

@@ -1 +1 @@
-from .mp_wrapper import MPWrapper
+from .mp_wrapper import TT_MPWrapper, TTVelObs_MPWrapper

View File

@@ -6,7 +6,7 @@ from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper
 from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import jnt_pos_low, jnt_pos_high, delay_bound, tau_bound
-class MPWrapper(RawInterfaceWrapper):
+class TT_MPWrapper(RawInterfaceWrapper):
 # Random x goal + random init pos
 @property
@@ -16,7 +16,7 @@ class MPWrapper(RawInterfaceWrapper):
 [False] * 7, # joints velocity
 [True] * 2, # position ball x, y
 [False] * 1, # position ball z
-# [False] * 3, # velocity ball x, y, z
+#[True] * 3, # velocity ball x, y, z
 [True] * 2, # target landing position
 # [True] * 1, # time
 ])
@@ -29,46 +29,26 @@ class MPWrapper(RawInterfaceWrapper):
 def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
 return self.data.qvel[:7].copy()
-# def check_time_validity(self, action):
-# return action[0] <= tau_bound[1] and action[0] >= tau_bound[0] \
-# and action[1] <= delay_bound[1] and action[1] >= delay_bound[0]
-#
-# def time_invalid_traj_callback(self, action, pos_traj, vel_traj) \
-# -> Tuple[np.ndarray, float, bool, dict]:
-# tau_invalid_penalty = 3 * (np.max([0, action[0] - tau_bound[1]]) + np.max([0, tau_bound[0] - action[0]]))
-# delay_invalid_penalty = 3 * (np.max([0, action[1] - delay_bound[1]]) + np.max([0, delay_bound[0] - action[1]]))
-# invalid_penalty = tau_invalid_penalty + delay_invalid_penalty
-# obs = np.concatenate([self.get_obs(), np.array([0])])
-# return obs, -invalid_penalty, True, {
-# "hit_ball": [False],
-# "ball_returned_success": [False],
-# "land_dist_error": [10.],
-# "is_success": [False],
-# 'trajectory_length': 1,
-# "num_steps": [1]
-# }
-def episode_callback(self, action, pos_traj, vel_traj):
-time_invalid = action[0] > tau_bound[1] or action[0] < tau_bound[0] \
-or action[1] > delay_bound[1] or action[1] < delay_bound[0]
-if time_invalid or np.any(pos_traj > jnt_pos_high) or np.any(pos_traj < jnt_pos_low):
-return False
-return True
-def invalid_traj_callback(self, action, pos_traj: np.ndarray, vel_traj: np.ndarray) \
--> Tuple[np.ndarray, float, bool, dict]:
-tau_invalid_penalty = 3 * (np.max([0, action[0] - tau_bound[1]]) + np.max([0, tau_bound[0] - action[0]]))
-delay_invalid_penalty = 3 * (np.max([0, action[1] - delay_bound[1]]) + np.max([0, delay_bound[0] - action[1]]))
-violate_high_bound_error = np.mean(np.maximum(pos_traj - jnt_pos_high, 0))
-violate_low_bound_error = np.mean(np.maximum(jnt_pos_low - pos_traj, 0))
-invalid_penalty = tau_invalid_penalty + delay_invalid_penalty + \
-violate_high_bound_error + violate_low_bound_error
-obs = np.concatenate([self.get_obs(), np.array([0])])
-return obs, -invalid_penalty, True, {
-"hit_ball": [False],
-"ball_returned_success": [False],
-"land_dist_error": [10.],
-"is_success": [False],
-'trajectory_length': 1,
-"num_steps": [1]
-}
+def preprocessing_and_validity_callback(self, action, pos_traj, vel_traj):
+return self.check_traj_validity(action, pos_traj, vel_traj)
+def set_episode_arguments(self, action, pos_traj, vel_traj):
+return pos_traj, vel_traj
+def invalid_traj_callback(self, action: np.ndarray, pos_traj: np.ndarray, vel_traj: np.ndarray,
+return_contextual_obs: bool) -> Tuple[np.ndarray, float, bool, dict]:
+return self.get_invalid_traj_step_return(action, pos_traj, return_contextual_obs)
+class TTVelObs_MPWrapper(TT_MPWrapper):
+@property
+def context_mask(self):
+return np.hstack([
+[False] * 7, # joints position
+[False] * 7, # joints velocity
+[True] * 2, # position ball x, y
+[False] * 1, # position ball z
+[True] * 3, # velocity ball x, y, z
+[True] * 2, # target landing position
+# [True] * 1, # time
+])

View File

@@ -5,10 +5,11 @@ from gym import utils, spaces
 from gym.envs.mujoco import MujocoEnv
 from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import check_init_state_validity, magnus_force
+from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import jnt_pos_low, jnt_pos_high, delay_bound, tau_bound
 import mujoco
-MAX_EPISODE_STEPS_TABLE_TENNIS = 250
+MAX_EPISODE_STEPS_TABLE_TENNIS = 350
 CONTEXT_BOUNDS_2DIMS = np.array([[-1.0, -0.65], [-0.2, 0.65]])
 CONTEXT_BOUNDS_4DIMS = np.array([[-1.0, -0.65, -1.0, -0.65],
@@ -21,11 +22,9 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
 """
 7 DoF table tennis environment
 """
 def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4,
-enable_switching_goal: bool = False,
-enable_wind: bool = False, enable_magnus: bool = False,
-enable_air: bool = False):
+goal_switching_step: int = None,
+enable_artificial_wind: bool = False):
 utils.EzPickle.__init__(**locals())
 self._steps = 0
@@ -45,8 +44,11 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
 self._ball_traj = []
 self._racket_traj = []
-self._enable_goal_switching = enable_switching_goal
+self._goal_switching_step = goal_switching_step
+self._enable_artificial_wind = enable_artificial_wind
+self._artificial_force = 0.
 MujocoEnv.__init__(self,
 model_path=os.path.join(os.path.dirname(__file__), "assets", "xml", "table_tennis_env.xml"),
@@ -56,20 +58,13 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
 self.context_bounds = CONTEXT_BOUNDS_2DIMS
 elif ctxt_dim == 4:
 self.context_bounds = CONTEXT_BOUNDS_4DIMS
-if self._enable_goal_switching:
+if self._goal_switching_step is not None:
 self.context_bounds = CONTEXT_BOUNDS_SWICHING
 else:
 raise NotImplementedError
 self.action_space = spaces.Box(low=-1, high=1, shape=(7,), dtype=np.float32)
-# complex dynamics settings
-if enable_air:
-self.model.opt.density = 1.225
-self.model.opt.viscosity = 2.27e-5
-self._enable_wind = enable_wind
-self._enable_magnus = enable_magnus
 self._wind_vel = np.zeros(3)
 def _set_ids(self):
@@ -86,13 +81,16 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
 unstable_simulation = False
-if self._enable_goal_switching:
-if self._steps == 45 and self.np_random.uniform(0, 1) < 0.5:
-self._goal_pos[1] = -self._goal_pos[1]
+if self._steps == self._goal_switching_step and self.np_random.uniform(0, 1) < 0.5:
+new_goal_pos = self._generate_goal_pos(random=True)
+new_goal_pos[1] = -new_goal_pos[1]
+self._goal_pos = new_goal_pos
 self.model.body_pos[5] = np.concatenate([self._goal_pos, [0.77]])
 mujoco.mj_forward(self.model, self.data)
 for _ in range(self.frame_skip):
+if self._enable_artificial_wind:
+self.data.qfrc_applied[-2] = self._artificial_force
 try:
 self.do_simulation(action, 1)
 except Exception as e:
@@ -160,16 +158,16 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
 self.data.joint("tar_y").qvel = self._init_ball_state[4]
 self.data.joint("tar_z").qvel = self._init_ball_state[5]
+if self._enable_artificial_wind:
+self._artificial_force = self.np_random.uniform(low=-0.1, high=0.1)
 self.model.body_pos[5] = np.concatenate([self._goal_pos, [0.77]])
 self.data.qpos[:7] = np.array([0., 0., 0., 1.5, 0., 0., 1.5])
+self.data.qvel[:7] = np.zeros(7)
 mujoco.mj_forward(self.model, self.data)
-if self._enable_wind:
-self._wind_vel[1] = self.np_random.uniform(low=-5, high=5, size=1)
-self.model.opt.wind[:3] = self._wind_vel
 self._hit_ball = False
 self._ball_land_on_table = False
 self._ball_contact_after_hit = False
@@ -193,10 +191,6 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
 self.data.joint("tar_x").qpos.copy(),
 self.data.joint("tar_y").qpos.copy(),
 self.data.joint("tar_z").qpos.copy(),
-# self.data.joint("tar_x").qvel.copy(),
-# self.data.joint("tar_y").qvel.copy(),
-# self.data.joint("tar_z").qvel.copy(),
-# self.data.body("target_ball").xvel.copy(),
 self._goal_pos.copy(),
 ])
 return obs
@@ -237,51 +231,55 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
 init_ball_state = self._generate_random_ball(random_pos=random_pos, random_vel=random_vel)
 return init_ball_state
-def plot_ball_traj(x_traj, y_traj, z_traj):
-import matplotlib.pyplot as plt
-fig = plt.figure()
-ax = fig.add_subplot(111, projection='3d')
-ax.plot(x_traj, y_traj, z_traj)
-plt.show()
-def plot_ball_traj_2d(x_traj, y_traj):
-import matplotlib.pyplot as plt
-fig = plt.figure()
-ax = fig.add_subplot(111)
-ax.plot(x_traj, y_traj)
-plt.show()
-def plot_single_axis(traj, title):
-import matplotlib.pyplot as plt
-fig = plt.figure()
-ax = fig.add_subplot(111)
-ax.plot(traj)
-ax.set_title(title)
-plt.show()
-if __name__ == "__main__":
-env = TableTennisEnv(enable_air=True)
-# env_with_air = TableTennisEnv(enable_air=True)
-for _ in range(1):
-obs1 = env.reset()
-# obs2 = env_with_air.reset()
-x_pos = []
-y_pos = []
-z_pos = []
-x_vel = []
-y_vel = []
-z_vel = []
-for _ in range(2000):
-obs, reward, done, info = env.step(np.zeros(7))
-# _, _, _, _ = env_no_air.step(np.zeros(7))
-x_pos.append(env.data.joint("tar_x").qpos[0])
-y_pos.append(env.data.joint("tar_y").qpos[0])
-z_pos.append(env.data.joint("tar_z").qpos[0])
-x_vel.append(env.data.joint("tar_x").qvel[0])
-y_vel.append(env.data.joint("tar_y").qvel[0])
-z_vel.append(env.data.joint("tar_z").qvel[0])
-# print(reward)
-if done:
-# plot_ball_traj_2d(x_pos, y_pos)
-plot_single_axis(x_pos, title="x_vel without air")
-break
+def _get_traj_invalid_penalty(self, action, pos_traj):
+tau_invalid_penalty = 3 * (np.max([0, action[0] - tau_bound[1]]) + np.max([0, tau_bound[0] - action[0]]))
+delay_invalid_penalty = 3 * (np.max([0, action[1] - delay_bound[1]]) + np.max([0, delay_bound[0] - action[1]]))
+violate_high_bound_error = np.mean(np.maximum(pos_traj - jnt_pos_high, 0))
+violate_low_bound_error = np.mean(np.maximum(jnt_pos_low - pos_traj, 0))
+invalid_penalty = tau_invalid_penalty + delay_invalid_penalty + \
+violate_high_bound_error + violate_low_bound_error
+return -invalid_penalty
+def get_invalid_traj_step_return(self, action, pos_traj, contextual_obs):
+obs = self._get_obs() if contextual_obs else np.concatenate([self._get_obs(), np.array([0])]) # 0 for invalid traj
+penalty = self._get_traj_invalid_penalty(action, pos_traj)
+return obs, penalty, True, {
+"hit_ball": [False],
+"ball_returned_success": [False],
+"land_dist_error": [10.],
+"is_success": [False],
+"trajectory_length": 1,
+"num_steps": [1],
+}
+@staticmethod
+def check_traj_validity(action, pos_traj, vel_traj):
+time_invalid = action[0] > tau_bound[1] or action[0] < tau_bound[0] \
+or action[1] > delay_bound[1] or action[1] < delay_bound[0]
+if time_invalid or np.any(pos_traj > jnt_pos_high) or np.any(pos_traj < jnt_pos_low):
+return False, pos_traj, vel_traj
+return True, pos_traj, vel_traj
+class TableTennisWind(TableTennisEnv):
+def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4):
+super().__init__(ctxt_dim=ctxt_dim, frame_skip=frame_skip, enable_artificial_wind=True)
+def _get_obs(self):
+obs = np.concatenate([
+self.data.qpos.flat[:7].copy(),
+self.data.qvel.flat[:7].copy(),
+self.data.joint("tar_x").qpos.copy(),
+self.data.joint("tar_y").qpos.copy(),
+self.data.joint("tar_z").qpos.copy(),
+self.data.joint("tar_x").qvel.copy(),
+self.data.joint("tar_y").qvel.copy(),
+self.data.joint("tar_z").qvel.copy(),
+self._goal_pos.copy(),
+])
+return obs
+class TableTennisGoalSwitching(TableTennisEnv):
+def __init__(self, frame_skip: int = 4, goal_switching_step: int = 99):
+super().__init__(frame_skip=frame_skip, goal_switching_step=goal_switching_step)
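The invalid-trajectory penalty defined above is easy to check by hand. A standalone sketch using the tau and delay bounds registered for these environments ([0.8, 1.5] and [0.05, 0.15]); the +/- 2.0 rad joint limits are stand-ins, not the values from table_tennis_utils:

import numpy as np

tau_bound, delay_bound = [0.8, 1.5], [0.05, 0.15]

def traj_invalid_penalty(action, pos_traj, jnt_pos_low, jnt_pos_high):
    # mirrors _get_traj_invalid_penalty above
    tau_pen = 3 * (max(0, action[0] - tau_bound[1]) + max(0, tau_bound[0] - action[0]))
    delay_pen = 3 * (max(0, action[1] - delay_bound[1]) + max(0, delay_bound[0] - action[1]))
    high_pen = np.mean(np.maximum(pos_traj - jnt_pos_high, 0))
    low_pen = np.mean(np.maximum(jnt_pos_low - pos_traj, 0))
    return -(tau_pen + delay_pen + high_pen + low_pen)

# tau = 1.7 exceeds the upper bound by 0.2, delay is fine, joints stay inside
# the illustrative limits -> penalty = -(3 * 0.2) = -0.6
action = np.array([1.7, 0.1])
pos_traj = np.zeros((10, 7))
print(traj_invalid_penalty(action, pos_traj, -2.0, 2.0))  # -0.6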

View File

@@ -1,38 +1,62 @@
 import fancy_gym
-import numpy as np
-import matplotlib.pyplot as plt
-def plot_trajectory(traj):
-plt.figure()
-plt.plot(traj[:, 3])
-plt.legend()
-plt.show()
-def run_replanning_envs(env_name="BoxPushingProDMP-v0", seed=1, iterations=1, render=True):
+def example_run_replanning_env(env_name="BoxPushingDenseReplanProDMP-v0", seed=1, iterations=1, render=False):
 env = fancy_gym.make(env_name, seed=seed)
 env.reset()
 for i in range(iterations):
 done = False
-desired_pos_traj = np.zeros((100, 7))
-desired_vel_traj = np.zeros((100, 7))
-real_pos_traj = np.zeros((100, 7))
-real_vel_traj = np.zeros((100, 7))
-t = 0
 while done is False:
 ac = env.action_space.sample()
 obs, reward, done, info = env.step(ac)
-desired_pos_traj[t: t + 25, :] = info['desired_pos']
-desired_vel_traj[t: t + 25, :] = info['desired_vel']
-# real_pos_traj.append(info['current_pos'])
-# real_vel_traj.append(info['current_vel'])
-t += 25
 if render:
 env.render(mode="human")
 if done:
 env.reset()
-plot_trajectory(desired_pos_traj)
 env.close()
 del env
+def example_custom_replanning_envs(seed=0, iteration=100, render=True):
+# id for a step-based environment
+base_env_id = "BoxPushingDense-v0"
+wrappers = [fancy_gym.envs.mujoco.box_pushing.mp_wrapper.MPWrapper]
+trajectory_generator_kwargs = {'trajectory_generator_type': 'prodmp',
+'weight_scale': 1}
+phase_generator_kwargs = {'phase_generator_type': 'exp'}
+controller_kwargs = {'controller_type': 'velocity'}
+basis_generator_kwargs = {'basis_generator_type': 'prodmp',
+'num_basis': 5}
+# max_planning_times: the maximum number of plans can be generated
+# replanning_schedule: the trigger for replanning
+# condition_on_desired: use desired state as the boundary condition for the next plan
+black_box_kwargs = {'max_planning_times': 4,
+'replanning_schedule': lambda pos, vel, obs, action, t: t % 25 == 0,
+'condition_on_desired': True}
+env = fancy_gym.make_bb(env_id=base_env_id, wrappers=wrappers, black_box_kwargs=black_box_kwargs,
+traj_gen_kwargs=trajectory_generator_kwargs, controller_kwargs=controller_kwargs,
+phase_kwargs=phase_generator_kwargs, basis_kwargs=basis_generator_kwargs,
+seed=seed)
+if render:
+env.render(mode="human")
+obs = env.reset()
+for i in range(iteration):
+ac = env.action_space.sample()
+obs, reward, done, info = env.step(ac)
+if done:
+env.reset()
+env.close()
+del env
 if __name__ == "__main__":
-run_replanning_envs(env_name="BoxPushingDenseProDMP-v0", seed=1, iterations=1, render=False)
+# run a registered replanning environment
+example_run_replanning_env(env_name="BoxPushingDenseReplanProDMP-v0", seed=1, iterations=1, render=False)
+# run a custom replanning environment
+example_custom_replanning_envs(seed=0, iteration=8, render=True)

View File

@@ -17,8 +17,6 @@ def example_mp(env_name="HoleReacherProMP-v0", seed=1, iterations=1, render=True
 # It takes care of seeding and enables the use of a variety of external environments using the gym interface.
 env = fancy_gym.make(env_name, seed)
-# env.traj_gen.basis_gn.show_basis(plot=True)
 returns = 0
 # env.render(mode=None)
 obs = env.reset()
@@ -40,22 +38,16 @@ def example_mp(env_name="HoleReacherProMP-v0", seed=1, iterations=1, render=True
 # Now the action space is not the raw action but the parametrization of the trajectory generator,
 # such as a ProMP
 ac = env.action_space.sample()
-# ac[0] = 0.6866657733917236
-# ac[1] = 0.08587364107370377
 # This executes a full trajectory and gives back the context (obs) of the last step in the trajectory, or the
 # full observation space of the last step, if replanning/sub-trajectory learning is used. The 'reward' is equal
 # to the return of a trajectory. Default is the sum over the step-wise rewards.
-print(f'target obs: {obs[-3:]}')
 obs, reward, done, info = env.step(ac)
-print(f'steps: {info["num_steps"][-1]}')
 # Aggregated returns
 returns += reward
 if done:
-# print(reward)
+print(reward)
 obs = env.reset()
-print("=================New Episode======================")
-# print("steps: {}".format(info["num_steps"][-1]))
@@ -163,21 +155,22 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True):
 if __name__ == '__main__':
-render = True
+render = False
 # DMP
-# example_mp("HoleReacherDMP-v0", seed=10, iterations=5, render=render)
+example_mp("HoleReacherDMP-v0", seed=10, iterations=5, render=render)
 # ProMP
-# example_mp("HoleReacherProMP-v0", seed=10, iterations=5, render=render)
-# example_mp("BoxPushingTemporalSparseProMP-v0", seed=10, iterations=1, render=render)
-# example_mp("TableTennis4DProMP-v0", seed=10, iterations=10, render=True)
+example_mp("HoleReacherProMP-v0", seed=10, iterations=5, render=render)
+example_mp("BoxPushingTemporalSparseProMP-v0", seed=10, iterations=1, render=render)
+example_mp("TableTennis4DProMP-v0", seed=10, iterations=20, render=render)
-# ProDMP
-# example_mp("BoxPushingDenseProDMP-v0", seed=10, iterations=16, render=render)
-example_mp("TableTennis4DProDMP-v0", seed=10, iterations=5000, render=render)
+# ProDMP with Replanning
+example_mp("BoxPushingDenseReplanProDMP-v0", seed=10, iterations=4, render=render)
+example_mp("TableTennis4DReplanProDMP-v0", seed=10, iterations=20, render=render)
+example_mp("TableTennisWindReplanProDMP-v0", seed=10, iterations=20, render=render)
 # Altered basis functions
-# obs1 = example_custom_mp("Reacher5dProMP-v0", seed=10, iterations=1, render=render)
+obs1 = example_custom_mp("Reacher5dProMP-v0", seed=10, iterations=1, render=render)
 # Custom MP
-# example_fully_custom_mp(seed=10, iterations=1, render=render)
+example_fully_custom_mp(seed=10, iterations=1, render=render)

View File

@@ -168,11 +168,11 @@ def make_bb(
 # set tau bounds to minimum of two env steps otherwise computing the velocity is not possible.
 # maximum is full duration of one episode.
-if phase_kwargs.get('learn_tau'):
+if phase_kwargs.get('learn_tau') and phase_kwargs.get('tau_bound') is None:
 phase_kwargs["tau_bound"] = [env.dt * 2, black_box_kwargs['duration']]
 # Max delay is full duration minus two steps due to above reason
-if phase_kwargs.get('learn_delay'):
+if phase_kwargs.get('learn_delay') and phase_kwargs.get('delay_bound') is None:
 phase_kwargs["delay_bound"] = [0, black_box_kwargs['duration'] - env.dt * 2]
 phase_gen = get_phase_generator(**phase_kwargs)
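The added "is None" guards mean explicitly supplied bounds now take precedence over the duration-derived defaults, which is what the table-tennis registrations above rely on. A small sketch of that precedence, with stand-in values for env.dt and black_box_kwargs['duration']:

# Defaults are only filled in when the caller did not pin the bounds themselves.
env_dt, duration = 0.02, 2.0

phase_kwargs = {'phase_generator_type': 'linear',
                'learn_tau': True,
                'learn_delay': True,
                'tau_bound': [0.8, 1.5]}          # explicitly pinned -> kept

if phase_kwargs.get('learn_tau') and phase_kwargs.get('tau_bound') is None:
    phase_kwargs['tau_bound'] = [env_dt * 2, duration]
if phase_kwargs.get('learn_delay') and phase_kwargs.get('delay_bound') is None:
    phase_kwargs['delay_bound'] = [0, duration - env_dt * 2]

print(phase_kwargs['tau_bound'])    # [0.8, 1.5] (unchanged)
print(phase_kwargs['delay_bound'])  # [0, 1.96]  (derived default)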

View File

@@ -34,7 +34,7 @@ setup(
 ],
 extras_require=extras,
 install_requires=[
-'gym[mujoco]<0.25.0,>=0.24.0',
+'gym[mujoco]<0.25.0,>=0.24.1',
 'mp_pytorch @ git+https://github.com/ALRhub/MP_PyTorch.git@main'
 ],
 packages=[package for package in find_packages() if package.startswith("fancy_gym")],

View File

@ -67,28 +67,32 @@ def test_missing_wrapper(env_id: str):
fancy_gym.make_bb(env_id, [], {}, {}, {}, {}, {}) fancy_gym.make_bb(env_id, [], {}, {}, {}, {}, {})
@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) @pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
def test_missing_local_state(mp_type: str): def test_missing_local_state(mp_type: str):
basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
env = fancy_gym.make_bb('toy-v0', [RawInterfaceWrapper], {}, env = fancy_gym.make_bb('toy-v0', [RawInterfaceWrapper], {},
{'trajectory_generator_type': mp_type}, {'trajectory_generator_type': mp_type},
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'exp'}, {'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}) {'basis_generator_type': basis_generator_type})
env.reset() env.reset()
with pytest.raises(NotImplementedError): with pytest.raises(NotImplementedError):
env.step(env.action_space.sample()) env.step(env.action_space.sample())
@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) @pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
@pytest.mark.parametrize('verbose', [1, 2]) @pytest.mark.parametrize('verbose', [1, 2])
def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int): def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int):
basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
env_id, wrapper_class = env_wrap env_id, wrapper_class = env_wrap
env = fancy_gym.make_bb(env_id, [wrapper_class], {'verbose': verbose}, env = fancy_gym.make_bb(env_id, [wrapper_class], {'verbose': verbose},
{'trajectory_generator_type': mp_type}, {'trajectory_generator_type': mp_type},
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'exp'}, {'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}) {'basis_generator_type': basis_generator_type})
env.reset() env.reset()
info_keys = list(env.step(env.action_space.sample())[3].keys()) info_keys = list(env.step(env.action_space.sample())[3].keys())
@ -104,15 +108,17 @@ def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]
assert all(e in info_keys for e in mp_keys) assert all(e in info_keys for e in mp_keys)
@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) @pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
env_id, wrapper_class = env_wrap env_id, wrapper_class = env_wrap
env = fancy_gym.make_bb(env_id, [wrapper_class], {}, env = fancy_gym.make_bb(env_id, [wrapper_class], {},
{'trajectory_generator_type': mp_type}, {'trajectory_generator_type': mp_type},
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'exp'}, {'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}) {'basis_generator_type': basis_generator_type})
for _ in range(5): for _ in range(5):
env.reset() env.reset()
@ -121,14 +127,15 @@ def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
assert length == env.spec.max_episode_steps assert length == env.spec.max_episode_steps
@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) @pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
@pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])]) @pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])])
def test_aggregation(mp_type: str, reward_aggregation: Callable[[np.ndarray], float]): def test_aggregation(mp_type: str, reward_aggregation: Callable[[np.ndarray], float]):
basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation}, env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation},
{'trajectory_generator_type': mp_type}, {'trajectory_generator_type': mp_type},
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'exp'}, {'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}) {'basis_generator_type': basis_generator_type})
env.reset() env.reset()
# ToyEnv only returns 1 as reward # ToyEnv only returns 1 as reward
assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, )) assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, ))
@ -149,12 +156,13 @@ def test_context_space(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapp
assert env.observation_space.shape == wrapper.context_mask[wrapper.context_mask].shape assert env.observation_space.shape == wrapper.context_mask[wrapper.context_mask].shape
@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) @pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
@pytest.mark.parametrize('num_dof', [0, 1, 2, 5]) @pytest.mark.parametrize('num_dof', [0, 1, 2, 5])
@pytest.mark.parametrize('num_basis', [0, 1, 2, 5]) @pytest.mark.parametrize('num_basis', [0, 1, 2, 5])
@pytest.mark.parametrize('learn_tau', [True, False]) @pytest.mark.parametrize('learn_tau', [True, False])
@pytest.mark.parametrize('learn_delay', [True, False]) @pytest.mark.parametrize('learn_delay', [True, False])
def test_action_space(mp_type: str, num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool): def test_action_space(mp_type: str, num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool):
basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {}, env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
{'trajectory_generator_type': mp_type, {'trajectory_generator_type': mp_type,
'action_dim': num_dof 'action_dim': num_dof
@@ -164,28 +172,29 @@ def test_action_space(mp_type: str, num_dof: int, num_basis: int, learn_tau: boo
                              'learn_tau': learn_tau,
                              'learn_delay': learn_delay
                              },
-                            {'basis_generator_type': 'rbf',
+                            {'basis_generator_type': basis_generator_type,
                              'num_basis': num_basis
                              })
     base_dims = num_dof * num_basis
-    additional_dims = num_dof if mp_type == 'dmp' else 0
+    additional_dims = num_dof if 'dmp' in mp_type else 0
     traj_modification_dims = int(learn_tau) + int(learn_delay)
     assert env.action_space.shape[0] == base_dims + traj_modification_dims + additional_dims


-@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
 @pytest.mark.parametrize('a', [1])
 @pytest.mark.parametrize('b', [1.0])
 @pytest.mark.parametrize('c', [[1], [1.0], ['str'], [{'a': 'b'}], [np.ones(3, )]])
 @pytest.mark.parametrize('d', [{'a': 1}, {1: 2.0}, {'a': [1.0]}, {'a': np.ones(3, )}, {'a': {'a': 'b'}}])
 @pytest.mark.parametrize('e', [Object()])
 def test_change_env_kwargs(mp_type: str, a: int, b: float, c: list, d: dict, e: Object):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
                             {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
                             {'phase_generator_type': 'exp'},
-                            {'basis_generator_type': 'rbf'},
+                            {'basis_generator_type': basis_generator_type},
                             a=a, b=b, c=c, d=d, e=e
                             )
     assert a is env.a
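Side note on the dimension check asserted in test_action_space above: the flat action is the per-DoF basis weights plus optional goal and timing parameters. A minimal worked example of that arithmetic, with illustrative numbers that are not taken from the diff:

num_dof, num_basis = 2, 5
mp_type = 'prodmp'
learn_tau, learn_delay = True, False
base_dims = num_dof * num_basis                              # 10 basis weights
additional_dims = num_dof if 'dmp' in mp_type else 0         # extra per-DoF (goal) parameters for DMP-type generators -> 2
traj_modification_dims = int(learn_tau) + int(learn_delay)   # 1 for the learned tau
assert base_dims + traj_modification_dims + additional_dims == 13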
@@ -196,18 +205,20 @@ def test_change_env_kwargs(mp_type: str, a: int, b: float, c: list, d: dict, e:
     assert e is env.e


-@pytest.mark.parametrize('mp_type', ['promp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
 def test_learn_tau(mp_type: str, tau: float):
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
                             {'trajectory_generator_type': mp_type,
                              },
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear',
+                            {'phase_generator_type': phase_generator_type,
                              'learn_tau': True,
                              'learn_delay': False
                              },
-                            {'basis_generator_type': 'rbf',
+                            {'basis_generator_type': basis_generator_type,
                              }, seed=SEED)
     d = True
@@ -228,26 +239,29 @@ def test_learn_tau(mp_type: str, tau: float):
         vel = info['velocities'].flatten()

         # Check end is all same (only true for linear basis)
-        assert np.all(pos[tau_time_steps:] == pos[-1])
-        assert np.all(vel[tau_time_steps:] == vel[-1])
+        if phase_generator_type == "linear":
+            assert np.all(pos[tau_time_steps:] == pos[-1])
+            assert np.all(vel[tau_time_steps:] == vel[-1])

         # Check active trajectory section is different to end values
         assert np.all(pos[:tau_time_steps - 1] != pos[-1])
         assert np.all(vel[:tau_time_steps - 2] != vel[-1])

-@pytest.mark.parametrize('mp_type', ['promp'])
+#
+#
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
 def test_learn_delay(mp_type: str, delay: float):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
                             {'trajectory_generator_type': mp_type,
                              },
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear',
+                            {'phase_generator_type': phase_generator_type,
                              'learn_tau': False,
                              'learn_delay': True
                              },
-                            {'basis_generator_type': 'rbf',
+                            {'basis_generator_type': basis_generator_type,
                              }, seed=SEED)
     d = True
@@ -274,21 +288,23 @@ def test_learn_delay(mp_type: str, delay: float):
         # Check active trajectory section is different to beginning values
         assert np.all(pos[max(1, delay_time_steps):] != pos[0])
         assert np.all(vel[max(1, delay_time_steps)] != vel[0])

-@pytest.mark.parametrize('mp_type', ['promp'])
+#
+#
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
 @pytest.mark.parametrize('delay', [0.25, 0.5, 0.75, 1])
 def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
                             {'trajectory_generator_type': mp_type,
                              },
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear',
+                            {'phase_generator_type': phase_generator_type,
                              'learn_tau': True,
                              'learn_delay': True
                              },
-                            {'basis_generator_type': 'rbf',
+                            {'basis_generator_type': basis_generator_type,
                              }, seed=SEED)
     if env.spec.max_episode_steps * env.dt < delay + tau:
@@ -315,6 +331,7 @@ def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
         vel = info['velocities'].flatten()

         # Check end is all same (only true for linear basis)
-        assert np.all(pos[joint_time_steps:] == pos[-1])
-        assert np.all(vel[joint_time_steps:] == vel[-1])
+        if phase_generator_type == "linear":
+            assert np.all(pos[joint_time_steps:] == pos[-1])
+            assert np.all(vel[joint_time_steps:] == vel[-1])
View File

@@ -1,23 +0,0 @@
-from itertools import chain
-
-import pytest
-
-import fancy_gym
-from test.utils import run_env, run_env_determinism
-
-Fancy_ProDMP_IDS = fancy_gym.ALL_FANCY_MOVEMENT_PRIMITIVE_ENVIRONMENTS['ProDMP']
-All_ProDMP_IDS = fancy_gym.ALL_MOVEMENT_PRIMITIVE_ENVIRONMENTS['ProDMP']
-
-
-@pytest.mark.parametrize('env_id', All_ProDMP_IDS)
-def test_replanning_envs(env_id: str):
-    """Tests that ProDMP environments run without errors using random actions."""
-    run_env(env_id)
-
-
-@pytest.mark.parametrize('env_id', All_ProDMP_IDS)
-def test_replanning_determinism(env_id: str):
-    """Tests that ProDMP environments are deterministic."""
-    run_env_determinism(env_id, 0)

View File

@@ -98,7 +98,7 @@ def test_learn_sub_trajectories(mp_type: str, env_wrap: Tuple[str, Type[RawInter
     assert length <= np.round(env.traj_gen.tau.numpy() / env.dt)


-@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
 @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
 @pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
 @pytest.mark.parametrize('replanning_time', [10, 100, 1000])
@@ -114,11 +114,14 @@ def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWra
     replanning_schedule = lambda c_pos, c_vel, obs, c_action, t: t % replanning_time == 0

+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if 'dmp' in mp_type else 'linear'
+
     env = fancy_gym.make_bb(env_id, [wrapper_class], {'replanning_schedule': replanning_schedule, 'verbose': 2},
                             {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'exp'},
-                            {'basis_generator_type': 'rbf'}, seed=SEED)
+                            {'phase_generator_type': phase_generator_type},
+                            {'basis_generator_type': basis_generator_type}, seed=SEED)

     assert env.do_replanning
     assert callable(env.replanning_schedule)
@@ -142,3 +145,189 @@ def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWra
         env.reset()

     assert replanning_schedule(None, None, None, None, length)
+
+
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10])
+def test_max_planning_times(mp_type: str, max_planning_times: int, sub_segment_steps: int):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'max_planning_times': max_planning_times,
+                             'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': False,
+                             'learn_delay': False
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    planning_times = 0
+    while not d:
+        _, _, d, _ = env.step(env.action_space.sample())
+        planning_times += 1
+    assert planning_times == max_planning_times
+
+
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10])
+@pytest.mark.parametrize('tau', [0.5, 1.0, 1.5, 2.0])
+def test_replanning_with_learn_tau(mp_type: str, max_planning_times: int, sub_segment_steps: int, tau: float):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'max_planning_times': max_planning_times,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': True,
+                             'learn_delay': False
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    planning_times = 0
+    while not d:
+        action = env.action_space.sample()
+        action[0] = tau
+        _, _, d, info = env.step(action)
+        planning_times += 1
+    assert planning_times == max_planning_times
+
+
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10])
+@pytest.mark.parametrize('delay', [0.1, 0.25, 0.5, 0.75])
+def test_replanning_with_learn_delay(mp_type: str, max_planning_times: int, sub_segment_steps: int, delay: float):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'max_planning_times': max_planning_times,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': False,
+                             'learn_delay': True
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    planning_times = 0
+    while not d:
+        action = env.action_space.sample()
+        action[0] = delay
+        _, _, d, info = env.step(action)
+
+        delay_time_steps = int(np.round(delay / env.dt))
+
+        pos = info['positions'].flatten()
+        vel = info['velocities'].flatten()
+
+        # Check beginning is all same (only true for linear basis)
+        if planning_times == 0:
+            assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0])
+            assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0])
+        # only valid when delay < sub_segment_steps
+        elif planning_times > 0 and delay_time_steps < sub_segment_steps:
+            assert np.all(pos[1:max(1, delay_time_steps - 1)] != pos[0])
+            assert np.all(vel[1:max(1, delay_time_steps - 2)] != vel[0])
+
+        # Check active trajectory section is different to beginning values
+        assert np.all(pos[max(1, delay_time_steps):] != pos[0])
+        assert np.all(vel[max(1, delay_time_steps)] != vel[0])
+
+        planning_times += 1
+
+    assert planning_times == max_planning_times
+
+
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10, 15])
+@pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
+@pytest.mark.parametrize('tau', [0.5, 0.75, 1.0])
+def test_replanning_with_learn_delay_and_tau(mp_type: str, max_planning_times: int, sub_segment_steps: int,
+                                             delay: float, tau: float):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'max_planning_times': max_planning_times,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': True,
+                             'learn_delay': True
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    planning_times = 0
+    while not d:
+        action = env.action_space.sample()
+        action[0] = tau
+        action[1] = delay
+        _, _, d, info = env.step(action)
+
+        delay_time_steps = int(np.round(delay / env.dt))
+
+        pos = info['positions'].flatten()
+        vel = info['velocities'].flatten()
+
+        # Delay only applies to first planning time
+        if planning_times == 0:
+            # Check delay is applied
+            assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0])
+            assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0])
+
+            # Check active trajectory section is different to beginning values
+            assert np.all(pos[max(1, delay_time_steps):] != pos[0])
+            assert np.all(vel[max(1, delay_time_steps)] != vel[0])
+
+        planning_times += 1
+
+    assert planning_times == max_planning_times
+
+
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10])
+def test_replanning_schedule(mp_type: str, max_planning_times: int, sub_segment_steps: int):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'max_planning_times': max_planning_times,
+                             'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': False,
+                             'learn_delay': False
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    for i in range(max_planning_times):
+        _, _, d, _ = env.step(env.action_space.sample())
+    assert d
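For context, the replanning tests added above all construct the black-box wrapper the same way. A minimal usage sketch of that pattern, assuming the test-only fixtures 'toy-v0' and ToyWrapper and the old gym 4-tuple step API used throughout these tests:

import fancy_gym

# One env.step() consumes one full (sub-)trajectory parameterisation; replanning is
# triggered by the schedule and capped by max_planning_times.
env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
                        {'replanning_schedule': lambda pos, vel, obs, action, t: t % 10 == 0,
                         'max_planning_times': 3,
                         'verbose': 2},
                        {'trajectory_generator_type': 'prodmp'},
                        {'controller_type': 'motor'},
                        {'phase_generator_type': 'exp'},
                        {'basis_generator_type': 'prodmp'},
                        seed=0)

obs = env.reset()
done = False
while not done:
    obs, reward, done, info = env.step(env.action_space.sample())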