Adapted tests to the new gym interface

In previous gym versions, executing a step returned
obs, reward, done, info = env.step(...)

With the switch to gymnasium, this has changed to
obs, reward, terminated, truncated, info = env.step(...)

We also made the code a bit more self-explanatory.
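
A minimal sketch of the new step handling under gymnasium (illustrative only; the environment id and seed are placeholder choices, not part of this commit):

import gymnasium as gym

env = gym.make('CartPole-v1')
obs, info = env.reset(seed=1)  # reset now also returns an info dict
done = False
while not done:
    action = env.action_space.sample()
    # step returns five values; the old single `done` flag is split in two
    obs, reward, terminated, truncated, info = env.step(action)
    # an episode ends when it either terminates naturally or is truncated (e.g. time limit)
    done = terminated or truncated
env.close()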
Dominik Moritz Roth 2023-05-18 17:31:40 +02:00
parent 228e343a1b
commit a559f92562
3 changed files with 78 additions and 60 deletions

View File

@@ -94,11 +94,13 @@ def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]
                             {'phase_generator_type': 'exp'},
                             {'basis_generator_type': basis_generator_type})
     env.reset()
-    info_keys = list(env.step(env.action_space.sample())[3].keys())
+    _obs, _reward, _terminated, _truncated, info = env.step(env.action_space.sample())
+    info_keys = list(info.keys())

     env_step = fancy_gym.make(env_id, SEED)
     env_step.reset()
-    info_keys_step = env_step.step(env_step.action_space.sample())[3].keys()
+    _obs, _reward, _terminated, _truncated, info = env_step.step(env_step.action_space.sample())
+    info_keys_step = info.keys()

     assert all(e in info_keys for e in info_keys_step)
     assert 'trajectory_length' in info_keys
@@ -122,7 +124,8 @@ def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
     for _ in range(5):
         env.reset()
-        length = env.step(env.action_space.sample())[3]['trajectory_length']
+        _obs, _reward, _terminated, _truncated, info = env.step(env.action_space.sample())
+        length = info['trajectory_length']
         assert length == env.spec.max_episode_steps
@@ -138,7 +141,8 @@ def test_aggregation(mp_type: str, reward_aggregation: Callable[[np.ndarray], fl
                             {'basis_generator_type': basis_generator_type})
     env.reset()
     # ToyEnv only returns 1 as reward
-    assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, ))
+    _obs, reward, _terminated, _truncated, _info = env.step(env.action_space.sample())
+    assert reward == reward_aggregation(np.ones(50, ))


 @pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@@ -250,6 +254,8 @@ def test_learn_tau(mp_type: str, tau: float):
     assert np.all(vel[:tau_time_steps - 2] != vel[-1])
 #
 #
+
+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
 def test_learn_delay(mp_type: str, delay: float):
@@ -292,6 +298,8 @@ def test_learn_delay(mp_type: str, delay: float):
     assert np.all(vel[max(1, delay_time_steps)] != vel[0])
 #
 #
+
+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
 @pytest.mark.parametrize('delay', [0.25, 0.5, 0.75, 1])
@@ -312,15 +320,16 @@ def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
     if env.spec.max_episode_steps * env.dt < delay + tau:
         return
-    d = True
+    done = True
     for i in range(5):
-        if d:
+        if done:
             env.reset()
         action = env.action_space.sample()
         action[0] = tau
         action[1] = delay
-        obs, r, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, info = env.step(action)
+        done = terminated or truncated
         length = info['trajectory_length']
         assert length == env.spec.max_episode_steps
@@ -345,4 +354,4 @@ def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
     active_pos = pos[delay_time_steps: joint_time_steps - 1]
     active_vel = vel[delay_time_steps: joint_time_steps - 2]
     assert np.all(active_pos != pos[-1]) and np.all(active_pos != pos[0])
     assert np.all(active_vel != vel[-1]) and np.all(active_vel != vel[0])

View File

@@ -8,11 +8,7 @@ from test.utils import run_env, run_env_determinism
 METAWORLD_IDS = [f'metaworld:{env.split("-goal-observable")[0]}' for env, _ in
                  ALL_V2_ENVIRONMENTS_GOAL_OBSERVABLE.items()]
-<<<<<<< HEAD
-METAWORLD_MP_IDS = chain(*fancy_gym.ALL_METAWORLD_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values())
-=======
 METAWORLD_MP_IDS = list(chain(*fancy_gym.ALL_METAWORLD_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values()))
->>>>>>> 47-update-to-new-gym-api
 SEED = 1

View File

@@ -79,13 +79,14 @@ def test_learn_sub_trajectories(mp_type: str, env_wrap: Tuple[str, Type[RawInter
     # This also verifies we are not adding the TimeAwareObservationWrapper twice
     assert env.observation_space == env_step.observation_space

-    d = True
+    done = True
     for i in range(25):
-        if d:
+        if done:
             env.reset()
         action = env.action_space.sample()
-        obs, r, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, info = env.step(action)
+        done = terminated or truncated

         length = info['trajectory_length']
@@ -112,7 +113,7 @@ def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWra
     if add_time_aware_wrapper_before:
         wrappers += [TimeAwareObservation]

-    replanning_schedule = lambda c_pos, c_vel, obs, c_action, t: t % replanning_time == 0
+    def replanning_schedule(c_pos, c_vel, obs, c_action, t): return t % replanning_time == 0

     basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     phase_generator_type = 'exp' if 'dmp' in mp_type else 'linear'
@@ -134,18 +135,20 @@ def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWra
     # Make 3 episodes, total steps depend on the replanning steps
     for i in range(3 * episode_steps):
         action = env.action_space.sample()
-        obs, r, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, info = env.step(action)
+        done = terminated or truncated

         length = info['trajectory_length']

-        if d:
+        if done:
             # Check if number of steps until termination match the replanning interval
-            print(d, (i + 1), episode_steps)
+            print(done, (i + 1), episode_steps)
             assert (i + 1) % episode_steps == 0
             env.reset()

     assert replanning_schedule(None, None, None, None, length)


 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
 @pytest.mark.parametrize('sub_segment_steps', [5, 10])
@@ -167,13 +170,16 @@ def test_max_planning_times(mp_type: str, max_planning_times: int, sub_segment_s
                             },
                             seed=SEED)
     _ = env.reset()
-    d = False
+    done = False
     planning_times = 0
-    while not d:
-        _, _, d, _ = env.step(env.action_space.sample())
+    while not done:
+        action = env.action_space.sample()
+        _obs, _reward, terminated, truncated, _info = env.step(action)
+        done = terminated or truncated
         planning_times += 1
     assert planning_times == max_planning_times

+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
 @pytest.mark.parametrize('sub_segment_steps', [5, 10])
@@ -196,15 +202,17 @@ def test_replanning_with_learn_tau(mp_type: str, max_planning_times: int, sub_se
                             },
                             seed=SEED)
     _ = env.reset()
-    d = False
+    done = False
     planning_times = 0
-    while not d:
+    while not done:
         action = env.action_space.sample()
         action[0] = tau
-        _, _, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, _info = env.step(action)
+        done = terminated or truncated
         planning_times += 1
     assert planning_times == max_planning_times

+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
 @pytest.mark.parametrize('sub_segment_steps', [5, 10])
@@ -213,26 +221,27 @@ def test_replanning_with_learn_delay(mp_type: str, max_planning_times: int, sub_
     basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
                             {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
                              'max_planning_times': max_planning_times,
                              'verbose': 2},
                             {'trajectory_generator_type': mp_type,
                              },
                             {'controller_type': 'motor'},
                             {'phase_generator_type': phase_generator_type,
                              'learn_tau': False,
                              'learn_delay': True
                              },
                             {'basis_generator_type': basis_generator_type,
                              },
                             seed=SEED)
     _ = env.reset()
-    d = False
+    done = False
     planning_times = 0
-    while not d:
+    while not done:
         action = env.action_space.sample()
         action[0] = delay
-        _, _, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, info = env.step(action)
+        done = terminated or truncated

         delay_time_steps = int(np.round(delay / env.dt))
         pos = info['positions'].flatten()
@@ -256,6 +265,7 @@ def test_replanning_with_learn_delay(mp_type: str, max_planning_times: int, sub_
     assert planning_times == max_planning_times

+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('max_planning_times', [1, 2, 3])
 @pytest.mark.parametrize('sub_segment_steps', [5, 10, 15])
@@ -266,27 +276,28 @@ def test_replanning_with_learn_delay_and_tau(mp_type: str, max_planning_times: i
     basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
                             {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
                              'max_planning_times': max_planning_times,
                              'verbose': 2},
                             {'trajectory_generator_type': mp_type,
                              },
                             {'controller_type': 'motor'},
                             {'phase_generator_type': phase_generator_type,
                              'learn_tau': True,
                              'learn_delay': True
                              },
                             {'basis_generator_type': basis_generator_type,
                              },
                             seed=SEED)
     _ = env.reset()
-    d = False
+    done = False
     planning_times = 0
-    while not d:
+    while not done:
         action = env.action_space.sample()
         action[0] = tau
         action[1] = delay
-        _, _, d, info = env.step(action)
+        _obs, _reward, terminated, truncated, info = env.step(action)
+        done = terminated or truncated

         delay_time_steps = int(np.round(delay / env.dt))
@@ -306,6 +317,7 @@ def test_replanning_with_learn_delay_and_tau(mp_type: str, max_planning_times: i
     assert planning_times == max_planning_times

+
 @pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
 @pytest.mark.parametrize('sub_segment_steps', [5, 10])
@@ -327,7 +339,8 @@ def test_replanning_schedule(mp_type: str, max_planning_times: int, sub_segment_
                             },
                             seed=SEED)
     _ = env.reset()
-    d = False
     for i in range(max_planning_times):
-        _, _, d, _ = env.step(env.action_space.sample())
-    assert d
+        action = env.action_space.sample()
+        _obs, _reward, terminated, truncated, _info = env.step(action)
+        done = terminated or truncated
+    assert done