diff --git a/fancy_gym/black_box/black_box_wrapper.py b/fancy_gym/black_box/black_box_wrapper.py index 8830987..ce96b20 100644 --- a/fancy_gym/black_box/black_box_wrapper.py +++ b/fancy_gym/black_box/black_box_wrapper.py @@ -86,6 +86,7 @@ class BlackBoxWrapper(gym.ObservationWrapper): return observation.astype(self.observation_space.dtype) def get_trajectory(self, action: np.ndarray) -> Tuple: + # duration = self.duration - self.current_traj_steps * self.dt duration = self.duration if self.learn_sub_trajectories: duration = None diff --git a/test/test_black_box.py b/test/test_black_box.py index fa1cd01..e95cf12 100644 --- a/test/test_black_box.py +++ b/test/test_black_box.py @@ -372,30 +372,3 @@ def test_replanning_schedule(mp_type: str, max_planning_times: int, sub_segment_ _, _, d, _ = env.step(env.action_space.sample()) assert d -@pytest.mark.parametrize('mp_type', ['promp', 'prodmp']) -@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4]) -@pytest.mark.parametrize('sub_segment_steps', [5, 10]) -def test_max_planning_times(mp_type: str, max_planning_times: int, sub_segment_steps: int): - basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf' - phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear' - env = fancy_gym.make_bb('toy-v0', [ToyWrapper], - {'max_planning_times': max_planning_times, - 'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0, - 'verbose': 2}, - {'trajectory_generator_type': mp_type, - }, - {'controller_type': 'motor'}, - {'phase_generator_type': phase_generator_type, - 'learn_tau': False, - 'learn_delay': False - }, - {'basis_generator_type': basis_generator_type, - }, - seed=SEED) - _ = env.reset() - d = False - planning_times = 0 - while not d: - _, _, d, _ = env.step(env.action_space.sample()) - planning_times += 1 - assert planning_times == max_planning_times \ No newline at end of file diff --git a/test/test_replanning_sequencing.py b/test/test_replanning_sequencing.py index a42bb65..31d4f80 100644 --- a/test/test_replanning_sequencing.py +++ b/test/test_replanning_sequencing.py @@ -98,7 +98,7 @@ def test_learn_sub_trajectories(mp_type: str, env_wrap: Tuple[str, Type[RawInter assert length <= np.round(env.traj_gen.tau.numpy() / env.dt) -@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) +@pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp']) @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) @pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False]) @pytest.mark.parametrize('replanning_time', [10, 100, 1000]) @@ -114,11 +114,14 @@ def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWra replanning_schedule = lambda c_pos, c_vel, obs, c_action, t: t % replanning_time == 0 + basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf' + phase_generator_type = 'exp' if 'dmp' in mp_type else 'linear' + env = fancy_gym.make_bb(env_id, [wrapper_class], {'replanning_schedule': replanning_schedule, 'verbose': 2}, {'trajectory_generator_type': mp_type}, {'controller_type': 'motor'}, - {'phase_generator_type': 'exp'}, - {'basis_generator_type': 'rbf'}, seed=SEED) + {'phase_generator_type': phase_generator_type}, + {'basis_generator_type': basis_generator_type}, seed=SEED) assert env.do_replanning assert callable(env.replanning_schedule) @@ -142,3 +145,163 @@ def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWra env.reset() assert replanning_schedule(None, None, None, None, length) + +@pytest.mark.parametrize('mp_type', ['promp', 'prodmp']) +@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4]) +@pytest.mark.parametrize('sub_segment_steps', [5, 10]) +def test_max_planning_times(mp_type: str, max_planning_times: int, sub_segment_steps: int): + basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf' + phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear' + env = fancy_gym.make_bb('toy-v0', [ToyWrapper], + {'max_planning_times': max_planning_times, + 'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0, + 'verbose': 2}, + {'trajectory_generator_type': mp_type, + }, + {'controller_type': 'motor'}, + {'phase_generator_type': phase_generator_type, + 'learn_tau': False, + 'learn_delay': False + }, + {'basis_generator_type': basis_generator_type, + }, + seed=SEED) + _ = env.reset() + d = False + planning_times = 0 + while not d: + _, _, d, _ = env.step(env.action_space.sample()) + planning_times += 1 + assert planning_times == max_planning_times + +@pytest.mark.parametrize('mp_type', ['promp', 'prodmp']) +@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4]) +@pytest.mark.parametrize('sub_segment_steps', [5, 10]) +@pytest.mark.parametrize('tau', [0.5, 1.0, 1.5, 2.0]) +def test_replanning_with_learn_tau(mp_type: str, max_planning_times: int, sub_segment_steps: int, tau: float): + basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf' + phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear' + env = fancy_gym.make_bb('toy-v0', [ToyWrapper], + {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0, + 'max_planning_times': max_planning_times, + 'verbose': 2}, + {'trajectory_generator_type': mp_type, + }, + {'controller_type': 'motor'}, + {'phase_generator_type': phase_generator_type, + 'learn_tau': True, + 'learn_delay': False + }, + {'basis_generator_type': basis_generator_type, + }, + seed=SEED) + _ = env.reset() + d = False + planning_times = 0 + while not d: + action = env.action_space.sample() + action[0] = tau + _, _, d, info = env.step(action) + planning_times += 1 + assert planning_times == max_planning_times + +@pytest.mark.parametrize('mp_type', ['promp', 'prodmp']) +@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4]) +@pytest.mark.parametrize('sub_segment_steps', [5, 10]) +@pytest.mark.parametrize('delay', [0.1, 0.25, 0.5, 0.75]) +def test_replanning_with_learn_delay(mp_type: str, max_planning_times: int, sub_segment_steps: int, delay: float): + basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf' + phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear' + env = fancy_gym.make_bb('toy-v0', [ToyWrapper], + {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0, + 'max_planning_times': max_planning_times, + 'verbose': 2}, + {'trajectory_generator_type': mp_type, + }, + {'controller_type': 'motor'}, + {'phase_generator_type': phase_generator_type, + 'learn_tau': False, + 'learn_delay': True + }, + {'basis_generator_type': basis_generator_type, + }, + seed=SEED) + _ = env.reset() + d = False + planning_times = 0 + while not d: + action = env.action_space.sample() + action[0] = delay + _, _, d, info = env.step(action) + + delay_time_steps = int(np.round(delay / env.dt)) + pos = info['positions'].flatten() + vel = info['velocities'].flatten() + + # Check beginning is all same (only true for linear basis) + if planning_times == 0: + assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0]) + assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0]) + + # only valid when delay < sub_segment_steps + elif planning_times > 0 and delay_time_steps < sub_segment_steps: + assert np.all(pos[1:max(1, delay_time_steps - 1)] != pos[0]) + assert np.all(vel[1:max(1, delay_time_steps - 2)] != vel[0]) + + # Check active trajectory section is different to beginning values + assert np.all(pos[max(1, delay_time_steps):] != pos[0]) + assert np.all(vel[max(1, delay_time_steps)] != vel[0]) + + planning_times += 1 + + assert planning_times == max_planning_times + +@pytest.mark.parametrize('mp_type', ['promp', 'prodmp']) +@pytest.mark.parametrize('max_planning_times', [1, 2, 3]) +@pytest.mark.parametrize('sub_segment_steps', [5, 10, 15]) +@pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75]) +@pytest.mark.parametrize('tau', [0.5, 0.75, 1.0]) +def test_replanning_with_learn_delay_and_tau(mp_type: str, max_planning_times: int, sub_segment_steps: int, + delay: float, tau: float): + basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf' + phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear' + env = fancy_gym.make_bb('toy-v0', [ToyWrapper], + {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0, + 'max_planning_times': max_planning_times, + 'verbose': 2}, + {'trajectory_generator_type': mp_type, + }, + {'controller_type': 'motor'}, + {'phase_generator_type': phase_generator_type, + 'learn_tau': True, + 'learn_delay': True + }, + {'basis_generator_type': basis_generator_type, + }, + seed=SEED) + _ = env.reset() + d = False + planning_times = 0 + while not d: + action = env.action_space.sample() + action[0] = tau + action[1] = delay + _, _, d, info = env.step(action) + + delay_time_steps = int(np.round(delay / env.dt)) + + pos = info['positions'].flatten() + vel = info['velocities'].flatten() + + # Delay only applies to first planning time + if planning_times == 0: + # Check delay is applied + assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0]) + assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0]) + # Check active trajectory section is different to beginning values + assert np.all(pos[max(1, delay_time_steps):] != pos[0]) + assert np.all(vel[max(1, delay_time_steps)] != vel[0]) + + planning_times += 1 + + assert planning_times == max_planning_times \ No newline at end of file