diff --git a/test/test_black_box.py b/test/test_black_box.py
index 1b9e8e2..7d33a30 100644
--- a/test/test_black_box.py
+++ b/test/test_black_box.py
@@ -94,11 +94,13 @@ def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]
                         {'phase_generator_type': 'exp'},
                         {'basis_generator_type': basis_generator_type})
     env.reset()
-    info_keys = list(env.step(env.action_space.sample())[3].keys())
+    _obs, _reward, _terminated, _truncated, info = env.step(env.action_space.sample())
+    info_keys = list(info.keys())
 
     env_step = fancy_gym.make(env_id, SEED)
     env_step.reset()
-    info_keys_step = env_step.step(env_step.action_space.sample())[3].keys()
+    _obs, _reward, _terminated, _truncated, info = env_step.step(env_step.action_space.sample())
+    info_keys_step = info.keys()
 
     assert all(e in info_keys for e in info_keys_step)
     assert 'trajectory_length' in info_keys
@@ -122,7 +124,8 @@ def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
 
     for _ in range(5):
         env.reset()
-        length = env.step(env.action_space.sample())[3]['trajectory_length']
+        _obs, _reward, _terminated, _truncated, info = env.step(env.action_space.sample())
+        length = info['trajectory_length']
 
         assert length == env.spec.max_episode_steps
 
@@ -138,7 +141,8 @@ def test_aggregation(mp_type: str, reward_aggregation: Callable[[np.ndarray], fl
                         {'basis_generator_type': basis_generator_type})
     env.reset()
     # ToyEnv only returns 1 as reward
-    assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, ))
+    _obs, reward, _terminated, _truncated, _info = env.step(env.action_space.sample())
+    assert reward == reward_aggregation(np.ones(50, ))
 
 
 @pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@@ -250,6 +254,8 @@ def test_learn_tau(mp_type: str, tau: float):
     assert np.all(vel[:tau_time_steps - 2] != vel[-1])
 #
 #
+
+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
 def test_learn_delay(mp_type: str, delay: float):
@@ -292,6 +298,8 @@ def test_learn_delay(mp_type: str, delay: float):
     assert np.all(vel[max(1, delay_time_steps)] != vel[0])
 #
 #
+
+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
 @pytest.mark.parametrize('delay', [0.25, 0.5, 0.75, 1])
@@ -312,15 +320,16 @@ def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
     if env.spec.max_episode_steps * env.dt < delay + tau:
         return
 
-    d = True
+    done = True
     for i in range(5):
-        if d:
+        if done:
             env.reset()
         action = env.action_space.sample()
         action[0] = tau
         action[1] = delay
-        obs, r, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, info = env.step(action)
+        done = terminated or truncated
 
         length = info['trajectory_length']
         assert length == env.spec.max_episode_steps
 
@@ -345,4 +354,4 @@ def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
     active_pos = pos[delay_time_steps: joint_time_steps - 1]
     active_vel = vel[delay_time_steps: joint_time_steps - 2]
     assert np.all(active_pos != pos[-1]) and np.all(active_pos != pos[0])
-    assert np.all(active_vel != vel[-1]) and np.all(active_vel != vel[0])
\ No newline at end of file
+    assert np.all(active_vel != vel[-1]) and np.all(active_vel != vel[0])
diff --git a/test/test_metaworld_envs.py b/test/test_metaworld_envs.py
index 77d0c35..55de621 100644
--- a/test/test_metaworld_envs.py
+++ b/test/test_metaworld_envs.py
@@ -8,11 +8,7 @@ from test.utils import run_env, run_env_determinism
 
 METAWORLD_IDS = [f'metaworld:{env.split("-goal-observable")[0]}'
                  for env, _ in ALL_V2_ENVIRONMENTS_GOAL_OBSERVABLE.items()]
 
-<<<<<<< HEAD
-METAWORLD_MP_IDS = chain(*fancy_gym.ALL_METAWORLD_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values())
-=======
 METAWORLD_MP_IDS = list(chain(*fancy_gym.ALL_METAWORLD_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values()))
->>>>>>> 47-update-to-new-gym-api
 SEED = 1
 
diff --git a/test/test_replanning_sequencing.py b/test/test_replanning_sequencing.py
index 9d04d02..b76d6a9 100644
--- a/test/test_replanning_sequencing.py
+++ b/test/test_replanning_sequencing.py
@@ -79,13 +79,14 @@ def test_learn_sub_trajectories(mp_type: str, env_wrap: Tuple[str, Type[RawInter
     # This also verifies we are not adding the TimeAwareObservationWrapper twice
     assert env.observation_space == env_step.observation_space
 
-    d = True
+    done = True
     for i in range(25):
-        if d:
+        if done:
             env.reset()
 
         action = env.action_space.sample()
-        obs, r, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, info = env.step(action)
+        done = terminated or truncated
 
         length = info['trajectory_length']
 
@@ -112,7 +113,7 @@ def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWra
     if add_time_aware_wrapper_before:
         wrappers += [TimeAwareObservation]
 
-    replanning_schedule = lambda c_pos, c_vel, obs, c_action, t: t % replanning_time == 0
+    def replanning_schedule(c_pos, c_vel, obs, c_action, t): return t % replanning_time == 0
 
     basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     phase_generator_type = 'exp' if 'dmp' in mp_type else 'linear'
@@ -134,18 +135,20 @@ def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWra
     # Make 3 episodes, total steps depend on the replanning steps
     for i in range(3 * episode_steps):
         action = env.action_space.sample()
-        obs, r, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, info = env.step(action)
+        done = terminated or truncated
 
         length = info['trajectory_length']
 
-        if d:
+        if done:
             # Check if number of steps until termination match the replanning interval
-            print(d, (i + 1), episode_steps)
+            print(done, (i + 1), episode_steps)
             assert (i + 1) % episode_steps == 0
             env.reset()
 
     assert replanning_schedule(None, None, None, None, length)
 
+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
 @pytest.mark.parametrize('sub_segment_steps', [5, 10])
@@ -167,13 +170,16 @@ def test_max_planning_times(mp_type: str, max_planning_times: int, sub_segment_s
                             },
                            seed=SEED)
     _ = env.reset()
-    d = False
+    done = False
     planning_times = 0
-    while not d:
-        _, _, d, _ = env.step(env.action_space.sample())
+    while not done:
+        action = env.action_space.sample()
+        _obs, _reward, terminated, truncated, _info = env.step(action)
+        done = terminated or truncated
         planning_times += 1
     assert planning_times == max_planning_times
 
+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
 @pytest.mark.parametrize('sub_segment_steps', [5, 10])
@@ -196,15 +202,17 @@ def test_replanning_with_learn_tau(mp_type: str, max_planning_times: int, sub_se
                             },
                            seed=SEED)
     _ = env.reset()
-    d = False
+    done = False
     planning_times = 0
-    while not d:
+    while not done:
         action = env.action_space.sample()
         action[0] = tau
-        _, _, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, _info = env.step(action)
+        done = terminated or truncated
         planning_times += 1
     assert planning_times == max_planning_times
 
+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
 @pytest.mark.parametrize('sub_segment_steps', [5, 10])
@@ -213,26 +221,27 @@ def test_replanning_with_learn_delay(mp_type: str, max_planning_times: int, sub_
     basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
-                            {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
-                             'max_planning_times': max_planning_times,
-                             'verbose': 2},
-                            {'trajectory_generator_type': mp_type,
-                             },
-                            {'controller_type': 'motor'},
-                            {'phase_generator_type': phase_generator_type,
-                             'learn_tau': False,
-                             'learn_delay': True
-                             },
-                            {'basis_generator_type': basis_generator_type,
-                             },
-                            seed=SEED)
+                           {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                            'max_planning_times': max_planning_times,
+                            'verbose': 2},
+                           {'trajectory_generator_type': mp_type,
+                            },
+                           {'controller_type': 'motor'},
+                           {'phase_generator_type': phase_generator_type,
+                            'learn_tau': False,
+                            'learn_delay': True
+                            },
+                           {'basis_generator_type': basis_generator_type,
+                            },
+                           seed=SEED)
     _ = env.reset()
-    d = False
+    done = False
     planning_times = 0
-    while not d:
+    while not done:
         action = env.action_space.sample()
         action[0] = delay
-        _, _, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, info = env.step(action)
+        done = terminated or truncated
 
         delay_time_steps = int(np.round(delay / env.dt))
         pos = info['positions'].flatten()
@@ -256,6 +265,7 @@ def test_replanning_with_learn_delay(mp_type: str, max_planning_times: int, sub_
 
     assert planning_times == max_planning_times
 
+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('max_planning_times', [1, 2, 3])
 @pytest.mark.parametrize('sub_segment_steps', [5, 10, 15])
@@ -266,27 +276,28 @@ def test_replanning_with_learn_delay_and_tau(mp_type: str, max_planning_times: i
     basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
-                            {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
-                             'max_planning_times': max_planning_times,
-                             'verbose': 2},
-                            {'trajectory_generator_type': mp_type,
-                             },
-                            {'controller_type': 'motor'},
-                            {'phase_generator_type': phase_generator_type,
-                             'learn_tau': True,
-                             'learn_delay': True
-                             },
-                            {'basis_generator_type': basis_generator_type,
-                             },
-                            seed=SEED)
+                           {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                            'max_planning_times': max_planning_times,
+                            'verbose': 2},
+                           {'trajectory_generator_type': mp_type,
+                            },
+                           {'controller_type': 'motor'},
+                           {'phase_generator_type': phase_generator_type,
+                            'learn_tau': True,
+                            'learn_delay': True
+                            },
+                           {'basis_generator_type': basis_generator_type,
+                            },
+                           seed=SEED)
     _ = env.reset()
-    d = False
+    done = False
     planning_times = 0
-    while not d:
+    while not done:
         action = env.action_space.sample()
         action[0] = tau
         action[1] = delay
-        _, _, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, info = env.step(action)
+        done = terminated or truncated
 
         delay_time_steps = int(np.round(delay / env.dt))
 
@@ -306,6 +317,7 @@ def test_replanning_with_learn_delay_and_tau(mp_type: str, max_planning_times: i
 
     assert planning_times == max_planning_times
 
+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
 @pytest.mark.parametrize('sub_segment_steps', [5, 10])
@@ -327,7 +339,8 @@ def test_replanning_schedule(mp_type: str, max_planning_times: int, sub_segment_
                             },
                            seed=SEED)
     _ = env.reset()
-    d = False
     for i in range(max_planning_times):
-        _, _, d, _ = env.step(env.action_space.sample())
-    assert d
+        action = env.action_space.sample()
+        _obs, _reward, terminated, truncated, _info = env.step(action)
+        done = terminated or truncated
+    assert done