Merge branch 'master' into tt_cluster_debug

# Conflicts:
#	fancy_gym/black_box/black_box_wrapper.py
#	fancy_gym/envs/__init__.py
#	fancy_gym/examples/examples_movement_primitives.py
Hongyi Zhou 2022-11-25 17:16:26 +01:00
commit c242c32a41
9 changed files with 326 additions and 174 deletions

View File

@@ -2,7 +2,6 @@ from typing import Tuple, Optional, Callable
 import gym
 import numpy as np
-import torch
 from gym import spaces
 from mp_pytorch.mp.mp_interfaces import MPInterface
@@ -23,8 +22,8 @@ class BlackBoxWrapper(gym.ObservationWrapper):
                  replanning_schedule: Optional[
                      Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray, int], bool]] = None,
                  reward_aggregation: Callable[[np.ndarray], float] = np.sum,
-                 max_planning_times = None,
-                 desired_conditioning: bool = False
+                 max_planning_times: int = None,
+                 condition_on_desired: bool = False
                  ):
         """
         gym.Wrapper for leveraging a black box approach with a trajectory generator.
@@ -59,10 +58,8 @@ class BlackBoxWrapper(gym.ObservationWrapper):
         # reward computation
         self.reward_aggregation = reward_aggregation
-        # self.traj_gen.basis_gn.show_basis(plot=True)
         # spaces
         self.return_context_observation = not (learn_sub_trajectories or self.do_replanning)
-        # self.return_context_observation = True
         self.traj_gen_action_space = self._get_traj_gen_action_space()
         self.action_space = self._get_action_space()
@@ -81,14 +78,12 @@ class BlackBoxWrapper(gym.ObservationWrapper):
         self.verbose = verbose
         # condition value
-        self.desired_conditioning = False
+        self.condition_on_desired = condition_on_desired
         self.condition_pos = None
         self.condition_vel = None
         self.max_planning_times = max_planning_times
-        self.plan_counts = 0
-        self.tau_first_prediction = None
+        self.plan_steps = 0
     def observation(self, observation):
         # return context space if we are
@@ -111,16 +106,11 @@ class BlackBoxWrapper(gym.ObservationWrapper):
         bc_time = np.array(0 if not self.do_replanning else self.current_traj_steps * self.dt)
         # TODO we could think about initializing with the previous desired value in order to have a smooth transition
         # at least from the planning point of view.
-        # self.traj_gen.set_boundary_conditions(bc_time, self.current_pos, self.current_vel)
-        if self.current_traj_steps == 0:
-            self.condition_pos = self.current_pos
-            self.condition_vel = self.current_vel
-        bc_time = torch.as_tensor(bc_time, dtype=torch.float32)
-        self.condition_pos = torch.as_tensor(self.condition_pos, dtype=torch.float32)
-        self.condition_vel = torch.as_tensor(self.condition_vel, dtype=torch.float32)
-        self.traj_gen.set_boundary_conditions(bc_time, self.condition_pos, self.condition_vel)
-        # self.traj_gen.set_duration(duration, self.dt)
+        condition_pos = self.condition_pos if self.condition_pos is not None else self.current_pos
+        condition_vel = self.condition_vel if self.condition_vel is not None else self.current_vel
+        self.traj_gen.set_boundary_conditions(bc_time, condition_pos, condition_vel)
         self.traj_gen.set_duration(duration, self.dt)
         # traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True)
         position = get_numpy(self.traj_gen.get_traj_pos())
@@ -163,9 +153,6 @@ class BlackBoxWrapper(gym.ObservationWrapper):
     def step(self, action: np.ndarray):
         """ This function generates a trajectory based on a MP and then does the usual loop over reset and step"""
-        # time_is_valid = self.env.check_time_validity(action)
-        #
-        # if time_valid:
         # TODO remove this part, right now only needed for beer pong
         # mp_params, env_spec_params, proceed = self.env.episode_callback(action, self.traj_gen)
@@ -182,15 +169,13 @@ class BlackBoxWrapper(gym.ObservationWrapper):
         infos = dict()
         done = False
-        if self.verbose >= 2:
-            desired_pos_traj = []
-            desired_vel_traj = []
-            pos_traj = []
-            vel_traj = []
         if traj_is_valid:
-            self.plan_counts += 1
+            self.plan_steps += 1
             for t, (pos, vel) in enumerate(zip(position, velocity)):
+                current_pos = self.current_pos
+                current_vel = self.current_vel
                 step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel)
                 c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high)
                 obs, c_reward, done, info = self.env.step(c_action)
@@ -205,24 +190,17 @@ class BlackBoxWrapper(gym.ObservationWrapper):
                         elems[t] = v
                         infos[k] = elems
-                if self.verbose >= 2:
-                    desired_pos_traj.append(pos)
-                    desired_vel_traj.append(vel)
-                    pos_traj.append(self.current_pos)
-                    vel_traj.append(self.current_vel)
                 if self.render_kwargs:
                     self.env.render(**self.render_kwargs)
-                if done or (self.replanning_schedule(self.current_pos, self.current_vel, obs, c_action,
-                                                     t + 1 + self.current_traj_steps)
-                            and self.max_planning_times is not None and self.plan_counts < self.max_planning_times):
-                    # if self.max_planning_times is not None and self.plan_counts >= self.max_planning_times:
-                    #     continue
-                    self.condition_pos = pos if self.desired_conditioning else self.current_pos
-                    self.condition_vel = vel if self.desired_conditioning else self.current_vel
+                if done or self.replanning_schedule(current_pos, current_vel, obs, c_action,
+                                                    t + 1 + self.current_traj_steps):
+                    if self.max_planning_times is not None and self.plan_steps >= self.max_planning_times:
+                        continue
+                    self.condition_pos = pos if self.condition_on_desired else None
+                    self.condition_vel = vel if self.condition_on_desired else None
                     break
@@ -230,17 +208,11 @@ class BlackBoxWrapper(gym.ObservationWrapper):
             self.current_traj_steps += t + 1
             if self.verbose >= 2:
-                infos['desired_pos'] = position[:t+1]
-                infos['desired_vel'] = velocity[:t+1]
-                infos['current_pos'] = self.current_pos
-                infos['current_vel'] = self.current_vel
+                infos['positions'] = position
+                infos['velocities'] = velocity
                 infos['step_actions'] = actions[:t + 1]
                 infos['step_observations'] = observations[:t + 1]
                 infos['step_rewards'] = rewards[:t + 1]
-                infos['desired_pos_traj'] = np.array(desired_pos_traj)
-                infos['desired_vel_traj'] = np.array(desired_vel_traj)
-                infos['pos_traj'] = np.array(pos_traj)
-                infos['vel_traj'] = np.array(vel_traj)
             infos['trajectory_length'] = t + 1
             trajectory_return = self.reward_aggregation(rewards[:t + 1])
@@ -256,7 +228,6 @@ class BlackBoxWrapper(gym.ObservationWrapper):
     def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None):
         self.current_traj_steps = 0
-        self.plan_counts = 0
-        self.tau_first_prediction = None
+        self.plan_steps = 0
         self.traj_gen.reset()
         return super(BlackBoxWrapper, self).reset()
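Taken together, the wrapper changes above mean: plan_steps counts the sub-trajectories generated so far, max_planning_times caps that count, and condition_on_desired chooses whether the next plan is conditioned on the last desired state or on the measured one. A minimal sketch of that decision logic (illustrative helper names, not part of the commit):

# Illustrative only: mirrors the control flow of the reworked step()/get_trajectory().
def should_start_new_plan(done, schedule_fired, plan_steps, max_planning_times):
    """Stop the current sub-trajectory and ask the policy for a new plan?"""
    if not (done or schedule_fired):
        return False  # keep executing the current sub-trajectory
    if max_planning_times is not None and plan_steps >= max_planning_times:
        return False  # planning budget exhausted: let the current trajectory run out
    return True


def boundary_condition(condition_on_desired, desired_pos, desired_vel, current_pos, current_vel):
    """State handed to traj_gen.set_boundary_conditions for the next sub-trajectory."""
    if condition_on_desired:
        return desired_pos, desired_vel  # smooth hand-over between consecutive plans
    return current_pos, current_vel      # otherwise condition on the measured state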

View File

@@ -28,9 +28,7 @@ DEFAULT_BB_DICT_ProMP = {
         'trajectory_generator_type': 'promp'
     },
     "phase_generator_kwargs": {
-        'phase_generator_type': 'linear',
-        'learn_tau': False,
-        'learn_delay': False,
+        'phase_generator_type': 'linear'
     },
     "controller_kwargs": {
         'controller_type': 'motor',
@@ -77,8 +75,6 @@ DEFAULT_BB_DICT_ProDMP = {
     },
     "phase_generator_kwargs": {
         'phase_generator_type': 'exp',
-        'learn_delay': False,
-        'learn_tau': False,
     },
     "controller_kwargs": {
         'controller_type': 'motor',
@@ -91,9 +87,6 @@ DEFAULT_BB_DICT_ProDMP = {
         'num_basis': 5,
     },
     "black_box_kwargs": {
-        'replanning_schedule': None,
-        'max_planning_times': None,
-        'verbose': 2
     }
 }
@@ -512,24 +505,22 @@ for _v in _versions:
 for _v in _versions:
     _name = _v.split("-")
-    _env_id = f'{_name[0]}ProDMP-{_name[1]}'
+    _env_id = f'{_name[0]}ReplanProDMP-{_name[1]}'
     kwargs_dict_box_pushing_prodmp = deepcopy(DEFAULT_BB_DICT_ProDMP)
     kwargs_dict_box_pushing_prodmp['wrappers'].append(mujoco.box_pushing.MPWrapper)
     kwargs_dict_box_pushing_prodmp['name'] = _v
     kwargs_dict_box_pushing_prodmp['controller_kwargs']['p_gains'] = 0.01 * np.array([120., 120., 120., 120., 50., 30., 10.])
     kwargs_dict_box_pushing_prodmp['controller_kwargs']['d_gains'] = 0.01 * np.array([10., 10., 10., 10., 6., 5., 3.])
-    # kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['weights_scale'] = np.array([3.4944e+01, 4.3734e+01, 9.6711e+01, 2.4429e+02, 5.8272e+02])
-    # kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['goal_scale'] = 3.1264e-01
     kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['weights_scale'] = 0.3
     kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['goal_scale'] = 0.3
     kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['auto_scale_basis'] = True
     kwargs_dict_box_pushing_prodmp['trajectory_generator_kwargs']['goal_offset'] = 1.0
     kwargs_dict_box_pushing_prodmp['basis_generator_kwargs']['num_basis'] = 4
-    kwargs_dict_box_pushing_prodmp['basis_generator_kwargs']['alpha'] = 10.
-    kwargs_dict_box_pushing_prodmp['basis_generator_kwargs']['basis_bandwidth_factor'] = 3  # 3.5, 4 to try
+    kwargs_dict_box_pushing_prodmp['basis_generator_kwargs']['basis_bandwidth_factor'] = 3
     kwargs_dict_box_pushing_prodmp['phase_generator_kwargs']['alpha_phase'] = 3
     kwargs_dict_box_pushing_prodmp['black_box_kwargs']['max_planning_times'] = 4
     kwargs_dict_box_pushing_prodmp['black_box_kwargs']['replanning_schedule'] = lambda pos, vel, obs, action, t : t % 25 == 0
+    kwargs_dict_box_pushing_prodmp['black_box_kwargs']['condition_on_desired'] = True
     register(
         id=_env_id,
         entry_point='fancy_gym.utils.make_env_helpers:make_bb_env_helper',
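With this loop, every box-pushing variant in _versions is additionally registered under a ...ReplanProDMP-... id that replans every 25 steps, generates at most 4 plans per episode, and conditions each new plan on the desired state. A short usage sketch (the id and the fancy_gym.make call mirror the example file further down in this commit):

import fancy_gym

# One of the ids produced by the registration loop above.
env = fancy_gym.make("BoxPushingDenseReplanProDMP-v0", seed=1)

obs = env.reset()
done = False
while not done:
    # Each step consumes one ProDMP parameter vector and executes the next
    # sub-trajectory (replanning every 25 steps, at most 4 plans per episode).
    obs, reward, done, info = env.step(env.action_space.sample())
env.close()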

View File

@@ -219,6 +219,8 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
             q_old = q
             q = q + dt * qd_d
             q = np.clip(q, q_min, q_max)
+            self.data.qpos[:7] = q
+            mujoco.mj_forward(self.model, self.data)
             current_cart_pos = self.data.body("tcp").xpos.copy()
             current_cart_quat = self.data.body("tcp").xquat.copy()
@@ -247,8 +249,10 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
             ### get Jacobian by mujoco
             self.data.qpos[:7] = q
             mujoco.mj_forward(self.model, self.data)
             jacp = self.get_body_jacp("tcp")[:, :7].copy()
             jacr = self.get_body_jacr("tcp")[:, :7].copy()
             J = np.concatenate((jacp, jacr), axis=0)
             Jw = J.dot(w)
@@ -356,14 +360,3 @@ class BoxPushingTemporalSpatialSparse(BoxPushingEnvBase):
             reward += box_goal_pos_dist_reward + box_goal_rot_dist_reward
             return reward
-if __name__=="__main__":
-    env = BoxPushingTemporalSpatialSparse(frame_skip=10)
-    env.reset()
-    for i in range(10):
-        env.reset()
-        for _ in range(100):
-            env.render("human")
-            action = env.action_space.sample()
-            obs, reward, done, info = env.step(action)
-            print("info: {}".format(info))
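The two lines added in the first hunk write the integrated joint position into data.qpos and run a forward pass before the TCP pose is read; in MuJoCo, data.body(...).xpos and .xquat only reflect qpos after such a pass. A minimal sketch of that behaviour (assumes the official mujoco Python bindings and a toy model, not code from this repository):

import mujoco

model = mujoco.MjModel.from_xml_string("""
<mujoco>
  <worldbody>
    <body name="tcp" pos="0 0 0">
      <joint name="slide_x" type="slide" axis="1 0 0"/>
      <geom type="sphere" size="0.01"/>
    </body>
  </worldbody>
</mujoco>
""")
data = mujoco.MjData(model)

data.qpos[:] = 0.3                    # move the joint in configuration space
stale = data.body("tcp").xpos.copy()  # still the old Cartesian pose (all zeros)
mujoco.mj_forward(model, data)        # forward kinematics updates xpos/xquat
fresh = data.body("tcp").xpos.copy()  # now reflects qpos
print(stale, fresh)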

View File

@@ -1,38 +1,62 @@
 import fancy_gym
-import numpy as np
-import matplotlib.pyplot as plt
-def plot_trajectory(traj):
-    plt.figure()
-    plt.plot(traj[:, 3])
-    plt.legend()
-    plt.show()
-def run_replanning_envs(env_name="BoxPushingProDMP-v0", seed=1, iterations=1, render=True):
+def example_run_replanning_env(env_name="BoxPushingDenseReplanProDMP-v0", seed=1, iterations=1, render=False):
     env = fancy_gym.make(env_name, seed=seed)
     env.reset()
     for i in range(iterations):
         done = False
-        desired_pos_traj = np.zeros((100, 7))
-        desired_vel_traj = np.zeros((100, 7))
-        real_pos_traj = np.zeros((100, 7))
-        real_vel_traj = np.zeros((100, 7))
-        t = 0
         while done is False:
             ac = env.action_space.sample()
             obs, reward, done, info = env.step(ac)
-            desired_pos_traj[t: t + 25, :] = info['desired_pos']
-            desired_vel_traj[t: t + 25, :] = info['desired_vel']
-            # real_pos_traj.append(info['current_pos'])
-            # real_vel_traj.append(info['current_vel'])
-            t += 25
             if render:
                 env.render(mode="human")
             if done:
                 env.reset()
-    plot_trajectory(desired_pos_traj)
     env.close()
     del env
+def example_custom_replanning_envs(seed=0, iteration=100, render=True):
+    # id for a step-based environment
+    base_env_id = "BoxPushingDense-v0"
+    wrappers = [fancy_gym.envs.mujoco.box_pushing.mp_wrapper.MPWrapper]
+    trajectory_generator_kwargs = {'trajectory_generator_type': 'prodmp',
+                                   'weight_scale': 1}
+    phase_generator_kwargs = {'phase_generator_type': 'exp'}
+    controller_kwargs = {'controller_type': 'velocity'}
+    basis_generator_kwargs = {'basis_generator_type': 'prodmp',
+                              'num_basis': 5}
+    # max_planning_times: the maximum number of plans can be generated
+    # replanning_schedule: the trigger for replanning
+    # condition_on_desired: use desired state as the boundary condition for the next plan
+    black_box_kwargs = {'max_planning_times': 4,
+                        'replanning_schedule': lambda pos, vel, obs, action, t: t % 25 == 0,
+                        'condition_on_desired': True}
+    env = fancy_gym.make_bb(env_id=base_env_id, wrappers=wrappers, black_box_kwargs=black_box_kwargs,
+                            traj_gen_kwargs=trajectory_generator_kwargs, controller_kwargs=controller_kwargs,
+                            phase_kwargs=phase_generator_kwargs, basis_kwargs=basis_generator_kwargs,
+                            seed=seed)
+    if render:
+        env.render(mode="human")
+    obs = env.reset()
+    for i in range(iteration):
+        ac = env.action_space.sample()
+        obs, reward, done, info = env.step(ac)
+        if done:
+            env.reset()
+    env.close()
+    del env
 if __name__ == "__main__":
-    run_replanning_envs(env_name="BoxPushingDenseProDMP-v0", seed=1, iterations=1, render=False)
+    # run a registered replanning environment
+    example_run_replanning_env(env_name="BoxPushingDenseReplanProDMP-v0", seed=1, iterations=1, render=False)
+    # run a custom replanning environment
+    example_custom_replanning_envs(seed=0, iteration=8, render=True)

View File

@@ -17,8 +17,6 @@ def example_mp(env_name="HoleReacherProMP-v0", seed=1, iterations=1, render=True
     # It takes care of seeding and enables the use of a variety of external environments using the gym interface.
     env = fancy_gym.make(env_name, seed)
-    # env.traj_gen.basis_gn.show_basis(plot=True)
     returns = 0
     # env.render(mode=None)
     obs = env.reset()
@@ -40,22 +38,16 @@ def example_mp(env_name="HoleReacherProMP-v0", seed=1, iterations=1, render=True
         # Now the action space is not the raw action but the parametrization of the trajectory generator,
         # such as a ProMP
         ac = env.action_space.sample()
-        # ac[0] = 0.6866657733917236
-        # ac[1] = 0.08587364107370377
         # This executes a full trajectory and gives back the context (obs) of the last step in the trajectory, or the
         # full observation space of the last step, if replanning/sub-trajectory learning is used. The 'reward' is equal
         # to the return of a trajectory. Default is the sum over the step-wise rewards.
-        print(f'target obs: {obs[-3:]}')
         obs, reward, done, info = env.step(ac)
-        print(f'steps: {info["num_steps"][-1]}')
         # Aggregated returns
         returns += reward
         if done:
-            # print(reward)
+            print(reward)
             obs = env.reset()
-            print("=================New Episode======================")
-            # print("steps: {}".format(info["num_steps"][-1]))
 def example_custom_mp(env_name="Reacher5dProMP-v0", seed=1, iterations=1, render=True):
@@ -165,19 +157,17 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True):
 if __name__ == '__main__':
     render = True
     # DMP
-    # example_mp("HoleReacherDMP-v0", seed=10, iterations=5, render=render)
+    example_mp("HoleReacherDMP-v0", seed=10, iterations=5, render=render)
     # ProMP
-    # example_mp("HoleReacherProMP-v0", seed=10, iterations=5, render=render)
-    # example_mp("BoxPushingTemporalSparseProMP-v0", seed=10, iterations=1, render=render)
-    # example_mp("TableTennis4DProMP-v0", seed=10, iterations=10, render=True)
+    example_mp("HoleReacherProMP-v0", seed=10, iterations=5, render=render)
+    example_mp("BoxPushingTemporalSparseProMP-v0", seed=10, iterations=1, render=render)
     # ProDMP
-    # example_mp("BoxPushingDenseProDMP-v0", seed=10, iterations=16, render=render)
+    example_mp("BoxPushingDenseReplanProDMP-v0", seed=10, iterations=4, render=render)
+    example_mp("TableTennis4DProDMP-v0", seed=10, iterations=5000, render=render)
     # Altered basis functions
-    # obs1 = example_custom_mp("Reacher5dProMP-v0", seed=10, iterations=1, render=render)
+    obs1 = example_custom_mp("Reacher5dProMP-v0", seed=10, iterations=1, render=render)
     # Custom MP
-    # example_fully_custom_mp(seed=10, iterations=1, render=render)
+    example_fully_custom_mp(seed=10, iterations=1, render=render)

View File

@@ -34,7 +34,7 @@ setup(
     ],
     extras_require=extras,
     install_requires=[
-        'gym[mujoco]<0.25.0,>=0.24.0',
+        'gym[mujoco]<0.25.0,>=0.24.1',
         'mp_pytorch @ git+https://github.com/ALRhub/MP_PyTorch.git@main'
     ],
     packages=[package for package in find_packages() if package.startswith("fancy_gym")],

View File

@@ -67,28 +67,32 @@ def test_missing_wrapper(env_id: str):
         fancy_gym.make_bb(env_id, [], {}, {}, {}, {}, {})
-@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
 def test_missing_local_state(mp_type: str):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env = fancy_gym.make_bb('toy-v0', [RawInterfaceWrapper], {},
                             {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
                             {'phase_generator_type': 'exp'},
-                            {'basis_generator_type': 'rbf'})
+                            {'basis_generator_type': basis_generator_type})
     env.reset()
     with pytest.raises(NotImplementedError):
         env.step(env.action_space.sample())
-@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
 @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
 @pytest.mark.parametrize('verbose', [1, 2])
 def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env_id, wrapper_class = env_wrap
     env = fancy_gym.make_bb(env_id, [wrapper_class], {'verbose': verbose},
                             {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
                             {'phase_generator_type': 'exp'},
-                            {'basis_generator_type': 'rbf'})
+                            {'basis_generator_type': basis_generator_type})
     env.reset()
     info_keys = list(env.step(env.action_space.sample())[3].keys())
@@ -104,15 +108,17 @@ def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]
     assert all(e in info_keys for e in mp_keys)
-@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
 @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
 def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env_id, wrapper_class = env_wrap
     env = fancy_gym.make_bb(env_id, [wrapper_class], {},
                             {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
                             {'phase_generator_type': 'exp'},
-                            {'basis_generator_type': 'rbf'})
+                            {'basis_generator_type': basis_generator_type})
     for _ in range(5):
         env.reset()
@@ -121,14 +127,15 @@ def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
     assert length == env.spec.max_episode_steps
-@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
 @pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])])
 def test_aggregation(mp_type: str, reward_aggregation: Callable[[np.ndarray], float]):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation},
                             {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
                             {'phase_generator_type': 'exp'},
-                            {'basis_generator_type': 'rbf'})
+                            {'basis_generator_type': basis_generator_type})
     env.reset()
     # ToyEnv only returns 1 as reward
     assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, ))
@@ -149,12 +156,13 @@ def test_context_space(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapp
     assert env.observation_space.shape == wrapper.context_mask[wrapper.context_mask].shape
-@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
 @pytest.mark.parametrize('num_dof', [0, 1, 2, 5])
 @pytest.mark.parametrize('num_basis', [0, 1, 2, 5])
 @pytest.mark.parametrize('learn_tau', [True, False])
 @pytest.mark.parametrize('learn_delay', [True, False])
 def test_action_space(mp_type: str, num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
                             {'trajectory_generator_type': mp_type,
                              'action_dim': num_dof
@@ -164,28 +172,29 @@ def test_action_space(mp_type: str, num_dof: int, num_basis: int, learn_tau: boo
                              'learn_tau': learn_tau,
                              'learn_delay': learn_delay
                              },
-                            {'basis_generator_type': 'rbf',
+                            {'basis_generator_type': basis_generator_type,
                              'num_basis': num_basis
                              })
     base_dims = num_dof * num_basis
-    additional_dims = num_dof if mp_type == 'dmp' else 0
+    additional_dims = num_dof if 'dmp' in mp_type else 0
     traj_modification_dims = int(learn_tau) + int(learn_delay)
     assert env.action_space.shape[0] == base_dims + traj_modification_dims + additional_dims
-@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
 @pytest.mark.parametrize('a', [1])
 @pytest.mark.parametrize('b', [1.0])
 @pytest.mark.parametrize('c', [[1], [1.0], ['str'], [{'a': 'b'}], [np.ones(3, )]])
 @pytest.mark.parametrize('d', [{'a': 1}, {1: 2.0}, {'a': [1.0]}, {'a': np.ones(3, )}, {'a': {'a': 'b'}}])
 @pytest.mark.parametrize('e', [Object()])
 def test_change_env_kwargs(mp_type: str, a: int, b: float, c: list, d: dict, e: Object):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
                             {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
                             {'phase_generator_type': 'exp'},
-                            {'basis_generator_type': 'rbf'},
+                            {'basis_generator_type': basis_generator_type},
                             a=a, b=b, c=c, d=d, e=e
                             )
     assert a is env.a
@@ -196,18 +205,20 @@ def test_change_env_kwargs(mp_type: str, a: int, b: float, c: list, d: dict, e:
     assert e is env.e
-@pytest.mark.parametrize('mp_type', ['promp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
 def test_learn_tau(mp_type: str, tau: float):
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
                             {'trajectory_generator_type': mp_type,
                              },
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear',
+                            {'phase_generator_type': phase_generator_type,
                              'learn_tau': True,
                              'learn_delay': False
                              },
-                            {'basis_generator_type': 'rbf',
+                            {'basis_generator_type': basis_generator_type,
                              }, seed=SEED)
     d = True
@@ -228,26 +239,29 @@ def test_learn_tau(mp_type: str, tau: float):
     vel = info['velocities'].flatten()
     # Check end is all same (only true for linear basis)
-    assert np.all(pos[tau_time_steps:] == pos[-1])
-    assert np.all(vel[tau_time_steps:] == vel[-1])
+    if phase_generator_type == "linear":
+        assert np.all(pos[tau_time_steps:] == pos[-1])
+        assert np.all(vel[tau_time_steps:] == vel[-1])
     # Check active trajectory section is different to end values
     assert np.all(pos[:tau_time_steps - 1] != pos[-1])
     assert np.all(vel[:tau_time_steps - 2] != vel[-1])
 #
 #
-@pytest.mark.parametrize('mp_type', ['promp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
 def test_learn_delay(mp_type: str, delay: float):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
                             {'trajectory_generator_type': mp_type,
                              },
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear',
+                            {'phase_generator_type': phase_generator_type,
                              'learn_tau': False,
                              'learn_delay': True
                              },
-                            {'basis_generator_type': 'rbf',
+                            {'basis_generator_type': basis_generator_type,
                              }, seed=SEED)
     d = True
@@ -274,21 +288,23 @@ def test_learn_delay(mp_type: str, delay: float):
     # Check active trajectory section is different to beginning values
     assert np.all(pos[max(1, delay_time_steps):] != pos[0])
     assert np.all(vel[max(1, delay_time_steps)] != vel[0])
 #
 #
-@pytest.mark.parametrize('mp_type', ['promp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
 @pytest.mark.parametrize('delay', [0.25, 0.5, 0.75, 1])
 def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
                             {'trajectory_generator_type': mp_type,
                              },
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear',
+                            {'phase_generator_type': phase_generator_type,
                              'learn_tau': True,
                              'learn_delay': True
                              },
-                            {'basis_generator_type': 'rbf',
+                            {'basis_generator_type': basis_generator_type,
                              }, seed=SEED)
     if env.spec.max_episode_steps * env.dt < delay + tau:
@@ -315,6 +331,7 @@ def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
     vel = info['velocities'].flatten()
     # Check end is all same (only true for linear basis)
-    assert np.all(pos[joint_time_steps:] == pos[-1])
-    assert np.all(vel[joint_time_steps:] == vel[-1])
+    if phase_generator_type == "linear":
+        assert np.all(pos[joint_time_steps:] == pos[-1])
+        assert np.all(vel[joint_time_steps:] == vel[-1])

View File

@@ -1,23 +0,0 @@
-from itertools import chain
-import pytest
-import fancy_gym
-from test.utils import run_env, run_env_determinism
-Fancy_ProDMP_IDS = fancy_gym.ALL_FANCY_MOVEMENT_PRIMITIVE_ENVIRONMENTS['ProDMP']
-All_ProDMP_IDS = fancy_gym.ALL_MOVEMENT_PRIMITIVE_ENVIRONMENTS['ProDMP']
-@pytest.mark.parametrize('env_id', All_ProDMP_IDS)
-def test_replanning_envs(env_id: str):
-    """Tests that ProDMP environments run without errors using random actions."""
-    run_env(env_id)
-@pytest.mark.parametrize('env_id', All_ProDMP_IDS)
-def test_replanning_determinism(env_id: str):
-    """Tests that ProDMP environments are deterministic."""
-    run_env_determinism(env_id, 0)

View File

@@ -98,7 +98,7 @@ def test_learn_sub_trajectories(mp_type: str, env_wrap: Tuple[str, Type[RawInter
     assert length <= np.round(env.traj_gen.tau.numpy() / env.dt)
-@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
 @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
 @pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
 @pytest.mark.parametrize('replanning_time', [10, 100, 1000])
@@ -114,11 +114,14 @@ def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWra
     replanning_schedule = lambda c_pos, c_vel, obs, c_action, t: t % replanning_time == 0
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if 'dmp' in mp_type else 'linear'
     env = fancy_gym.make_bb(env_id, [wrapper_class], {'replanning_schedule': replanning_schedule, 'verbose': 2},
                             {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'exp'},
-                            {'basis_generator_type': 'rbf'}, seed=SEED)
+                            {'phase_generator_type': phase_generator_type},
+                            {'basis_generator_type': basis_generator_type}, seed=SEED)
     assert env.do_replanning
     assert callable(env.replanning_schedule)
@@ -142,3 +145,189 @@ def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWra
         env.reset()
     assert replanning_schedule(None, None, None, None, length)
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10])
+def test_max_planning_times(mp_type: str, max_planning_times: int, sub_segment_steps: int):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'max_planning_times': max_planning_times,
+                             'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': False,
+                             'learn_delay': False
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    planning_times = 0
+    while not d:
+        _, _, d, _ = env.step(env.action_space.sample())
+        planning_times += 1
+    assert planning_times == max_planning_times
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10])
+@pytest.mark.parametrize('tau', [0.5, 1.0, 1.5, 2.0])
+def test_replanning_with_learn_tau(mp_type: str, max_planning_times: int, sub_segment_steps: int, tau: float):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'max_planning_times': max_planning_times,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': True,
+                             'learn_delay': False
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    planning_times = 0
+    while not d:
+        action = env.action_space.sample()
+        action[0] = tau
+        _, _, d, info = env.step(action)
+        planning_times += 1
+    assert planning_times == max_planning_times
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10])
+@pytest.mark.parametrize('delay', [0.1, 0.25, 0.5, 0.75])
+def test_replanning_with_learn_delay(mp_type: str, max_planning_times: int, sub_segment_steps: int, delay: float):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'max_planning_times': max_planning_times,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': False,
+                             'learn_delay': True
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    planning_times = 0
+    while not d:
+        action = env.action_space.sample()
+        action[0] = delay
+        _, _, d, info = env.step(action)
+        delay_time_steps = int(np.round(delay / env.dt))
+        pos = info['positions'].flatten()
+        vel = info['velocities'].flatten()
+        # Check beginning is all same (only true for linear basis)
+        if planning_times == 0:
+            assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0])
+            assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0])
+        # only valid when delay < sub_segment_steps
+        elif planning_times > 0 and delay_time_steps < sub_segment_steps:
+            assert np.all(pos[1:max(1, delay_time_steps - 1)] != pos[0])
+            assert np.all(vel[1:max(1, delay_time_steps - 2)] != vel[0])
+        # Check active trajectory section is different to beginning values
+        assert np.all(pos[max(1, delay_time_steps):] != pos[0])
+        assert np.all(vel[max(1, delay_time_steps)] != vel[0])
+        planning_times += 1
+    assert planning_times == max_planning_times
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10, 15])
+@pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
+@pytest.mark.parametrize('tau', [0.5, 0.75, 1.0])
+def test_replanning_with_learn_delay_and_tau(mp_type: str, max_planning_times: int, sub_segment_steps: int,
+                                             delay: float, tau: float):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'max_planning_times': max_planning_times,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': True,
+                             'learn_delay': True
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    planning_times = 0
+    while not d:
+        action = env.action_space.sample()
+        action[0] = tau
+        action[1] = delay
+        _, _, d, info = env.step(action)
+        delay_time_steps = int(np.round(delay / env.dt))
+        pos = info['positions'].flatten()
+        vel = info['velocities'].flatten()
+        # Delay only applies to first planning time
+        if planning_times == 0:
+            # Check delay is applied
+            assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0])
+            assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0])
+            # Check active trajectory section is different to beginning values
+            assert np.all(pos[max(1, delay_time_steps):] != pos[0])
+            assert np.all(vel[max(1, delay_time_steps)] != vel[0])
+        planning_times += 1
+    assert planning_times == max_planning_times
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10])
+def test_replanning_schedule(mp_type: str, max_planning_times: int, sub_segment_steps: int):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'max_planning_times': max_planning_times,
+                             'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': False,
+                             'learn_delay': False
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    for i in range(max_planning_times):
+        _, _, d, _ = env.step(env.action_space.sample())
+    assert d