diff --git a/fancy_gym/black_box/black_box_wrapper.py b/fancy_gym/black_box/black_box_wrapper.py
index fbcf58d..5ba6607 100644
--- a/fancy_gym/black_box/black_box_wrapper.py
+++ b/fancy_gym/black_box/black_box_wrapper.py
@@ -1,4 +1,4 @@
-from typing import Tuple, Optional
+from typing import Tuple, Optional, Callable
 
 import gym
 import numpy as np
@@ -9,7 +9,6 @@
 from fancy_gym.black_box.controller.base_controller import BaseController
 from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper
 from fancy_gym.utils.utils import get_numpy
-import torch
 
 
 class BlackBoxWrapper(gym.ObservationWrapper):
@@ -20,8 +19,9 @@ class BlackBoxWrapper(gym.ObservationWrapper):
                  duration: float,
                  verbose: int = 1,
                  learn_sub_trajectories: bool = False,
-                 replanning_schedule: Optional[callable] = None,
-                 reward_aggregation: callable = np.sum
+                 replanning_schedule: Optional[
+                     Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray, int], bool]] = None,
+                 reward_aggregation: Callable[[np.ndarray], float] = np.sum
                  ):
         """
         gym.Wrapper for leveraging a black box approach with a trajectory generator.
@@ -48,12 +48,10 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 
         # trajectory generation
         self.traj_gen = trajectory_generator
-
         self.tracking_controller = tracking_controller
         # self.time_steps = np.linspace(0, self.duration, self.traj_steps)
         # self.traj_gen.set_mp_times(self.time_steps)
         self.traj_gen.set_duration(self.duration, self.dt)
-        # self.traj_gen.basis_gn.show_basis(plot=True)
 
         # reward computation
         self.reward_aggregation = reward_aggregation
@@ -76,27 +74,30 @@ class BlackBoxWrapper(gym.ObservationWrapper):
         return observation.astype(self.observation_space.dtype)
 
     def get_trajectory(self, action: np.ndarray) -> Tuple:
+        duration = self.duration
+        if self.learn_sub_trajectories:
+            duration = None
+            # reset with every new call as we need to set all arguments, such as tau, delay, again.
+            # If we do not do this, the traj_gen assumes we are continuing the trajectory.
+            self.traj_gen.reset()
+
         clipped_params = np.clip(action, self.traj_gen_action_space.low, self.traj_gen_action_space.high)
         self.traj_gen.set_params(clipped_params)
         bc_time = np.array(0 if not self.do_replanning else self.current_traj_steps * self.dt)
         # TODO we could think about initializing with the previous desired value in order to have a smooth transition
         #  at least from the planning point of view.
-
-        self.traj_gen.set_boundary_conditions(bc_time,
-                                              self.current_pos,
-                                              self.current_vel)
-        duration = None if self.learn_sub_trajectories else self.duration
+        self.traj_gen.set_boundary_conditions(bc_time, self.current_pos, self.current_vel)
         self.traj_gen.set_duration(duration, self.dt)
         # traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True)
-        trajectory = get_numpy(self.traj_gen.get_traj_pos())
+        position = get_numpy(self.traj_gen.get_traj_pos())
         velocity = get_numpy(self.traj_gen.get_traj_vel())
 
-        if self.do_replanning:
-            # Remove first part of trajectory as this is already over
-            trajectory = trajectory[self.current_traj_steps:]
-            velocity = velocity[self.current_traj_steps:]
+        # if self.do_replanning:
+        #     # Remove first part of trajectory as this is already over
+        #     position = position[self.current_traj_steps:]
+        #     velocity = velocity[self.current_traj_steps:]
 
-        return trajectory, velocity
+        return position, velocity
 
     def _get_traj_gen_action_space(self):
         """This function can be used to set up an individual space for the parameters of the traj_gen."""
@@ -131,9 +132,9 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 
         # TODO remove this part, right now only needed for beer pong
         mp_params, env_spec_params = self.env.episode_callback(action, self.traj_gen)
-        trajectory, velocity = self.get_trajectory(mp_params)
+        position, velocity = self.get_trajectory(mp_params)
 
-        trajectory_length = len(trajectory)
+        trajectory_length = len(position)
         rewards = np.zeros(shape=(trajectory_length,))
         if self.verbose >= 2:
             actions = np.zeros(shape=(trajectory_length,) + self.env.action_space.shape)
@@ -143,7 +144,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
         infos = dict()
         done = False
 
-        for t, (pos, vel) in enumerate(zip(trajectory, velocity)):
+        for t, (pos, vel) in enumerate(zip(position, velocity)):
             step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel)
             c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high)
             obs, c_reward, done, info = self.env.step(c_action)
@@ -169,7 +170,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
         self.current_traj_steps += t + 1
 
         if self.verbose >= 2:
-            infos['positions'] = trajectory
+            infos['positions'] = position
             infos['velocities'] = velocity
             infos['step_actions'] = actions[:t + 1]
             infos['step_observations'] = observations[:t + 1]
@@ -186,4 +187,5 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 
     def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None):
         self.current_traj_steps = 0
+        self.traj_gen.reset()
         return super(BlackBoxWrapper, self).reset()
diff --git a/fancy_gym/meta/object_change_mp_wrapper.py b/fancy_gym/meta/object_change_mp_wrapper.py
index 6faecc9..ca99c22 100644
--- a/fancy_gym/meta/object_change_mp_wrapper.py
+++ b/fancy_gym/meta/object_change_mp_wrapper.py
@@ -32,7 +32,7 @@ class MPWrapper(BaseMetaworldMPWrapper):
             # Current observation
             [False] * 3,  # end-effector position
             [False] * 1,  # normalized gripper open distance
-            [False] * 3,  # main object position
+            [True] * 3,  # main object position
             [False] * 4,  # main object quaternion
             [False] * 3,  # secondary object position
             [False] * 4,  # secondary object quaternion
diff --git a/fancy_gym/utils/make_env_helpers.py b/fancy_gym/utils/make_env_helpers.py
index 18ab6ed..3c73ba9 100644
--- a/fancy_gym/utils/make_env_helpers.py
+++ b/fancy_gym/utils/make_env_helpers.py
@@ -166,6 +166,15 @@ def make_bb(
         # We have to learn the length when learning sub_trajectories trajectories
         phase_kwargs['learn_tau'] = True
 
+    # set tau bounds to minimum of two env steps otherwise computing the velocity is not possible.
+    # maximum is full duration of one episode.
+    if phase_kwargs.get('learn_tau'):
+        phase_kwargs["tau_bound"] = [env.dt * 2, black_box_kwargs['duration']]
+
+    # Max delay is full duration minus two steps due to above reason
+    if phase_kwargs.get('learn_delay'):
+        phase_kwargs["delay_bound"] = [0, black_box_kwargs['duration'] - env.dt * 2]
+
     phase_gen = get_phase_generator(**phase_kwargs)
     basis_gen = get_basis_generator(phase_generator=phase_gen, **basis_kwargs)
     controller = get_controller(**controller_kwargs)
diff --git a/test/test_black_box.py b/test/test_black_box.py
index fb7f78c..d5e3a88 100644
--- a/test/test_black_box.py
+++ b/test/test_black_box.py
@@ -1,5 +1,5 @@
 from itertools import chain
-from typing import Tuple, Type, Union, Optional
+from typing import Tuple, Type, Union, Optional, Callable
 
 import gym
 import numpy as np
@@ -25,7 +25,7 @@ class Object(object):
 class ToyEnv(gym.Env):
     observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
     action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
-    dt = 0.01
+    dt = 0.02
 
     def __init__(self, a: int = 0, b: float = 0.0, c: list = [], d: dict = {}, e: Object = Object()):
         self.a, self.b, self.c, self.d, self.e = a, b, c, d, e
@@ -49,7 +49,7 @@ class ToyWrapper(RawInterfaceWrapper):
 
     @property
     def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
-        return np.ones(self.action_space.shape)
+        return np.zeros(self.action_space.shape)
 
 
 @pytest.fixture(scope="session", autouse=True)
@@ -67,74 +67,81 @@ def test_missing_wrapper(env_id: str):
         fancy_gym.make_bb(env_id, [], {}, {}, {}, {}, {})
 
 
-def test_missing_local_state():
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+def test_missing_local_state(mp_type: str):
     env = fancy_gym.make_bb('toy-v0', [RawInterfaceWrapper], {},
-                            {'trajectory_generator_type': 'promp'},
+                            {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear'},
+                            {'phase_generator_type': 'exp'},
                             {'basis_generator_type': 'rbf'})
     env.reset()
     with pytest.raises(NotImplementedError):
         env.step(env.action_space.sample())
 
 
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
 @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
 @pytest.mark.parametrize('verbose', [1, 2])
-def test_verbosity(env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int):
+def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int):
     env_id, wrapper_class = env_wrap
-    env = fancy_gym.make_bb(env_id, [wrapper_class], {},
-                            {'trajectory_generator_type': 'promp'},
+    env = fancy_gym.make_bb(env_id, [wrapper_class], {'verbose': verbose},
+                            {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear'},
+                            {'phase_generator_type': 'exp'},
                             {'basis_generator_type': 'rbf'})
     env.reset()
-    info_keys = env.step(env.action_space.sample())[3].keys()
+    info_keys = list(env.step(env.action_space.sample())[3].keys())
 
     env_step = fancy_gym.make(env_id, SEED)
     env_step.reset()
     info_keys_step = env_step.step(env_step.action_space.sample())[3].keys()
 
-    assert info_keys_step in info_keys
+    assert all(e in info_keys for e in info_keys_step)
    assert 'trajectory_length' in info_keys
 
     if verbose >= 2:
-        mp_keys = ['position', 'velocities', 'step_actions', 'step_observations', 'step_rewards']
-        assert mp_keys in info_keys
+        mp_keys = ['positions', 'velocities', 'step_actions', 'step_observations', 'step_rewards']
+        assert all(e in info_keys for e in mp_keys)
 
 
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
 @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
-def test_length(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
+def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
     env_id, wrapper_class = env_wrap
     env = fancy_gym.make_bb(env_id, [wrapper_class], {},
-                            {'trajectory_generator_type': 'promp'},
+                            {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear'},
+                            {'phase_generator_type': 'exp'},
                             {'basis_generator_type': 'rbf'})
 
-    env.reset()
-    length = env.step(env.action_space.sample())[3]['trajectory_length']
-    assert length == env.spec.max_episode_steps
+    for _ in range(5):
+        env.reset()
+        length = env.step(env.action_space.sample())[3]['trajectory_length']
+
+        assert length == env.spec.max_episode_steps
 
 
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
 @pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])])
-def test_aggregation(reward_aggregation: callable):
+def test_aggregation(mp_type: str, reward_aggregation: Callable[[np.ndarray], float]):
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation},
-                            {'trajectory_generator_type': 'promp'},
+                            {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear'},
+                            {'phase_generator_type': 'exp'},
                             {'basis_generator_type': 'rbf'})
     env.reset()
     # ToyEnv only returns 1 as reward
     assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, ))
 
 
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
 @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
-def test_context_space(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
+def test_context_space(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
     env_id, wrapper_class = env_wrap
     env = fancy_gym.make_bb(env_id, [wrapper_class], {},
-                            {'trajectory_generator_type': 'promp'},
+                            {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear'},
+                            {'phase_generator_type': 'exp'},
                             {'basis_generator_type': 'rbf'})
     # check if observation space matches with the specified mask values which are true
     env_step = fancy_gym.make(env_id, SEED)
@@ -142,36 +149,42 @@ def test_context_space(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
     assert env.observation_space.shape == wrapper.context_mask[wrapper.context_mask].shape
 
 
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
 @pytest.mark.parametrize('num_dof', [0, 1, 2, 5])
 @pytest.mark.parametrize('num_basis', [0, 1, 2, 5])
 @pytest.mark.parametrize('learn_tau', [True, False])
 @pytest.mark.parametrize('learn_delay', [True, False])
-def test_action_space(num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool):
+def test_action_space(mp_type: str, num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool):
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
-                            {'trajectory_generator_type': 'promp',
+                            {'trajectory_generator_type': mp_type,
                              'action_dim': num_dof
                              },
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear',
+                            {'phase_generator_type': 'exp',
                              'learn_tau': learn_tau,
                              'learn_delay': learn_delay
                              },
                             {'basis_generator_type': 'rbf',
                              'num_basis': num_basis
                             })
-    assert env.action_space.shape[0] == num_dof * num_basis + int(learn_tau) + int(learn_delay)
+
+    base_dims = num_dof * num_basis
+    additional_dims = num_dof if mp_type == 'dmp' else 0
+    traj_modification_dims = int(learn_tau) + int(learn_delay)
+    assert env.action_space.shape[0] == base_dims + traj_modification_dims + additional_dims
 
 
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
 @pytest.mark.parametrize('a', [1])
 @pytest.mark.parametrize('b', [1.0])
 @pytest.mark.parametrize('c', [[1], [1.0], ['str'], [{'a': 'b'}], [np.ones(3, )]])
 @pytest.mark.parametrize('d', [{'a': 1}, {1: 2.0}, {'a': [1.0]}, {'a': np.ones(3, )}, {'a': {'a': 'b'}}])
 @pytest.mark.parametrize('e', [Object()])
-def test_change_env_kwargs(a: int, b: float, c: list, d: dict, e: Object):
+def test_change_env_kwargs(mp_type: str, a: int, b: float, c: list, d: dict, e: Object):
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
-                            {'trajectory_generator_type': 'promp'},
+                            {'trajectory_generator_type': mp_type},
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear'},
+                            {'phase_generator_type': 'exp'},
                             {'basis_generator_type': 'rbf'},
                             a=a, b=b, c=c, d=d, e=e
                             )
@@ -183,33 +196,134 @@ def test_change_env_kwargs(a: int, b: float, c: list, d: dict, e: Object):
     assert e is env.e
 
 
-@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
-@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
-def test_learn_sub_trajectories(env_wrap: Tuple[str, Type[RawInterfaceWrapper]], add_time_aware_wrapper_before: bool):
-    env_id, wrapper_class = env_wrap
-    env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED))
-    wrappers = [wrapper_class]
-
-    # has time aware wrapper
-    if add_time_aware_wrapper_before:
-        wrappers += [TimeAwareObservation]
-
-    env = fancy_gym.make_bb(env_id, [wrapper_class], {'learn_sub_trajectories': True},
-                            {'trajectory_generator_type': 'promp'},
+@pytest.mark.parametrize('mp_type', ['promp'])
+@pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
+def test_learn_tau(mp_type: str, tau: float):
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear'},
-                            {'basis_generator_type': 'rbf'})
+                            {'phase_generator_type': 'linear',
+                             'learn_tau': True,
+                             'learn_delay': False
+                             },
+                            {'basis_generator_type': 'rbf',
+                             }, seed=SEED)
 
-    assert env.learn_sub_trajectories
-    assert env.traj_gen.learn_tau
-    assert env.observation_space == env_step.observation_space
+    d = True
+    for i in range(5):
+        if d:
+            env.reset()
+        action = env.action_space.sample()
+        action[0] = tau
 
-    env.reset()
-    action = env.action_space.sample()
-    obs, r, d, info = env.step(action)
+        obs, r, d, info = env.step(action)
 
-    length = info['trajectory_length']
+        length = info['trajectory_length']
+        assert length == env.spec.max_episode_steps
 
-    factor = 1 / env.dt
-    assert np.allclose(length * env.dt, np.round(factor * action[0]) / factor)
-    assert np.allclose(length * env.dt, np.round(factor * env.traj_gen.tau.numpy()) / factor)
+        tau_time_steps = int(np.round(tau / env.dt))
+
+        pos = info['positions'].flatten()
+        vel = info['velocities'].flatten()
+
+        # Check end is all same (only true for linear basis)
+        assert np.all(pos[tau_time_steps:] == pos[-1])
+        assert np.all(vel[tau_time_steps:] == vel[-1])
+
+        # Check active trajectory section is different to end values
+        assert np.all(pos[:tau_time_steps - 1] != pos[-1])
+        assert np.all(vel[:tau_time_steps - 2] != vel[-1])
+
+
+@pytest.mark.parametrize('mp_type', ['promp'])
+@pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
+def test_learn_delay(mp_type: str, delay: float):
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': 'linear',
+                             'learn_tau': False,
+                             'learn_delay': True
+                             },
+                            {'basis_generator_type': 'rbf',
+                             }, seed=SEED)
+
+    d = True
+    for i in range(5):
+        if d:
+            env.reset()
+        action = env.action_space.sample()
+        action[0] = delay
+
+        obs, r, d, info = env.step(action)
+
+        length = info['trajectory_length']
+        assert length == env.spec.max_episode_steps
+
+        delay_time_steps = int(np.round(delay / env.dt))
+
+        pos = info['positions'].flatten()
+        vel = info['velocities'].flatten()
+
+        # Check beginning is all same (only true for linear basis)
+        assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0])
+        assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0])
+
+        # Check active trajectory section is different to beginning values
+        assert np.all(pos[max(1, delay_time_steps):] != pos[0])
+        assert np.all(vel[max(1, delay_time_steps)] != vel[0])
+
+
+@pytest.mark.parametrize('mp_type', ['promp'])
+@pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
+@pytest.mark.parametrize('delay', [0.25, 0.5, 0.75, 1])
+def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': 'linear',
+                             'learn_tau': True,
+                             'learn_delay': True
+                             },
+                            {'basis_generator_type': 'rbf',
+                             }, seed=SEED)
+
+    if env.spec.max_episode_steps * env.dt < delay + tau:
+        return
+
+    d = True
+    for i in range(5):
+        if d:
+            env.reset()
+        action = env.action_space.sample()
+        action[0] = tau
+        action[1] = delay
+
+        obs, r, d, info = env.step(action)
+
+        length = info['trajectory_length']
+        assert length == env.spec.max_episode_steps
+
+        tau_time_steps = int(np.round(tau / env.dt))
+        delay_time_steps = int(np.round(delay / env.dt))
+        joint_time_steps = delay_time_steps + tau_time_steps
+
+        pos = info['positions'].flatten()
+        vel = info['velocities'].flatten()
+
+        # Check end is all same (only true for linear basis)
+        assert np.all(pos[joint_time_steps:] == pos[-1])
+        assert np.all(vel[joint_time_steps:] == vel[-1])
+
+        # Check beginning is all same (only true for linear basis)
+        assert np.all(pos[:delay_time_steps - 1] == pos[0])
+        assert np.all(vel[:delay_time_steps - 2] == vel[0])
+
+        # Check active trajectory section is different to beginning and end values
+        active_pos = pos[delay_time_steps: joint_time_steps - 1]
+        active_vel = vel[delay_time_steps: joint_time_steps - 2]
+        assert np.all(active_pos != pos[-1]) and np.all(active_pos != pos[0])
+        assert np.all(active_vel != vel[-1]) and np.all(active_vel != vel[0])
diff --git a/test/test_replanning_sequencing.py b/test/test_replanning_sequencing.py
new file mode 100644
index 0000000..a42bb65
--- /dev/null
+++ b/test/test_replanning_sequencing.py
@@ -0,0 +1,144 @@
+from itertools import chain
+from types import FunctionType
+from typing import Tuple, Type, Union, Optional
+
+import gym
+import numpy as np
+import pytest
+from gym import register
+from gym.core import ActType, ObsType
+
+import fancy_gym
+from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper
+from fancy_gym.utils.time_aware_observation import TimeAwareObservation
+
+SEED = 1
+ENV_IDS = ['Reacher5d-v0', 'dmc:ball_in_cup-catch', 'metaworld:reach-v2', 'Reacher-v2']
+WRAPPERS = [fancy_gym.envs.mujoco.reacher.MPWrapper, fancy_gym.dmc.suite.ball_in_cup.MPWrapper,
+            fancy_gym.meta.goal_object_change_mp_wrapper.MPWrapper, fancy_gym.open_ai.mujoco.reacher_v2.MPWrapper]
+ALL_MP_ENVS = chain(*fancy_gym.ALL_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values())
+
+
+class ToyEnv(gym.Env):
+    observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
+    action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
+    dt = 0.02
+
+    def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
+              options: Optional[dict] = None) -> Union[ObsType, Tuple[ObsType, dict]]:
+        return np.array([-1])
+
+    def step(self, action: ActType) -> Tuple[ObsType, float, bool, dict]:
+        return np.array([-1]), 1, False, {}
+
+    def render(self, mode="human"):
+        pass
+
+
+class ToyWrapper(RawInterfaceWrapper):
+
+    @property
+    def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
+        return np.ones(self.action_space.shape)
+
+    @property
+    def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
+        return np.zeros(self.action_space.shape)
+
+
+@pytest.fixture(scope="session", autouse=True)
+def setup():
+    register(
+        id=f'toy-v0',
+        entry_point='test.test_black_box:ToyEnv',
+        max_episode_steps=50,
+    )
+
+
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
+@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
+def test_learn_sub_trajectories(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]],
+                                add_time_aware_wrapper_before: bool):
+    env_id, wrapper_class = env_wrap
+    env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED))
+    wrappers = [wrapper_class]
+
+    # has time aware wrapper
+    if add_time_aware_wrapper_before:
+        wrappers += [TimeAwareObservation]
+
+    env = fancy_gym.make_bb(env_id, [wrapper_class], {'learn_sub_trajectories': True, 'verbose': 2},
+                            {'trajectory_generator_type': mp_type},
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': 'exp'},
+                            {'basis_generator_type': 'rbf'}, seed=SEED)
+
+    assert env.learn_sub_trajectories
+    assert env.traj_gen.learn_tau
+    # This also verifies we are not adding the TimeAwareObservationWrapper twice
+    assert env.observation_space == env_step.observation_space
+
+    d = True
+
+    for i in range(25):
+        if d:
+            env.reset()
+        action = env.action_space.sample()
+        obs, r, d, info = env.step(action)
+
+        length = info['trajectory_length']
+
+        if not d:
+            assert length == np.round(action[0] / env.dt)
+            assert length == np.round(env.traj_gen.tau.numpy() / env.dt)
+        else:
+            # When done trajectory could be shorter due to termination.
+            assert length <= np.round(action[0] / env.dt)
+            assert length <= np.round(env.traj_gen.tau.numpy() / env.dt)
+
+
+@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
+@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
+@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
+@pytest.mark.parametrize('replanning_time', [10, 100, 1000])
+def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]],
+                         add_time_aware_wrapper_before: bool, replanning_time: int):
+    env_id, wrapper_class = env_wrap
+    env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED))
+    wrappers = [wrapper_class]
+
+    # has time aware wrapper
+    if add_time_aware_wrapper_before:
+        wrappers += [TimeAwareObservation]
+
+    replanning_schedule = lambda c_pos, c_vel, obs, c_action, t: t % replanning_time == 0
+
+    env = fancy_gym.make_bb(env_id, [wrapper_class], {'replanning_schedule': replanning_schedule, 'verbose': 2},
+                            {'trajectory_generator_type': mp_type},
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': 'exp'},
+                            {'basis_generator_type': 'rbf'}, seed=SEED)
+
+    assert env.do_replanning
+    assert callable(env.replanning_schedule)
+    # This also verifies we are not adding the TimeAwareObservationWrapper twice
+    assert env.observation_space == env_step.observation_space
+
+    env.reset()
+
+    episode_steps = env_step.spec.max_episode_steps // replanning_time
+    # Make 3 episodes, total steps depend on the replanning steps
+    for i in range(3 * episode_steps):
+        action = env.action_space.sample()
+        obs, r, d, info = env.step(action)
+
+        length = info['trajectory_length']
+
+        if d:
+            # Check if number of steps until termination match the replanning interval
+            print(d, (i + 1), episode_steps)
+            assert (i + 1) % episode_steps == 0
+            env.reset()
+
+    assert replanning_schedule(None, None, None, None, length)