Merge remote-tracking branch 'origin/44-added-tests-for-the-black-box-wrapper' into Add-ProDMP-envs
This commit is contained in:
@ -1,4 +1,4 @@
from typing import Tuple, Optional
from typing import Tuple, Optional, Callable
import gym
import numpy as np
@ -19,8 +19,9 @@ class BlackBoxWrapper(gym.ObservationWrapper):
duration: float,
verbose: int = 1,
learn_sub_trajectories: bool = False,
replanning_schedule: Optional[callable] = None,
reward_aggregation: callable = np.sum
replanning_schedule: Optional[
Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray, int], bool]] = None,
reward_aggregation: Callable[[np.ndarray], float] = np.sum
gym.Wrapper for leveraging a black box approach with a trajectory generator.
@ -73,24 +74,30 @@ class BlackBoxWrapper(gym.ObservationWrapper):
return observation.astype(self.observation_space.dtype)
def get_trajectory(self, action: np.ndarray) -> Tuple:
duration = self.duration
if self.learn_sub_trajectories:
duration = None
# reset with every new call as we need to set all arguments, such as tau, delay, again.
# If we do not do this, the traj_gen assumes we are continuing the trajectory.
clipped_params = np.clip(action, self.traj_gen_action_space.low, self.traj_gen_action_space.high)
bc_time = np.array(0 if not self.do_replanning else self.current_traj_steps * self.dt)
# TODO we could think about initializing with the previous desired value in order to have a smooth transition
# at least from the planning point of view.
self.traj_gen.set_boundary_conditions(bc_time, self.current_pos, self.current_vel)
duration = None if self.learn_sub_trajectories else self.duration
self.traj_gen.set_duration(duration, self.dt)
# traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True)
trajectory = get_numpy(self.traj_gen.get_traj_pos())
position = get_numpy(self.traj_gen.get_traj_pos())
velocity = get_numpy(self.traj_gen.get_traj_vel())
if self.do_replanning:
# Remove first part of trajectory as this is already over
trajectory = trajectory[self.current_traj_steps:]
velocity = velocity[self.current_traj_steps:]
# if self.do_replanning:
# # Remove first part of trajectory as this is already over
# position = position[self.current_traj_steps:]
# velocity = velocity[self.current_traj_steps:]
return trajectory, velocity
return position, velocity
def _get_traj_gen_action_space(self):
"""This function can be used to set up an individual space for the parameters of the traj_gen."""
@ -125,9 +132,9 @@ class BlackBoxWrapper(gym.ObservationWrapper):
# TODO remove this part, right now only needed for beer pong
mp_params, env_spec_params = self.env.episode_callback(action, self.traj_gen)
trajectory, velocity = self.get_trajectory(mp_params)
position, velocity = self.get_trajectory(mp_params)
trajectory_length = len(trajectory)
trajectory_length = len(position)
rewards = np.zeros(shape=(trajectory_length,))
if self.verbose >= 2:
actions = np.zeros(shape=(trajectory_length,) + self.env.action_space.shape)
@ -137,7 +144,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
infos = dict()
done = False
for t, (pos, vel) in enumerate(zip(trajectory, velocity)):
for t, (pos, vel) in enumerate(zip(position, velocity)):
step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel)
c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high)
obs, c_reward, done, info = self.env.step(c_action)
@ -163,7 +170,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
self.current_traj_steps += t + 1
if self.verbose >= 2:
infos['positions'] = trajectory
infos['positions'] = position
infos['velocities'] = velocity
infos['step_actions'] = actions[:t + 1]
infos['step_observations'] = observations[:t + 1]
@ -180,4 +187,5 @@ class BlackBoxWrapper(gym.ObservationWrapper):
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None):
self.current_traj_steps = 0
return super(BlackBoxWrapper, self).reset()
@ -32,7 +32,7 @@ class MPWrapper(BaseMetaworldMPWrapper):
# Current observation
[False] * 3, # end-effector position
[False] * 1, # normalized gripper open distance
[False] * 3, # main object position
[True] * 3, # main object position
[False] * 4, # main object quaternion
[False] * 3, # secondary object position
[False] * 4, # secondary object quaternion
@ -166,6 +166,15 @@ def make_bb(
# We have to learn the length when learning sub_trajectories trajectories
phase_kwargs['learn_tau'] = True
# set tau bounds to minimum of two env steps otherwise computing the velocity is not possible.
# maximum is full duration of one episode.
if phase_kwargs.get('learn_tau'):
phase_kwargs["tau_bound"] = [env.dt * 2, black_box_kwargs['duration']]
# Max delay is full duration minus two steps due to above reason
if phase_kwargs.get('learn_delay'):
phase_kwargs["delay_bound"] = [0, black_box_kwargs['duration'] - env.dt * 2]
phase_gen = get_phase_generator(**phase_kwargs)
basis_gen = get_basis_generator(phase_generator=phase_gen, **basis_kwargs)
controller = get_controller(**controller_kwargs)
@ -1,5 +1,5 @@
from itertools import chain
from typing import Tuple, Type, Union, Optional
from typing import Tuple, Type, Union, Optional, Callable
import gym
import numpy as np
@ -25,7 +25,7 @@ class Object(object):
class ToyEnv(gym.Env):
observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
dt = 0.01
dt = 0.02
def __init__(self, a: int = 0, b: float = 0.0, c: list = [], d: dict = {}, e: Object = Object()):
self.a, self.b, self.c, self.d, self.e = a, b, c, d, e
@ -49,7 +49,7 @@ class ToyWrapper(RawInterfaceWrapper):
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return np.ones(self.action_space.shape)
return np.zeros(self.action_space.shape)
@pytest.fixture(scope="session", autouse=True)
@ -67,74 +67,81 @@ def test_missing_wrapper(env_id: str):
fancy_gym.make_bb(env_id, [], {}, {}, {}, {}, {})
def test_missing_local_state():
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
def test_missing_local_state(mp_type: str):
env = fancy_gym.make_bb('toy-v0', [RawInterfaceWrapper], {},
{'trajectory_generator_type': 'promp'},
{'trajectory_generator_type': mp_type},
{'controller_type': 'motor'},
{'phase_generator_type': 'linear'},
{'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'})
with pytest.raises(NotImplementedError):
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
@pytest.mark.parametrize('verbose', [1, 2])
def test_verbosity(env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int):
def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int):
env_id, wrapper_class = env_wrap
env = fancy_gym.make_bb(env_id, [wrapper_class], {},
{'trajectory_generator_type': 'promp'},
env = fancy_gym.make_bb(env_id, [wrapper_class], {'verbose': verbose},
{'trajectory_generator_type': mp_type},
{'controller_type': 'motor'},
{'phase_generator_type': 'linear'},
{'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'})
info_keys = env.step(env.action_space.sample())[3].keys()
info_keys = list(env.step(env.action_space.sample())[3].keys())
env_step = fancy_gym.make(env_id, SEED)
info_keys_step = env_step.step(env_step.action_space.sample())[3].keys()
assert info_keys_step in info_keys
assert all(e in info_keys for e in info_keys_step)
assert 'trajectory_length' in info_keys
if verbose >= 2:
mp_keys = ['position', 'velocities', 'step_actions', 'step_observations', 'step_rewards']
assert mp_keys in info_keys
mp_keys = ['positions', 'velocities', 'step_actions', 'step_observations', 'step_rewards']
assert all(e in info_keys for e in mp_keys)
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
def test_length(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
env_id, wrapper_class = env_wrap
env = fancy_gym.make_bb(env_id, [wrapper_class], {},
{'trajectory_generator_type': 'promp'},
{'trajectory_generator_type': mp_type},
{'controller_type': 'motor'},
{'phase_generator_type': 'linear'},
{'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'})
for _ in range(5):
length = env.step(env.action_space.sample())[3]['trajectory_length']
assert length == env.spec.max_episode_steps
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])])
def test_aggregation(reward_aggregation: callable):
def test_aggregation(mp_type: str, reward_aggregation: Callable[[np.ndarray], float]):
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation},
{'trajectory_generator_type': 'promp'},
{'trajectory_generator_type': mp_type},
{'controller_type': 'motor'},
{'phase_generator_type': 'linear'},
{'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'})
# ToyEnv only returns 1 as reward
assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, ))
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
def test_context_space(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
def test_context_space(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
env_id, wrapper_class = env_wrap
env = fancy_gym.make_bb(env_id, [wrapper_class], {},
{'trajectory_generator_type': 'promp'},
{'trajectory_generator_type': mp_type},
{'controller_type': 'motor'},
{'phase_generator_type': 'linear'},
{'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'})
# check if observation space matches with the specified mask values which are true
env_step = fancy_gym.make(env_id, SEED)
@ -142,36 +149,42 @@ def test_context_space(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
assert env.observation_space.shape == wrapper.context_mask[wrapper.context_mask].shape
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('num_dof', [0, 1, 2, 5])
@pytest.mark.parametrize('num_basis', [0, 1, 2, 5])
@pytest.mark.parametrize('learn_tau', [True, False])
@pytest.mark.parametrize('learn_delay', [True, False])
def test_action_space(num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool):
def test_action_space(mp_type: str, num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool):
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
{'trajectory_generator_type': 'promp',
{'trajectory_generator_type': mp_type,
'action_dim': num_dof
{'controller_type': 'motor'},
{'phase_generator_type': 'linear',
{'phase_generator_type': 'exp',
'learn_tau': learn_tau,
'learn_delay': learn_delay
{'basis_generator_type': 'rbf',
'num_basis': num_basis
assert env.action_space.shape[0] == num_dof * num_basis + int(learn_tau) + int(learn_delay)
base_dims = num_dof * num_basis
additional_dims = num_dof if mp_type == 'dmp' else 0
traj_modification_dims = int(learn_tau) + int(learn_delay)
assert env.action_space.shape[0] == base_dims + traj_modification_dims + additional_dims
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('a', [1])
@pytest.mark.parametrize('b', [1.0])
@pytest.mark.parametrize('c', [[1], [1.0], ['str'], [{'a': 'b'}], [np.ones(3, )]])
@pytest.mark.parametrize('d', [{'a': 1}, {1: 2.0}, {'a': [1.0]}, {'a': np.ones(3, )}, {'a': {'a': 'b'}}])
@pytest.mark.parametrize('e', [Object()])
def test_change_env_kwargs(a: int, b: float, c: list, d: dict, e: Object):
def test_change_env_kwargs(mp_type: str, a: int, b: float, c: list, d: dict, e: Object):
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
{'trajectory_generator_type': 'promp'},
{'trajectory_generator_type': mp_type},
{'controller_type': 'motor'},
{'phase_generator_type': 'linear'},
{'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'},
a=a, b=b, c=c, d=d, e=e
@ -183,33 +196,134 @@ def test_change_env_kwargs(a: int, b: float, c: list, d: dict, e: Object):
assert e is env.e
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
def test_learn_sub_trajectories(env_wrap: Tuple[str, Type[RawInterfaceWrapper]], add_time_aware_wrapper_before: bool):
env_id, wrapper_class = env_wrap
env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED))
wrappers = [wrapper_class]
# has time aware wrapper
if add_time_aware_wrapper_before:
wrappers += [TimeAwareObservation]
env = fancy_gym.make_bb(env_id, [wrapper_class], {'learn_sub_trajectories': True},
{'trajectory_generator_type': 'promp'},
@pytest.mark.parametrize('mp_type', ['promp'])
@pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
def test_learn_tau(mp_type: str, tau: float):
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
{'trajectory_generator_type': mp_type,
{'controller_type': 'motor'},
{'phase_generator_type': 'linear'},
{'basis_generator_type': 'rbf'})
assert env.learn_sub_trajectories
assert env.traj_gen.learn_tau
assert env.observation_space == env_step.observation_space
{'phase_generator_type': 'linear',
'learn_tau': True,
'learn_delay': False
{'basis_generator_type': 'rbf',
}, seed=SEED)
d = True
for i in range(5):
if d:
action = env.action_space.sample()
action[0] = tau
obs, r, d, info = env.step(action)
length = info['trajectory_length']
assert length == env.spec.max_episode_steps
factor = 1 / env.dt
assert np.allclose(length * env.dt, np.round(factor * action[0]) / factor)
assert np.allclose(length * env.dt, np.round(factor * env.traj_gen.tau.numpy()) / factor)
tau_time_steps = int(np.round(tau / env.dt))
pos = info['positions'].flatten()
vel = info['velocities'].flatten()
# Check end is all same (only true for linear basis)
assert np.all(pos[tau_time_steps:] == pos[-1])
assert np.all(vel[tau_time_steps:] == vel[-1])
# Check active trajectory section is different to end values
assert np.all(pos[:tau_time_steps - 1] != pos[-1])
assert np.all(vel[:tau_time_steps - 2] != vel[-1])
@pytest.mark.parametrize('mp_type', ['promp'])
@pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
def test_learn_delay(mp_type: str, delay: float):
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
{'trajectory_generator_type': mp_type,
{'controller_type': 'motor'},
{'phase_generator_type': 'linear',
'learn_tau': False,
'learn_delay': True
{'basis_generator_type': 'rbf',
}, seed=SEED)
d = True
for i in range(5):
if d:
action = env.action_space.sample()
action[0] = delay
obs, r, d, info = env.step(action)
length = info['trajectory_length']
assert length == env.spec.max_episode_steps
delay_time_steps = int(np.round(delay / env.dt))
pos = info['positions'].flatten()
vel = info['velocities'].flatten()
# Check beginning is all same (only true for linear basis)
assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0])
assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0])
# Check active trajectory section is different to beginning values
assert np.all(pos[max(1, delay_time_steps):] != pos[0])
assert np.all(vel[max(1, delay_time_steps)] != vel[0])
@pytest.mark.parametrize('mp_type', ['promp'])
@pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
@pytest.mark.parametrize('delay', [0.25, 0.5, 0.75, 1])
def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
{'trajectory_generator_type': mp_type,
{'controller_type': 'motor'},
{'phase_generator_type': 'linear',
'learn_tau': True,
'learn_delay': True
{'basis_generator_type': 'rbf',
}, seed=SEED)
if env.spec.max_episode_steps * env.dt < delay + tau:
d = True
for i in range(5):
if d:
action = env.action_space.sample()
action[0] = tau
action[1] = delay
obs, r, d, info = env.step(action)
length = info['trajectory_length']
assert length == env.spec.max_episode_steps
tau_time_steps = int(np.round(tau / env.dt))
delay_time_steps = int(np.round(delay / env.dt))
joint_time_steps = delay_time_steps + tau_time_steps
pos = info['positions'].flatten()
vel = info['velocities'].flatten()
# Check end is all same (only true for linear basis)
assert np.all(pos[joint_time_steps:] == pos[-1])
assert np.all(vel[joint_time_steps:] == vel[-1])
# Check beginning is all same (only true for linear basis)
assert np.all(pos[:delay_time_steps - 1] == pos[0])
assert np.all(vel[:delay_time_steps - 2] == vel[0])
# Check active trajectory section is different to beginning and end values
active_pos = pos[delay_time_steps: joint_time_steps - 1]
active_vel = vel[delay_time_steps: joint_time_steps - 2]
assert np.all(active_pos != pos[-1]) and np.all(active_pos != pos[0])
assert np.all(active_vel != vel[-1]) and np.all(active_vel != vel[0])
Normal file
Normal file
@ -0,0 +1,144 @@
from itertools import chain
from types import FunctionType
from typing import Tuple, Type, Union, Optional
import gym
import numpy as np
import pytest
from gym import register
from gym.core import ActType, ObsType
import fancy_gym
from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper
from fancy_gym.utils.time_aware_observation import TimeAwareObservation
SEED = 1
ENV_IDS = ['Reacher5d-v0', 'dmc:ball_in_cup-catch', 'metaworld:reach-v2', 'Reacher-v2']
WRAPPERS = [fancy_gym.envs.mujoco.reacher.MPWrapper, fancy_gym.dmc.suite.ball_in_cup.MPWrapper,
fancy_gym.meta.goal_object_change_mp_wrapper.MPWrapper, fancy_gym.open_ai.mujoco.reacher_v2.MPWrapper]
class ToyEnv(gym.Env):
observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
dt = 0.02
def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
options: Optional[dict] = None) -> Union[ObsType, Tuple[ObsType, dict]]:
return np.array([-1])
def step(self, action: ActType) -> Tuple[ObsType, float, bool, dict]:
return np.array([-1]), 1, False, {}
def render(self, mode="human"):
class ToyWrapper(RawInterfaceWrapper):
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
return np.ones(self.action_space.shape)
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return np.zeros(self.action_space.shape)
@pytest.fixture(scope="session", autouse=True)
def setup():
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
def test_learn_sub_trajectories(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]],
add_time_aware_wrapper_before: bool):
env_id, wrapper_class = env_wrap
env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED))
wrappers = [wrapper_class]
# has time aware wrapper
if add_time_aware_wrapper_before:
wrappers += [TimeAwareObservation]
env = fancy_gym.make_bb(env_id, [wrapper_class], {'learn_sub_trajectories': True, 'verbose': 2},
{'trajectory_generator_type': mp_type},
{'controller_type': 'motor'},
{'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}, seed=SEED)
assert env.learn_sub_trajectories
assert env.traj_gen.learn_tau
# This also verifies we are not adding the TimeAwareObservationWrapper twice
assert env.observation_space == env_step.observation_space
d = True
for i in range(25):
if d:
action = env.action_space.sample()
obs, r, d, info = env.step(action)
length = info['trajectory_length']
if not d:
assert length == np.round(action[0] / env.dt)
assert length == np.round(env.traj_gen.tau.numpy() / env.dt)
# When done trajectory could be shorter due to termination.
assert length <= np.round(action[0] / env.dt)
assert length <= np.round(env.traj_gen.tau.numpy() / env.dt)
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
@pytest.mark.parametrize('replanning_time', [10, 100, 1000])
def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]],
add_time_aware_wrapper_before: bool, replanning_time: int):
env_id, wrapper_class = env_wrap
env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED))
wrappers = [wrapper_class]
# has time aware wrapper
if add_time_aware_wrapper_before:
wrappers += [TimeAwareObservation]
replanning_schedule = lambda c_pos, c_vel, obs, c_action, t: t % replanning_time == 0
env = fancy_gym.make_bb(env_id, [wrapper_class], {'replanning_schedule': replanning_schedule, 'verbose': 2},
{'trajectory_generator_type': mp_type},
{'controller_type': 'motor'},
{'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}, seed=SEED)
assert env.do_replanning
assert callable(env.replanning_schedule)
# This also verifies we are not adding the TimeAwareObservationWrapper twice
assert env.observation_space == env_step.observation_space
episode_steps = env_step.spec.max_episode_steps // replanning_time
# Make 3 episodes, total steps depend on the replanning steps
for i in range(3 * episode_steps):
action = env.action_space.sample()
obs, r, d, info = env.step(action)
length = info['trajectory_length']
if d:
# Check if number of steps until termination match the replanning interval
print(d, (i + 1), episode_steps)
assert (i + 1) % episode_steps == 0
assert replanning_schedule(None, None, None, None, length)
Reference in New Issue
Block a user