Merge pull request #43 from ALRhub/35-replace-native-unittest-with-pytest

35 replace native unittest with pytest
This commit is contained in:
ottofabian 2022-09-26 09:57:13 +02:00 committed by GitHub
commit ee4a46fad1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 267 additions and 512 deletions

View File

@ -50,7 +50,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
self.tracking_controller = tracking_controller self.tracking_controller = tracking_controller
# self.time_steps = np.linspace(0, self.duration, self.traj_steps) # self.time_steps = np.linspace(0, self.duration, self.traj_steps)
# self.traj_gen.set_mp_times(self.time_steps) # self.traj_gen.set_mp_times(self.time_steps)
self.traj_gen.set_duration(self.duration - self.dt, self.dt) self.traj_gen.set_duration(self.duration, self.dt)
# reward computation # reward computation
self.reward_aggregation = reward_aggregation self.reward_aggregation = reward_aggregation
@ -85,9 +85,10 @@ class BlackBoxWrapper(gym.ObservationWrapper):
trajectory = get_numpy(self.traj_gen.get_traj_pos()) trajectory = get_numpy(self.traj_gen.get_traj_pos())
velocity = get_numpy(self.traj_gen.get_traj_vel()) velocity = get_numpy(self.traj_gen.get_traj_vel())
# Remove first element of trajectory as this is the current position and velocity if self.do_replanning:
# trajectory = trajectory[1:] # Remove first part of trajectory as this is already over
# velocity = velocity[1:] trajectory = trajectory[self.current_traj_steps:]
velocity = velocity[self.current_traj_steps:]
return trajectory, velocity return trajectory, velocity

View File

@ -106,31 +106,31 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True):
""" """
base_env_id = "HoleReacher-v0" base_env_id = "Reacher5d-v0"
# Replace this wrapper with the custom wrapper for your environment by inheriting from the RawInterfaceWrapper. # Replace this wrapper with the custom wrapper for your environment by inheriting from the RawInterfaceWrapper.
# You can also add other gym.Wrappers in case they are needed. # You can also add other gym.Wrappers in case they are needed.
wrappers = [fancy_gym.envs.classic_control.hole_reacher.MPWrapper] wrappers = [fancy_gym.envs.mujoco.reacher.MPWrapper]
# # For a ProMP # For a ProMP
# trajectory_generator_kwargs = {'trajectory_generator_type': 'promp', trajectory_generator_kwargs = {'trajectory_generator_type': 'promp',
# 'weight_scale': 2} 'weight_scale': 2}
# phase_generator_kwargs = {'phase_generator_type': 'linear'} phase_generator_kwargs = {'phase_generator_type': 'linear'}
# controller_kwargs = {'controller_type': 'velocity'}
# basis_generator_kwargs = {'basis_generator_type': 'zero_rbf',
# 'num_basis': 5,
# 'num_basis_zero_start': 1
# }
# For a DMP
trajectory_generator_kwargs = {'trajectory_generator_type': 'dmp',
'weight_scale': 500}
phase_generator_kwargs = {'phase_generator_type': 'exp',
'alpha_phase': 2.5}
controller_kwargs = {'controller_type': 'velocity'} controller_kwargs = {'controller_type': 'velocity'}
basis_generator_kwargs = {'basis_generator_type': 'rbf', basis_generator_kwargs = {'basis_generator_type': 'zero_rbf',
'num_basis': 5 'num_basis': 5,
'num_basis_zero_start': 1
} }
# # For a DMP
# trajectory_generator_kwargs = {'trajectory_generator_type': 'dmp',
# 'weight_scale': 500}
# phase_generator_kwargs = {'phase_generator_type': 'exp',
# 'alpha_phase': 2.5}
# controller_kwargs = {'controller_type': 'velocity'}
# basis_generator_kwargs = {'basis_generator_type': 'rbf',
# 'num_basis': 5
# }
env = fancy_gym.make_bb(env_id=base_env_id, wrappers=wrappers, black_box_kwargs={}, env = fancy_gym.make_bb(env_id=base_env_id, wrappers=wrappers, black_box_kwargs={},
traj_gen_kwargs=trajectory_generator_kwargs, controller_kwargs=controller_kwargs, traj_gen_kwargs=trajectory_generator_kwargs, controller_kwargs=controller_kwargs,
phase_kwargs=phase_generator_kwargs, basis_kwargs=basis_generator_kwargs, phase_kwargs=phase_generator_kwargs, basis_kwargs=basis_generator_kwargs,
@ -155,15 +155,15 @@ def example_fully_custom_mp(seed=1, iterations=1, render=True):
if __name__ == '__main__': if __name__ == '__main__':
render = True render = False
# DMP # DMP
example_mp("HoleReacherDMP-v0", seed=10, iterations=5, render=render) example_mp("HoleReacherDMP-v0", seed=10, iterations=5, render=render)
#
# # ProMP # ProMP
example_mp("HoleReacherProMP-v0", seed=10, iterations=5, render=render) example_mp("HoleReacherProMP-v0", seed=10, iterations=5, render=render)
# Altered basis functions # Altered basis functions
obs1 = example_custom_mp("Reacher5dProMP-v0", seed=10, iterations=5, render=render) obs1 = example_custom_mp("Reacher5dProMP-v0", seed=10, iterations=1, render=render)
# Custom MP # Custom MP
example_fully_custom_mp(seed=10, iterations=1, render=render) example_fully_custom_mp(seed=10, iterations=1, render=render)

View File

@ -1,130 +0,0 @@
import unittest
import gym
import numpy as np
from dm_control import suite, manipulation
import fancy_gym
from fancy_gym import make
SUITE_IDS = [f'dmc:{env}-{task}' for env, task in suite.ALL_TASKS if env != "lqr"]
MANIPULATION_IDS = [f'dmc:manipulation-{task}' for task in manipulation.ALL if task.endswith('_features')]
SEED = 1
class TestDMCEnvironments(unittest.TestCase):
def _run_env(self, env_id, iterations=None, seed=SEED, render=False):
"""
Example for running a DMC based env in the step based setting.
The env_id has to be specified as `dmc:domain_name-task_name` or
for manipulation tasks as `manipulation-environment_name`
Args:
env_id: Either `dmc:domain_name-task_name` or `dmc:manipulation-environment_name`
iterations: Number of rollout steps to run
seed: random seeding
render: Render the episode
Returns: observations, rewards, dones, actions
"""
env: gym.Env = make(env_id, seed=seed)
rewards = []
observations = []
actions = []
dones = []
obs = env.reset()
self._verify_observations(obs, env.observation_space, "reset()")
iterations = iterations or (env.spec.max_episode_steps or 1)
# number of samples(multiple environment steps)
for i in range(iterations):
observations.append(obs)
ac = env.action_space.sample()
actions.append(ac)
# ac = np.random.uniform(env.action_space.low, env.action_space.high, env.action_space.shape)
obs, reward, done, info = env.step(ac)
self._verify_observations(obs, env.observation_space, "step()")
self._verify_reward(reward)
self._verify_done(done)
rewards.append(reward)
dones.append(done)
if render:
env.render("human")
if done:
break
assert done, "Done flag is not True after end of episode."
observations.append(obs)
env.close()
del env
return np.array(observations), np.array(rewards), np.array(dones), np.array(actions)
def _run_env_determinism(self, ids):
seed = 0
for env_id in ids:
with self.subTest(msg=env_id):
traj1 = self._run_env(env_id, seed=seed)
traj2 = self._run_env(env_id, seed=seed)
for i, time_step in enumerate(zip(*traj1, *traj2)):
obs1, rwd1, done1, ac1, obs2, rwd2, done2, ac2 = time_step
self.assertTrue(np.array_equal(obs1, obs2), f"Observations [{i}] delta {obs1 - obs2} is not zero.")
self.assertTrue(np.array_equal(ac1, ac2), f"Actions [{i}] delta {ac1 - ac2} is not zero.")
self.assertEqual(done1, done2, f"Dones [{i}] {done1} and {done2} do not match.")
self.assertEqual(rwd1, rwd2, f"Rewards [{i}] {rwd1} and {rwd2} do not match.")
def _verify_observations(self, obs, observation_space, obs_type="reset()"):
self.assertTrue(observation_space.contains(obs),
f"Observation {obs} received from {obs_type} "
f"not contained in observation space {observation_space}.")
def _verify_reward(self, reward):
self.assertIsInstance(reward, (float, int), f"Returned type {type(reward)} as reward, expected float or int.")
def _verify_done(self, done):
self.assertIsInstance(done, bool, f"Returned {done} as done flag, expected bool.")
def test_suite_functionality(self):
"""Tests that suite step environments run without errors using random actions."""
for env_id in SUITE_IDS:
with self.subTest(msg=env_id):
self._run_env(env_id)
def test_suite_determinism(self):
"""Tests that for step environments identical seeds produce identical trajectories."""
self._run_env_determinism(SUITE_IDS)
def test_manipulation_functionality(self):
"""Tests that manipulation step environments run without errors using random actions."""
for env_id in MANIPULATION_IDS:
with self.subTest(msg=env_id):
self._run_env(env_id)
def test_manipulation_determinism(self):
"""Tests that for step environments identical seeds produce identical trajectories."""
self._run_env_determinism(MANIPULATION_IDS)
def test_bb_functionality(self):
"""Tests that black box environments run without errors using random actions."""
for traj_gen, env_ids in fancy_gym.ALL_DMC_MOVEMENT_PRIMITIVE_ENVIRONMENTS.items():
with self.subTest(msg=traj_gen):
for id in env_ids:
with self.subTest(msg=id):
self._run_env(id)
def test_bb_determinism(self):
"""Tests that for black box environment identical seeds produce identical trajectories."""
for traj_gen, env_ids in fancy_gym.ALL_DMC_MOVEMENT_PRIMITIVE_ENVIRONMENTS.items():
with self.subTest(msg=traj_gen):
self._run_env_determinism(env_ids)
if __name__ == '__main__':
unittest.main()

48
test/test_dmc_envs.py Normal file
View File

@ -0,0 +1,48 @@
from itertools import chain
import pytest
from dm_control import suite, manipulation
import fancy_gym
from test.utils import run_env, run_env_determinism
SUITE_IDS = [f'dmc:{env}-{task}' for env, task in suite.ALL_TASKS if env != "lqr"]
MANIPULATION_IDS = [f'dmc:manipulation-{task}' for task in manipulation.ALL if task.endswith('_features')]
DMC_MP_IDS = chain(*fancy_gym.ALL_DMC_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values())
SEED = 1
@pytest.mark.parametrize('env_id', SUITE_IDS)
def test_step_suite_functionality(env_id: str):
"""Tests that suite step environments run without errors using random actions."""
run_env(env_id)
@pytest.mark.parametrize('env_id', SUITE_IDS)
def test_step_suite_determinism(env_id: str):
"""Tests that for step environments identical seeds produce identical trajectories."""
run_env_determinism(env_id, SEED)
@pytest.mark.parametrize('env_id', MANIPULATION_IDS)
def test_step_manipulation_functionality(env_id: str):
"""Tests that manipulation step environments run without errors using random actions."""
run_env(env_id)
@pytest.mark.parametrize('env_id', MANIPULATION_IDS)
def test_step_manipulation_determinism(env_id: str):
"""Tests that for step environments identical seeds produce identical trajectories."""
run_env_determinism(env_id, SEED)
@pytest.mark.parametrize('env_id', DMC_MP_IDS)
def test_bb_dmc_functionality(env_id: str):
"""Tests that black box environments run without errors using random actions."""
run_env(env_id)
@pytest.mark.parametrize('env_id', DMC_MP_IDS)
def test_bb_dmc_determinism(env_id: str):
"""Tests that for black box environment identical seeds produce identical trajectories."""
run_env_determinism(env_id, SEED)

View File

@ -1,118 +0,0 @@
import unittest
import gym
import numpy as np
import fancy_gym # noqa
from fancy_gym.utils.make_env_helpers import make
CUSTOM_IDS = [spec.id for spec in gym.envs.registry.all() if
"fancy_gym" in spec.entry_point and 'make_bb_env_helper' not in spec.entry_point]
SEED = 1
class TestCustomEnvironments(unittest.TestCase):
def _run_env(self, env_id, iterations=None, seed=SEED, render=False):
"""
Example for running a DMC based env in the step based setting.
The env_id has to be specified as `domain_name-task_name` or
for manipulation tasks as `manipulation-environment_name`
Args:
env_id: Either `domain_name-task_name` or `manipulation-environment_name`
iterations: Number of rollout steps to run
seed: random seeding
render: Render the episode
Returns: observations, rewards, dones, actions
"""
env: gym.Env = make(env_id, seed=seed)
rewards = []
actions = []
observations = []
dones = []
obs = env.reset()
self._verify_observations(obs, env.observation_space, "reset()")
iterations = iterations or (env.spec.max_episode_steps or 1)
# number of samples(multiple environment steps)
for i in range(iterations):
observations.append(obs)
ac = env.action_space.sample()
actions.append(ac)
obs, reward, done, info = env.step(ac)
self._verify_observations(obs, env.observation_space, "step()")
self._verify_reward(reward)
self._verify_done(done)
rewards.append(reward)
dones.append(done)
if render:
env.render("human")
if done:
break
assert done, "Done flag is not True after end of episode."
observations.append(obs)
env.close()
del env
return np.array(observations), np.array(rewards), np.array(dones), np.array(actions)
def _run_env_determinism(self, ids):
seed = 0
for env_id in ids:
with self.subTest(msg=env_id):
traj1 = self._run_env(env_id, seed=seed)
traj2 = self._run_env(env_id, seed=seed)
for i, time_step in enumerate(zip(*traj1, *traj2)):
obs1, rwd1, done1, ac1, obs2, rwd2, done2, ac2 = time_step
self.assertTrue(np.array_equal(ac1, ac2), f"Actions [{i}] delta {ac1 - ac2} is not zero.")
self.assertTrue(np.array_equal(obs1, obs2), f"Observations [{i}] delta {obs1 - obs2} is not zero.")
self.assertEqual(rwd1, rwd2, f"Rewards [{i}] {rwd1} and {rwd2} do not match.")
self.assertEqual(done1, done2, f"Dones [{i}] {done1} and {done2} do not match.")
def _verify_observations(self, obs, observation_space, obs_type="reset()"):
self.assertTrue(observation_space.contains(obs),
f"Observation {obs} received from {obs_type} "
f"not contained in observation space {observation_space}.")
def _verify_reward(self, reward):
self.assertIsInstance(reward, (float, int), f"Returned type {type(reward)} as reward, expected float or int.")
def _verify_done(self, done):
self.assertIsInstance(done, bool, f"Returned {done} as done flag, expected bool.")
def test_step_functionality(self):
"""Tests that step environments run without errors using random actions."""
for env_id in CUSTOM_IDS:
with self.subTest(msg=env_id):
self._run_env(env_id)
def test_step_determinism(self):
"""Tests that for step environments identical seeds produce identical trajectories."""
self._run_env_determinism(CUSTOM_IDS)
def test_bb_functionality(self):
"""Tests that black box environments run without errors using random actions."""
for traj_gen, env_ids in fancy_gym.ALL_FANCY_MOVEMENT_PRIMITIVE_ENVIRONMENTS.items():
with self.subTest(msg=traj_gen):
for id in env_ids:
with self.subTest(msg=id):
self._run_env(id)
def test_bb_determinism(self):
"""Tests that for black box environment identical seeds produce identical trajectories."""
for traj_gen, env_ids in fancy_gym.ALL_FANCY_MOVEMENT_PRIMITIVE_ENVIRONMENTS.items():
with self.subTest(msg=traj_gen):
self._run_env_determinism(env_ids)
if __name__ == '__main__':
unittest.main()

36
test/test_fancy_envs.py Normal file
View File

@ -0,0 +1,36 @@
import itertools
import fancy_gym
import gym
import pytest
from test.utils import run_env, run_env_determinism
CUSTOM_IDS = [spec.id for spec in gym.envs.registry.all() if
"fancy_gym" in spec.entry_point and 'make_bb_env_helper' not in spec.entry_point]
CUSTOM_MP_IDS = itertools.chain(*fancy_gym.ALL_FANCY_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values())
SEED = 1
@pytest.mark.parametrize('env_id', CUSTOM_IDS)
def test_step_fancy_functionality(env_id: str):
"""Tests that step environments run without errors using random actions."""
run_env(env_id)
@pytest.mark.parametrize('env_id', CUSTOM_IDS)
def test_step_fancy_determinism(env_id: str):
"""Tests that for step environments identical seeds produce identical trajectories."""
run_env_determinism(env_id, SEED)
@pytest.mark.parametrize('env_id', CUSTOM_MP_IDS)
def test_bb_fancy_functionality(env_id: str):
"""Tests that black box environments run without errors using random actions."""
run_env(env_id)
@pytest.mark.parametrize('env_id', CUSTOM_MP_IDS)
def test_bb_fancy_determinism(env_id: str):
"""Tests that for black box environment identical seeds produce identical trajectories."""
run_env_determinism(env_id, SEED)

View File

@ -1,118 +0,0 @@
import unittest
import gym
import numpy as np
import fancy_gym
from fancy_gym import make
GYM_IDS = [spec.id for spec in gym.envs.registry.all() if
"fancy_gym" not in spec.entry_point and 'make_bb_env_helper' not in spec.entry_point]
SEED = 1
class TestGymEnvironments(unittest.TestCase):
def _run_env(self, env_id, iterations=None, seed=SEED, render=False):
"""
Example for running a openai gym env in the step based setting.
The env_id has to be specified as `env_id-vX`.
Args:
env_id: env id in the form `env_id-vX`
iterations: Number of rollout steps to run
seed: random seeding
render: Render the episode
Returns:
"""
env: gym.Env = make(env_id, seed=seed)
rewards = []
observations = []
actions = []
dones = []
obs = env.reset()
self._verify_observations(obs, env.observation_space, "reset()")
iterations = iterations or (env.spec.max_episode_steps or 1)
# number of samples(multiple environment steps)
for i in range(iterations):
observations.append(obs)
ac = env.action_space.sample()
actions.append(ac)
# ac = np.random.uniform(env.action_space.low, env.action_space.high, env.action_space.shape)
obs, reward, done, info = env.step(ac)
self._verify_observations(obs, env.observation_space, "step()")
self._verify_reward(reward)
self._verify_done(done)
rewards.append(reward)
dones.append(done)
if render:
env.render("human")
if done:
break
assert done or env.spec.max_episode_steps is None, "Done flag is not True after end of episode."
observations.append(obs)
env.close()
del env
return np.array(observations), np.array(rewards), np.array(dones), np.array(actions)
def _run_env_determinism(self, ids):
seed = 0
for env_id in ids:
with self.subTest(msg=env_id):
traj1 = self._run_env(env_id, seed=seed)
traj2 = self._run_env(env_id, seed=seed)
for i, time_step in enumerate(zip(*traj1, *traj2)):
obs1, rwd1, done1, ac1, obs2, rwd2, done2, ac2 = time_step
self.assertTrue(np.array_equal(ac1, ac2), f"Actions [{i}] delta {ac1 - ac2} is not zero.")
self.assertTrue(np.array_equal(obs1, obs2), f"Observations [{i}] delta {obs1 - obs2} is not zero.")
self.assertEqual(rwd1, rwd2, f"Rewards [{i}] {rwd1} and {rwd2} do not match.")
self.assertEqual(done1, done2, f"Dones [{i}] {done1} and {done2} do not match.")
def _verify_observations(self, obs, observation_space, obs_type="reset()"):
self.assertTrue(observation_space.contains(obs),
f"Observation {obs} received from {obs_type} "
f"not contained in observation space {observation_space}.")
def _verify_reward(self, reward):
self.assertIsInstance(reward, (float, int), f"Returned type {type(reward)} as reward, expected float or int.")
def _verify_done(self, done):
self.assertIsInstance(done, bool, f"Returned {done} as done flag, expected bool.")
def test_step_functionality(self):
"""Tests that step environments run without errors using random actions."""
for env_id in GYM_IDS:
with self.subTest(msg=env_id):
self._run_env(env_id)
def test_step_determinism(self):
"""Tests that for step environments identical seeds produce identical trajectories."""
self._run_env_determinism(GYM_IDS)
def test_bb_functionality(self):
"""Tests that black box environments run without errors using random actions."""
for traj_gen, env_ids in fancy_gym.ALL_GYM_MOVEMENT_PRIMITIVE_ENVIRONMENTS.items():
with self.subTest(msg=traj_gen):
for id in env_ids:
with self.subTest(msg=id):
self._run_env(id)
def test_bb_determinism(self):
"""Tests that for black box environment identical seeds produce identical trajectories."""
for traj_gen, env_ids in fancy_gym.ALL_GYM_MOVEMENT_PRIMITIVE_ENVIRONMENTS.items():
with self.subTest(msg=traj_gen):
self._run_env_determinism(env_ids)
if __name__ == '__main__':
unittest.main()

36
test/test_gym_envs.py Normal file
View File

@ -0,0 +1,36 @@
from itertools import chain
import gym
import pytest
import fancy_gym
from test.utils import run_env, run_env_determinism
GYM_IDS = [spec.id for spec in gym.envs.registry.all() if
"fancy_gym" not in spec.entry_point and 'make_bb_env_helper' not in spec.entry_point]
GYM_MP_IDS = chain(*fancy_gym.ALL_DMC_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values())
SEED = 1
@pytest.mark.parametrize('env_id', GYM_IDS)
def test_step_gym_functionality(env_id: str):
"""Tests that step environments run without errors using random actions."""
run_env(env_id)
@pytest.mark.parametrize('env_id', GYM_IDS)
def test_step_gym_determinism(env_id: str):
"""Tests that for step environments identical seeds produce identical trajectories."""
run_env_determinism(env_id, SEED)
@pytest.mark.parametrize('env_id', GYM_MP_IDS)
def test_bb_gym_functionality(env_id: str):
"""Tests that black box environments run without errors using random actions."""
run_env(env_id)
@pytest.mark.parametrize('env_id', GYM_MP_IDS)
def test_bb_gym_determinism(env_id: str):
"""Tests that for black box environment identical seeds produce identical trajectories."""
run_env_determinism(env_id, SEED)

View File

@ -1,119 +0,0 @@
import unittest
import gym
import numpy as np
from metaworld.envs import ALL_V2_ENVIRONMENTS_GOAL_OBSERVABLE
import fancy_gym
from fancy_gym import make
METAWORLD_IDS = [f'metaworld:{env.split("-goal-observable")[0]}' for env, _ in
ALL_V2_ENVIRONMENTS_GOAL_OBSERVABLE.items()]
SEED = 1
class TestMetaWorldEnvironments(unittest.TestCase):
def _run_env(self, env_id, iterations=None, seed=SEED, render=False):
"""
Example for running a metaworld based env in the step based setting.
The env_id has to be specified as `metaworld:env_id-vX`.
Args:
env_id: env id in the form `metaworld:env_id-vX`
iterations: Number of rollout steps to run
seed: random seeding
render: Render the episode
Returns:
"""
env: gym.Env = make(env_id, seed=seed)
rewards = []
observations = []
actions = []
dones = []
obs = env.reset()
self._verify_observations(obs, env.observation_space, "reset()")
iterations = iterations or (env.spec.max_episode_steps or 1)
# number of samples(multiple environment steps)
for i in range(iterations):
observations.append(obs)
ac = env.action_space.sample()
actions.append(ac)
# ac = np.random.uniform(env.action_space.low, env.action_space.high, env.action_space.shape)
obs, reward, done, info = env.step(ac)
self._verify_observations(obs, env.observation_space, "step()")
self._verify_reward(reward)
self._verify_done(done)
rewards.append(reward)
dones.append(done)
if render:
env.render("human")
if done:
break
assert done, "Done flag is not True after end of episode."
observations.append(obs)
env.close()
del env
return np.array(observations), np.array(rewards), np.array(dones), np.array(actions)
def _run_env_determinism(self, ids):
seed = 0
for env_id in ids:
with self.subTest(msg=env_id):
traj1 = self._run_env(env_id, seed=seed)
traj2 = self._run_env(env_id, seed=seed)
for i, time_step in enumerate(zip(*traj1, *traj2)):
obs1, rwd1, done1, ac1, obs2, rwd2, done2, ac2 = time_step
self.assertTrue(np.array_equal(ac1, ac2), f"Actions [{i}] delta {ac1 - ac2} is not zero.")
self.assertTrue(np.array_equal(obs1, obs2), f"Observations [{i}] delta {obs1 - obs2} is not zero.")
self.assertEqual(rwd1, rwd2, f"Rewards [{i}] {rwd1} and {rwd2} do not match.")
self.assertEqual(done1, done2, f"Dones [{i}] {done1} and {done2} do not match.")
def _verify_observations(self, obs, observation_space, obs_type="reset()"):
self.assertTrue(observation_space.contains(obs),
f"Observation {obs} received from {obs_type} "
f"not contained in observation space {observation_space}.")
def _verify_reward(self, reward):
self.assertIsInstance(reward, (float, int), f"Returned type {type(reward)} as reward, expected float or int.")
def _verify_done(self, done):
self.assertIsInstance(done, bool, f"Returned {done} as done flag, expected bool.")
def test_step_functionality(self):
"""Tests that step environments run without errors using random actions."""
for env_id in METAWORLD_IDS:
with self.subTest(msg=env_id):
self._run_env(env_id)
def test_step_determinism(self):
"""Tests that for step environments identical seeds produce identical trajectories."""
self._run_env_determinism(METAWORLD_IDS)
def test_bb_functionality(self):
"""Tests that black box environments run without errors using random actions."""
for traj_gen, env_ids in fancy_gym.ALL_METAWORLD_MOVEMENT_PRIMITIVE_ENVIRONMENTS.items():
with self.subTest(msg=traj_gen):
for id in env_ids:
with self.subTest(msg=id):
self._run_env(id)
def test_bb_determinism(self):
"""Tests that for black box environment identical seeds produce identical trajectories."""
for traj_gen, env_ids in fancy_gym.ALL_METAWORLD_MOVEMENT_PRIMITIVE_ENVIRONMENTS.items():
with self.subTest(msg=traj_gen):
self._run_env_determinism(env_ids)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,37 @@
from itertools import chain
import pytest
from metaworld.envs import ALL_V2_ENVIRONMENTS_GOAL_OBSERVABLE
import fancy_gym
from test.utils import run_env, run_env_determinism
METAWORLD_IDS = [f'metaworld:{env.split("-goal-observable")[0]}' for env, _ in
ALL_V2_ENVIRONMENTS_GOAL_OBSERVABLE.items()]
METAWORLD_MP_IDS = chain(*fancy_gym.ALL_METAWORLD_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values())
print(METAWORLD_MP_IDS)
SEED = 1
@pytest.mark.parametrize('env_id', METAWORLD_IDS)
def test_step_metaworld_functionality(env_id: str):
"""Tests that step environments run without errors using random actions."""
run_env(env_id)
@pytest.mark.parametrize('env_id', METAWORLD_IDS)
def test_step_metaworld_determinism(env_id: str):
"""Tests that for step environments identical seeds produce identical trajectories."""
run_env_determinism(env_id, SEED)
@pytest.mark.parametrize('env_id', METAWORLD_MP_IDS)
def test_bb_metaworld_functionality(env_id: str):
"""Tests that black box environments run without errors using random actions."""
run_env(env_id)
@pytest.mark.parametrize('env_id', METAWORLD_MP_IDS)
def test_bb_metaworld_determinism(env_id: str):
"""Tests that for black box environment identical seeds produce identical trajectories."""
run_env_determinism(env_id, SEED)

82
test/utils.py Normal file
View File

@ -0,0 +1,82 @@
import gym
import numpy as np
from fancy_gym import make
def run_env(env_id, iterations=None, seed=0, render=False):
"""
Example for running a DMC based env in the step based setting.
The env_id has to be specified as `dmc:domain_name-task_name` or
for manipulation tasks as `manipulation-environment_name`
Args:
env_id: Either `dmc:domain_name-task_name` or `dmc:manipulation-environment_name`
iterations: Number of rollout steps to run
seed: random seeding
render: Render the episode
Returns: observations, rewards, dones, actions
"""
env: gym.Env = make(env_id, seed=seed)
rewards = []
observations = []
actions = []
dones = []
obs = env.reset()
verify_observations(obs, env.observation_space, "reset()")
iterations = iterations or (env.spec.max_episode_steps or 1)
# number of samples(multiple environment steps)
for i in range(iterations):
observations.append(obs)
ac = env.action_space.sample()
actions.append(ac)
# ac = np.random.uniform(env.action_space.low, env.action_space.high, env.action_space.shape)
obs, reward, done, info = env.step(ac)
verify_observations(obs, env.observation_space, "step()")
verify_reward(reward)
verify_done(done)
rewards.append(reward)
dones.append(done)
if render:
env.render("human")
if done:
break
assert done, "Done flag is not True after end of episode."
observations.append(obs)
env.close()
del env
return np.array(observations), np.array(rewards), np.array(dones), np.array(actions)
def run_env_determinism(env_id: str, seed: int):
traj1 = run_env(env_id, seed=seed)
traj2 = run_env(env_id, seed=seed)
# Iterate over two trajectories, which should have the same state and action sequence
for i, time_step in enumerate(zip(*traj1, *traj2)):
obs1, rwd1, done1, ac1, obs2, rwd2, done2, ac2 = time_step
assert np.array_equal(obs1, obs2), f"Observations [{i}] {obs1} and {obs2} do not match."
assert np.array_equal(ac1, ac2), f"Actions [{i}] {ac1} and {ac2} do not match."
assert np.array_equal(rwd1, rwd2), f"Rewards [{i}] {rwd1} and {rwd2} do not match."
assert np.array_equal(done1, done2), f"Dones [{i}] {done1} and {done2} do not match."
def verify_observations(obs, observation_space: gym.Space, obs_type="reset()"):
assert observation_space.contains(obs), \
f"Observation {obs} received from {obs_type} not contained in observation space {observation_space}."
def verify_reward(reward):
assert isinstance(reward, (float, int)), f"Returned type {type(reward)} as reward, expected float or int."
def verify_done(done):
assert isinstance(done, bool), f"Returned {done} as done flag, expected bool."