From 6e4be06ffd46f7c2ee736032080ba88406c10698 Mon Sep 17 00:00:00 2001 From: Fabian Date: Mon, 26 Sep 2022 16:11:41 +0200 Subject: [PATCH 1/4] Added more tests --- fancy_gym/utils/make_env_helpers.py | 2 +- test/test_black_box.py | 104 ++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 1 deletion(-) create mode 100644 test/test_black_box.py diff --git a/fancy_gym/utils/make_env_helpers.py b/fancy_gym/utils/make_env_helpers.py index 5221423..18ab6ed 100644 --- a/fancy_gym/utils/make_env_helpers.py +++ b/fancy_gym/utils/make_env_helpers.py @@ -141,7 +141,7 @@ def make_bb( Returns: DMP wrapped gym env """ - _verify_time_limit(traj_gen_kwargs.get("duration", None), kwargs.get("time_limit", None)) + _verify_time_limit(traj_gen_kwargs.get("duration"), kwargs.get("time_limit")) learn_sub_trajs = black_box_kwargs.get('learn_sub_trajectories') do_replanning = black_box_kwargs.get('replanning_schedule') diff --git a/test/test_black_box.py b/test/test_black_box.py new file mode 100644 index 0000000..974e3b3 --- /dev/null +++ b/test/test_black_box.py @@ -0,0 +1,104 @@ +from itertools import chain +from typing import Tuple, Type, Union, Optional + +import gym +import numpy as np +import pytest +from gym import register +from gym.wrappers import TimeLimit + +from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper +from gym.core import ActType, ObsType + +import fancy_gym + +SEED = 1 +ENV_IDS = ['Reacher5d-v0', 'dmc:ball_in_cup-catch', 'metaworld:reach-v2', 'Reacher-v2'] +WRAPPERS = [fancy_gym.envs.mujoco.reacher.MPWrapper, fancy_gym.dmc.suite.ball_in_cup.MPWrapper, + fancy_gym.meta.goal_object_change_mp_wrapper.MPWrapper, fancy_gym.open_ai.mujoco.reacher_v2.MPWrapper] +ALL_MP_ENVS = chain(*fancy_gym.ALL_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values()) + + +class ToyEnv(gym.Env): + observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64) + action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64) + dt = 0.01 + + def reset(self, *, seed: Optional[int] = None, return_info: bool = False, + options: Optional[dict] = None) -> Union[ObsType, Tuple[ObsType, dict]]: + return np.array([-1]) + + def step(self, action: ActType) -> Tuple[ObsType, float, bool, dict]: + return np.array([-1]), 1, False, {} + + def render(self, mode="human"): + pass + + +class ToyWrapper(RawInterfaceWrapper): + + @property + def current_pos(self) -> Union[float, int, np.ndarray, Tuple]: + return np.ones(self.action_space.shape) + + @property + def current_vel(self) -> Union[float, int, np.ndarray, Tuple]: + return np.ones(self.action_space.shape) + + +@pytest.fixture(scope="session", autouse=True) +def setup(): + register( + id=f'toy-v0', + entry_point='test.test_black_box:ToyEnv', + max_episode_steps=50, + ) + + +@pytest.mark.parametrize('env_id', ENV_IDS) +def test_missing_wrapper(env_id: str): + with pytest.raises(ValueError): + fancy_gym.make_bb(env_id, [], {}, {}, {}, {}, {}) + + +@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) +def test_context_space(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): + env_id, wrapper_class = env_wrap + env = fancy_gym.make_bb(env_id, [wrapper_class], {}, + {'trajectory_generator_type': 'promp'}, + {'controller_type': 'motor'}, + {'phase_generator_type': 'linear'}, + {'basis_generator_type': 'rbf'}) + # check if observation space matches with the specified mask values which are true + env_step = fancy_gym.make(env_id, SEED) + wrapper = wrapper_class(env_step) + assert env.observation_space.shape == 
wrapper.context_mask[wrapper.context_mask].shape + + +@pytest.mark.parametrize('env_id', ENV_IDS) +@pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])]) +def test_aggregation(env_id: str, reward_aggregation: callable): + env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation}, + {'trajectory_generator_type': 'promp'}, + {'controller_type': 'motor'}, + {'phase_generator_type': 'linear'}, + {'basis_generator_type': 'rbf'}) + env.reset() + + # ToyEnv only returns 1 as reward + assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, )) + + +@pytest.mark.parametrize('env_id', ENV_IDS) +@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False]) +def test_learn_sub_trajectories(env_id: str, add_time_aware_wrapper_before: bool): + env_step = fancy_gym.make(env_id, SEED) + env = fancy_gym.make_bb(env_id, [], {}, {}, {}, {'phase_generator_type': 'linear'}, {}) + + # has time aware wrapper + if add_time_aware_wrapper_before: + pass + + assert env.learn_sub_trajectories + assert env.learn_tau + assert env.observation_space == env_step.observation_space From 915ffbe92809ea5e96488c3472f4c3b264ca8b4b Mon Sep 17 00:00:00 2001 From: Fabian Date: Fri, 30 Sep 2022 15:07:48 +0200 Subject: [PATCH 2/4] Added more tests --- .../black_box/controller/base_controller.py | 3 + .../controller/meta_world_controller.py | 5 +- .../black_box/controller/pd_controller.py | 11 +- test/test_black_box.py | 157 +++++++++++++++--- test/test_controller.py | 73 ++++++++ 5 files changed, 219 insertions(+), 30 deletions(-) create mode 100644 test/test_controller.py diff --git a/fancy_gym/black_box/controller/base_controller.py b/fancy_gym/black_box/controller/base_controller.py index 1ac1522..e9045aa 100644 --- a/fancy_gym/black_box/controller/base_controller.py +++ b/fancy_gym/black_box/controller/base_controller.py @@ -2,3 +2,6 @@ class BaseController: def get_action(self, des_pos, des_vel, c_pos, c_vel): raise NotImplementedError + + def __call__(self, des_pos, des_vel, c_pos, c_vel): + return self.get_action(des_pos, des_vel, c_pos, c_vel) diff --git a/fancy_gym/black_box/controller/meta_world_controller.py b/fancy_gym/black_box/controller/meta_world_controller.py index efd8983..3e5bd37 100644 --- a/fancy_gym/black_box/controller/meta_world_controller.py +++ b/fancy_gym/black_box/controller/meta_world_controller.py @@ -18,7 +18,8 @@ class MetaWorldController(BaseController): cur_pos = c_pos[:-1] xyz_pos = des_pos[:-1] - assert xyz_pos.shape == cur_pos.shape, \ - f"Mismatch in dimension between desired position {xyz_pos.shape} and current position {cur_pos.shape}" + if xyz_pos.shape != cur_pos.shape: + raise ValueError(f"Mismatch in dimension between desired position" + f" {xyz_pos.shape} and current position {cur_pos.shape}") trq = np.hstack([(xyz_pos - cur_pos), gripper_pos]) return trq diff --git a/fancy_gym/black_box/controller/pd_controller.py b/fancy_gym/black_box/controller/pd_controller.py index 35203d8..78c2adc 100644 --- a/fancy_gym/black_box/controller/pd_controller.py +++ b/fancy_gym/black_box/controller/pd_controller.py @@ -8,7 +8,6 @@ class PDController(BaseController): A PD-Controller. 
Using position and velocity information from a provided environment, the tracking_controller calculates a response based on the desired position and velocity - :param env: A position environment :param p_gains: Factors for the proportional gains :param d_gains: Factors for the differential gains """ @@ -20,9 +19,11 @@ class PDController(BaseController): self.d_gains = d_gains def get_action(self, des_pos, des_vel, c_pos, c_vel): - assert des_pos.shape == c_pos.shape, \ - f"Mismatch in dimension between desired position {des_pos.shape} and current position {c_pos.shape}" - assert des_vel.shape == c_vel.shape, \ - f"Mismatch in dimension between desired velocity {des_vel.shape} and current velocity {c_vel.shape}" + if des_pos.shape != c_pos.shape: + raise ValueError(f"Mismatch in dimension between desired position " + f"{des_pos.shape} and current position {c_pos.shape}") + if des_vel.shape != c_vel.shape: + raise ValueError(f"Mismatch in dimension between desired velocity" + f" {des_vel.shape} and current velocity {c_vel.shape}") trq = self.p_gains * (des_pos - c_pos) + self.d_gains * (des_vel - c_vel) return trq diff --git a/test/test_black_box.py b/test/test_black_box.py index 974e3b3..fb7f78c 100644 --- a/test/test_black_box.py +++ b/test/test_black_box.py @@ -5,12 +5,11 @@ import gym import numpy as np import pytest from gym import register -from gym.wrappers import TimeLimit - -from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper from gym.core import ActType, ObsType import fancy_gym +from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper +from fancy_gym.utils.time_aware_observation import TimeAwareObservation SEED = 1 ENV_IDS = ['Reacher5d-v0', 'dmc:ball_in_cup-catch', 'metaworld:reach-v2', 'Reacher-v2'] @@ -19,11 +18,18 @@ WRAPPERS = [fancy_gym.envs.mujoco.reacher.MPWrapper, fancy_gym.dmc.suite.ball_in ALL_MP_ENVS = chain(*fancy_gym.ALL_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values()) +class Object(object): + pass + + class ToyEnv(gym.Env): observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64) action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64) dt = 0.01 + def __init__(self, a: int = 0, b: float = 0.0, c: list = [], d: dict = {}, e: Object = Object()): + self.a, self.b, self.c, self.d, self.e = a, b, c, d, e + def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None) -> Union[ObsType, Tuple[ObsType, dict]]: return np.array([-1]) @@ -61,6 +67,67 @@ def test_missing_wrapper(env_id: str): fancy_gym.make_bb(env_id, [], {}, {}, {}, {}, {}) +def test_missing_local_state(): + env = fancy_gym.make_bb('toy-v0', [RawInterfaceWrapper], {}, + {'trajectory_generator_type': 'promp'}, + {'controller_type': 'motor'}, + {'phase_generator_type': 'linear'}, + {'basis_generator_type': 'rbf'}) + env.reset() + with pytest.raises(NotImplementedError): + env.step(env.action_space.sample()) + + +@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) +@pytest.mark.parametrize('verbose', [1, 2]) +def test_verbosity(env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int): + env_id, wrapper_class = env_wrap + env = fancy_gym.make_bb(env_id, [wrapper_class], {}, + {'trajectory_generator_type': 'promp'}, + {'controller_type': 'motor'}, + {'phase_generator_type': 'linear'}, + {'basis_generator_type': 'rbf'}) + env.reset() + info_keys = env.step(env.action_space.sample())[3].keys() + + env_step = fancy_gym.make(env_id, SEED) + env_step.reset() + info_keys_step = 
env_step.step(env_step.action_space.sample())[3].keys() + + assert info_keys_step in info_keys + assert 'trajectory_length' in info_keys + + if verbose >= 2: + mp_keys = ['position', 'velocities', 'step_actions', 'step_observations', 'step_rewards'] + assert mp_keys in info_keys + + +@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) +def test_length(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): + env_id, wrapper_class = env_wrap + env = fancy_gym.make_bb(env_id, [wrapper_class], {}, + {'trajectory_generator_type': 'promp'}, + {'controller_type': 'motor'}, + {'phase_generator_type': 'linear'}, + {'basis_generator_type': 'rbf'}) + env.reset() + length = env.step(env.action_space.sample())[3]['trajectory_length'] + + assert length == env.spec.max_episode_steps + + +@pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])]) +def test_aggregation(reward_aggregation: callable): + env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation}, + {'trajectory_generator_type': 'promp'}, + {'controller_type': 'motor'}, + {'phase_generator_type': 'linear'}, + {'basis_generator_type': 'rbf'}) + env.reset() + # ToyEnv only returns 1 as reward + assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, )) + + @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) def test_context_space(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): env_id, wrapper_class = env_wrap @@ -75,30 +142,74 @@ def test_context_space(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): assert env.observation_space.shape == wrapper.context_mask[wrapper.context_mask].shape -@pytest.mark.parametrize('env_id', ENV_IDS) -@pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])]) -def test_aggregation(env_id: str, reward_aggregation: callable): - env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation}, +@pytest.mark.parametrize('num_dof', [0, 1, 2, 5]) +@pytest.mark.parametrize('num_basis', [0, 1, 2, 5]) +@pytest.mark.parametrize('learn_tau', [True, False]) +@pytest.mark.parametrize('learn_delay', [True, False]) +def test_action_space(num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool): + env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {}, + {'trajectory_generator_type': 'promp', + 'action_dim': num_dof + }, + {'controller_type': 'motor'}, + {'phase_generator_type': 'linear', + 'learn_tau': learn_tau, + 'learn_delay': learn_delay + }, + {'basis_generator_type': 'rbf', + 'num_basis': num_basis + }) + assert env.action_space.shape[0] == num_dof * num_basis + int(learn_tau) + int(learn_delay) + + +@pytest.mark.parametrize('a', [1]) +@pytest.mark.parametrize('b', [1.0]) +@pytest.mark.parametrize('c', [[1], [1.0], ['str'], [{'a': 'b'}], [np.ones(3, )]]) +@pytest.mark.parametrize('d', [{'a': 1}, {1: 2.0}, {'a': [1.0]}, {'a': np.ones(3, )}, {'a': {'a': 'b'}}]) +@pytest.mark.parametrize('e', [Object()]) +def test_change_env_kwargs(a: int, b: float, c: list, d: dict, e: Object): + env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {}, + {'trajectory_generator_type': 'promp'}, + {'controller_type': 'motor'}, + {'phase_generator_type': 'linear'}, + {'basis_generator_type': 'rbf'}, + a=a, b=b, c=c, d=d, e=e + ) + assert a is env.a + assert b is env.b + assert c is env.c + # Due to how gym works dict kwargs need to be copied and hence can only be checked to have the same content + assert d == env.d + assert e is env.e + + 
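[Note, not part of the diff] For reference, a minimal sketch of how these tests compose fancy_gym.make_bb. The seven positional arguments are the env id, the list of RawInterfaceWrappers, the black-box options, and the trajectory-generator, controller, phase-generator and basis-generator kwargs. All values below are taken from ENV_IDS/WRAPPERS and the parametrizations in this file; nothing here is new API.

import numpy as np

import fancy_gym

env = fancy_gym.make_bb(
    'Reacher5d-v0',                                # env id (see ENV_IDS)
    [fancy_gym.envs.mujoco.reacher.MPWrapper],     # RawInterfaceWrapper(s) (see WRAPPERS)
    {'reward_aggregation': np.sum},                # black-box options
    {'trajectory_generator_type': 'promp'},        # trajectory generator
    {'controller_type': 'motor'},                  # tracking controller
    {'phase_generator_type': 'linear'},            # phase generator
    {'basis_generator_type': 'rbf'})               # basis generator

env.reset()
# One step consumes a full parameter vector and rolls out the whole trajectory.
obs, trajectory_return, done, info = env.step(env.action_space.sample())
print(info['trajectory_length'])
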
+@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) +@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False]) +def test_learn_sub_trajectories(env_wrap: Tuple[str, Type[RawInterfaceWrapper]], add_time_aware_wrapper_before: bool): + env_id, wrapper_class = env_wrap + env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED)) + wrappers = [wrapper_class] + + # has time aware wrapper + if add_time_aware_wrapper_before: + wrappers += [TimeAwareObservation] + + env = fancy_gym.make_bb(env_id, [wrapper_class], {'learn_sub_trajectories': True}, {'trajectory_generator_type': 'promp'}, {'controller_type': 'motor'}, {'phase_generator_type': 'linear'}, {'basis_generator_type': 'rbf'}) - env.reset() - - # ToyEnv only returns 1 as reward - assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, )) - - -@pytest.mark.parametrize('env_id', ENV_IDS) -@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False]) -def test_learn_sub_trajectories(env_id: str, add_time_aware_wrapper_before: bool): - env_step = fancy_gym.make(env_id, SEED) - env = fancy_gym.make_bb(env_id, [], {}, {}, {}, {'phase_generator_type': 'linear'}, {}) - - # has time aware wrapper - if add_time_aware_wrapper_before: - pass assert env.learn_sub_trajectories - assert env.learn_tau + assert env.traj_gen.learn_tau assert env.observation_space == env_step.observation_space + + env.reset() + action = env.action_space.sample() + obs, r, d, info = env.step(action) + + length = info['trajectory_length'] + + factor = 1 / env.dt + assert np.allclose(length * env.dt, np.round(factor * action[0]) / factor) + assert np.allclose(length * env.dt, np.round(factor * env.traj_gen.tau.numpy()) / factor) diff --git a/test/test_controller.py b/test/test_controller.py new file mode 100644 index 0000000..c530c50 --- /dev/null +++ b/test/test_controller.py @@ -0,0 +1,73 @@ +from typing import Tuple, Union + +import numpy as np +import pytest + +from fancy_gym.black_box.factory import controller_factory + + +@pytest.mark.parametrize('ctrl_type', controller_factory.ALL_TYPES) +def test_initialization(ctrl_type: str): + controller_factory.get_controller(ctrl_type) + + +@pytest.mark.parametrize('position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +@pytest.mark.parametrize('velocity', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +def test_velocity(position: np.ndarray, velocity: np.ndarray): + ctrl = controller_factory.get_controller('velocity') + a = ctrl(position, velocity, None, None) + assert np.array_equal(a, velocity) + + +@pytest.mark.parametrize('position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +@pytest.mark.parametrize('velocity', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +def test_position(position: np.ndarray, velocity: np.ndarray): + ctrl = controller_factory.get_controller('position') + a = ctrl(position, velocity, None, None) + assert np.array_equal(a, position) + + +@pytest.mark.parametrize('position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +@pytest.mark.parametrize('velocity', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +@pytest.mark.parametrize('current_position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +@pytest.mark.parametrize('current_velocity', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +@pytest.mark.parametrize('p_gains', [0, 1, 0.5, np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +@pytest.mark.parametrize('d_gains', [0, 1, 0.5, np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +def test_pd(position: np.ndarray, velocity: np.ndarray, 
current_position: np.ndarray, current_velocity: np.ndarray, + p_gains: Union[float, Tuple], d_gains: Union[float, Tuple]): + ctrl = controller_factory.get_controller('motor', p_gains=p_gains, d_gains=d_gains) + assert np.array_equal(ctrl.p_gains, p_gains) + assert np.array_equal(ctrl.d_gains, d_gains) + + a = ctrl(position, velocity, current_position, current_velocity) + pd = p_gains * (position - current_position) + d_gains * (velocity - current_velocity) + assert np.array_equal(a, pd) + + +@pytest.mark.parametrize('pos_vel', [(np.ones(3, ), np.ones(4, )), + (np.ones(4, ), np.ones(3, )), + (np.ones(4, ), np.ones(4, ))]) +def test_pd_invalid_shapes(pos_vel: Tuple[np.ndarray, np.ndarray]): + position, velocity = pos_vel + ctrl = controller_factory.get_controller('motor') + with pytest.raises(ValueError): + ctrl(position, velocity, np.ones(3, ), np.ones(3, )) + + +@pytest.mark.parametrize('position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +@pytest.mark.parametrize('current_position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)]) +@pytest.mark.parametrize('gripper_pos', [0, 1, 0.5]) +def test_metaworld(position: np.ndarray, current_position: np.ndarray, gripper_pos: float): + ctrl = controller_factory.get_controller('metaworld') + + position_grip = np.append(position, gripper_pos) + c_position_grip = np.append(current_position, -1) + a = ctrl(position_grip, None, c_position_grip, None) + assert a[-1] == gripper_pos + assert np.array_equal(a[:-1], position - current_position) + + +def test_metaworld_invalid_shapes(): + ctrl = controller_factory.get_controller('metaworld') + with pytest.raises(ValueError): + ctrl(np.ones(4, ), None, np.ones(3, ), None) From ed645c2fbed33759a094993a08cac44b7afa2c6e Mon Sep 17 00:00:00 2001 From: Fabian Date: Fri, 21 Oct 2022 16:16:49 +0200 Subject: [PATCH 3/4] added black box tests and bug fixes --- fancy_gym/black_box/black_box_wrapper.py | 29 ++- fancy_gym/meta/object_change_mp_wrapper.py | 2 +- fancy_gym/utils/make_env_helpers.py | 9 + test/test_black_box.py | 229 ++++++++++++++++----- test/test_replanning_sequencing.py | 143 +++++++++++++ 5 files changed, 343 insertions(+), 69 deletions(-) create mode 100644 test/test_replanning_sequencing.py diff --git a/fancy_gym/black_box/black_box_wrapper.py b/fancy_gym/black_box/black_box_wrapper.py index 68e2177..7652bd5 100644 --- a/fancy_gym/black_box/black_box_wrapper.py +++ b/fancy_gym/black_box/black_box_wrapper.py @@ -73,24 +73,30 @@ class BlackBoxWrapper(gym.ObservationWrapper): return observation.astype(self.observation_space.dtype) def get_trajectory(self, action: np.ndarray) -> Tuple: + duration = self.duration + if self.learn_sub_trajectories: + duration = None + # reset with every new call as we need to set all arguments, such as tau, delay, again. + # If we do not do this, the traj_gen assumes we are continuing the trajectory. + self.traj_gen.reset() + clipped_params = np.clip(action, self.traj_gen_action_space.low, self.traj_gen_action_space.high) self.traj_gen.set_params(clipped_params) bc_time = np.array(0 if not self.do_replanning else self.current_traj_steps * self.dt) # TODO we could think about initializing with the previous desired value in order to have a smooth transition # at least from the planning point of view. 
self.traj_gen.set_boundary_conditions(bc_time, self.current_pos, self.current_vel) - duration = None if self.learn_sub_trajectories else self.duration self.traj_gen.set_duration(duration, self.dt) # traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True) - trajectory = get_numpy(self.traj_gen.get_traj_pos()) + position = get_numpy(self.traj_gen.get_traj_pos()) velocity = get_numpy(self.traj_gen.get_traj_vel()) - if self.do_replanning: - # Remove first part of trajectory as this is already over - trajectory = trajectory[self.current_traj_steps:] - velocity = velocity[self.current_traj_steps:] + # if self.do_replanning: + # # Remove first part of trajectory as this is already over + # position = position[self.current_traj_steps:] + # velocity = velocity[self.current_traj_steps:] - return trajectory, velocity + return position, velocity def _get_traj_gen_action_space(self): """This function can be used to set up an individual space for the parameters of the traj_gen.""" @@ -125,9 +131,9 @@ class BlackBoxWrapper(gym.ObservationWrapper): # TODO remove this part, right now only needed for beer pong mp_params, env_spec_params = self.env.episode_callback(action, self.traj_gen) - trajectory, velocity = self.get_trajectory(mp_params) + position, velocity = self.get_trajectory(mp_params) - trajectory_length = len(trajectory) + trajectory_length = len(position) rewards = np.zeros(shape=(trajectory_length,)) if self.verbose >= 2: actions = np.zeros(shape=(trajectory_length,) + self.env.action_space.shape) @@ -137,7 +143,7 @@ class BlackBoxWrapper(gym.ObservationWrapper): infos = dict() done = False - for t, (pos, vel) in enumerate(zip(trajectory, velocity)): + for t, (pos, vel) in enumerate(zip(position, velocity)): step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel) c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high) obs, c_reward, done, info = self.env.step(c_action) @@ -163,7 +169,7 @@ class BlackBoxWrapper(gym.ObservationWrapper): self.current_traj_steps += t + 1 if self.verbose >= 2: - infos['positions'] = trajectory + infos['positions'] = position infos['velocities'] = velocity infos['step_actions'] = actions[:t + 1] infos['step_observations'] = observations[:t + 1] @@ -180,4 +186,5 @@ class BlackBoxWrapper(gym.ObservationWrapper): def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None): self.current_traj_steps = 0 + self.traj_gen.reset() return super(BlackBoxWrapper, self).reset() diff --git a/fancy_gym/meta/object_change_mp_wrapper.py b/fancy_gym/meta/object_change_mp_wrapper.py index 6faecc9..ca99c22 100644 --- a/fancy_gym/meta/object_change_mp_wrapper.py +++ b/fancy_gym/meta/object_change_mp_wrapper.py @@ -32,7 +32,7 @@ class MPWrapper(BaseMetaworldMPWrapper): # Current observation [False] * 3, # end-effector position [False] * 1, # normalized gripper open distance - [False] * 3, # main object position + [True] * 3, # main object position [False] * 4, # main object quaternion [False] * 3, # secondary object position [False] * 4, # secondary object quaternion diff --git a/fancy_gym/utils/make_env_helpers.py b/fancy_gym/utils/make_env_helpers.py index 18ab6ed..3c73ba9 100644 --- a/fancy_gym/utils/make_env_helpers.py +++ b/fancy_gym/utils/make_env_helpers.py @@ -166,6 +166,15 @@ def make_bb( # We have to learn the length when learning sub_trajectories trajectories phase_kwargs['learn_tau'] = True + # set tau bounds to minimum of two env steps otherwise 
computing the velocity is not possible. + # maximum is full duration of one episode. + if phase_kwargs.get('learn_tau'): + phase_kwargs["tau_bound"] = [env.dt * 2, black_box_kwargs['duration']] + + # Max delay is full duration minus two steps due to above reason + if phase_kwargs.get('learn_delay'): + phase_kwargs["delay_bound"] = [0, black_box_kwargs['duration'] - env.dt * 2] + phase_gen = get_phase_generator(**phase_kwargs) basis_gen = get_basis_generator(phase_generator=phase_gen, **basis_kwargs) controller = get_controller(**controller_kwargs) diff --git a/test/test_black_box.py b/test/test_black_box.py index fb7f78c..f1b360b 100644 --- a/test/test_black_box.py +++ b/test/test_black_box.py @@ -25,7 +25,7 @@ class Object(object): class ToyEnv(gym.Env): observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64) action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64) - dt = 0.01 + dt = 0.02 def __init__(self, a: int = 0, b: float = 0.0, c: list = [], d: dict = {}, e: Object = Object()): self.a, self.b, self.c, self.d, self.e = a, b, c, d, e @@ -49,7 +49,7 @@ class ToyWrapper(RawInterfaceWrapper): @property def current_vel(self) -> Union[float, int, np.ndarray, Tuple]: - return np.ones(self.action_space.shape) + return np.zeros(self.action_space.shape) @pytest.fixture(scope="session", autouse=True) @@ -67,74 +67,81 @@ def test_missing_wrapper(env_id: str): fancy_gym.make_bb(env_id, [], {}, {}, {}, {}, {}) -def test_missing_local_state(): +@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) +def test_missing_local_state(mp_type: str): env = fancy_gym.make_bb('toy-v0', [RawInterfaceWrapper], {}, - {'trajectory_generator_type': 'promp'}, + {'trajectory_generator_type': mp_type}, {'controller_type': 'motor'}, - {'phase_generator_type': 'linear'}, + {'phase_generator_type': 'exp'}, {'basis_generator_type': 'rbf'}) env.reset() with pytest.raises(NotImplementedError): env.step(env.action_space.sample()) +@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) @pytest.mark.parametrize('verbose', [1, 2]) -def test_verbosity(env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int): +def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int): env_id, wrapper_class = env_wrap - env = fancy_gym.make_bb(env_id, [wrapper_class], {}, - {'trajectory_generator_type': 'promp'}, + env = fancy_gym.make_bb(env_id, [wrapper_class], {'verbose': verbose}, + {'trajectory_generator_type': mp_type}, {'controller_type': 'motor'}, - {'phase_generator_type': 'linear'}, + {'phase_generator_type': 'exp'}, {'basis_generator_type': 'rbf'}) env.reset() - info_keys = env.step(env.action_space.sample())[3].keys() + info_keys = list(env.step(env.action_space.sample())[3].keys()) env_step = fancy_gym.make(env_id, SEED) env_step.reset() info_keys_step = env_step.step(env_step.action_space.sample())[3].keys() - assert info_keys_step in info_keys + assert all(e in info_keys for e in info_keys_step) assert 'trajectory_length' in info_keys if verbose >= 2: - mp_keys = ['position', 'velocities', 'step_actions', 'step_observations', 'step_rewards'] - assert mp_keys in info_keys + mp_keys = ['positions', 'velocities', 'step_actions', 'step_observations', 'step_rewards'] + assert all(e in info_keys for e in mp_keys) +@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) -def test_length(env_wrap: Tuple[str, 
Type[RawInterfaceWrapper]]): +def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): env_id, wrapper_class = env_wrap env = fancy_gym.make_bb(env_id, [wrapper_class], {}, - {'trajectory_generator_type': 'promp'}, + {'trajectory_generator_type': mp_type}, {'controller_type': 'motor'}, - {'phase_generator_type': 'linear'}, + {'phase_generator_type': 'exp'}, {'basis_generator_type': 'rbf'}) - env.reset() - length = env.step(env.action_space.sample())[3]['trajectory_length'] - assert length == env.spec.max_episode_steps + for _ in range(5): + env.reset() + length = env.step(env.action_space.sample())[3]['trajectory_length'] + + assert length == env.spec.max_episode_steps +@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) @pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])]) -def test_aggregation(reward_aggregation: callable): +def test_aggregation(mp_type: str, reward_aggregation: callable): env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation}, - {'trajectory_generator_type': 'promp'}, + {'trajectory_generator_type': mp_type}, {'controller_type': 'motor'}, - {'phase_generator_type': 'linear'}, + {'phase_generator_type': 'exp'}, {'basis_generator_type': 'rbf'}) env.reset() # ToyEnv only returns 1 as reward assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, )) +@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) -def test_context_space(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): +def test_context_space(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): env_id, wrapper_class = env_wrap env = fancy_gym.make_bb(env_id, [wrapper_class], {}, - {'trajectory_generator_type': 'promp'}, + {'trajectory_generator_type': mp_type}, {'controller_type': 'motor'}, - {'phase_generator_type': 'linear'}, + {'phase_generator_type': 'exp'}, {'basis_generator_type': 'rbf'}) # check if observation space matches with the specified mask values which are true env_step = fancy_gym.make(env_id, SEED) @@ -142,36 +149,42 @@ def test_context_space(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): assert env.observation_space.shape == wrapper.context_mask[wrapper.context_mask].shape +@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) @pytest.mark.parametrize('num_dof', [0, 1, 2, 5]) @pytest.mark.parametrize('num_basis', [0, 1, 2, 5]) @pytest.mark.parametrize('learn_tau', [True, False]) @pytest.mark.parametrize('learn_delay', [True, False]) -def test_action_space(num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool): +def test_action_space(mp_type: str, num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool): env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {}, - {'trajectory_generator_type': 'promp', + {'trajectory_generator_type': mp_type, 'action_dim': num_dof }, {'controller_type': 'motor'}, - {'phase_generator_type': 'linear', + {'phase_generator_type': 'exp', 'learn_tau': learn_tau, 'learn_delay': learn_delay }, {'basis_generator_type': 'rbf', 'num_basis': num_basis }) - assert env.action_space.shape[0] == num_dof * num_basis + int(learn_tau) + int(learn_delay) + + base_dims = num_dof * num_basis + additional_dims = num_dof if mp_type == 'dmp' else 0 + traj_modification_dims = int(learn_tau) + int(learn_delay) + assert env.action_space.shape[0] == base_dims + traj_modification_dims + additional_dims +@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) 
@pytest.mark.parametrize('a', [1]) @pytest.mark.parametrize('b', [1.0]) @pytest.mark.parametrize('c', [[1], [1.0], ['str'], [{'a': 'b'}], [np.ones(3, )]]) @pytest.mark.parametrize('d', [{'a': 1}, {1: 2.0}, {'a': [1.0]}, {'a': np.ones(3, )}, {'a': {'a': 'b'}}]) @pytest.mark.parametrize('e', [Object()]) -def test_change_env_kwargs(a: int, b: float, c: list, d: dict, e: Object): +def test_change_env_kwargs(mp_type: str, a: int, b: float, c: list, d: dict, e: Object): env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {}, - {'trajectory_generator_type': 'promp'}, + {'trajectory_generator_type': mp_type}, {'controller_type': 'motor'}, - {'phase_generator_type': 'linear'}, + {'phase_generator_type': 'exp'}, {'basis_generator_type': 'rbf'}, a=a, b=b, c=c, d=d, e=e ) @@ -183,33 +196,135 @@ def test_change_env_kwargs(a: int, b: float, c: list, d: dict, e: Object): assert e is env.e -@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) -@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False]) -def test_learn_sub_trajectories(env_wrap: Tuple[str, Type[RawInterfaceWrapper]], add_time_aware_wrapper_before: bool): - env_id, wrapper_class = env_wrap - env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED)) - wrappers = [wrapper_class] - - # has time aware wrapper - if add_time_aware_wrapper_before: - wrappers += [TimeAwareObservation] - - env = fancy_gym.make_bb(env_id, [wrapper_class], {'learn_sub_trajectories': True}, - {'trajectory_generator_type': 'promp'}, +@pytest.mark.parametrize('mp_type', ['promp']) +@pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1]) +def test_learn_tau(mp_type: str, tau: float): + env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2}, + {'trajectory_generator_type': mp_type, + }, {'controller_type': 'motor'}, - {'phase_generator_type': 'linear'}, - {'basis_generator_type': 'rbf'}) + {'phase_generator_type': 'linear', + 'learn_tau': True, + 'learn_delay': False + }, + {'basis_generator_type': 'rbf', + }, seed=SEED) - assert env.learn_sub_trajectories - assert env.traj_gen.learn_tau - assert env.observation_space == env_step.observation_space + d = True + for i in range(5): + if d: + env.reset() + action = env.action_space.sample() + action[0] = tau - env.reset() - action = env.action_space.sample() - obs, r, d, info = env.step(action) + obs, r, d, info = env.step(action) - length = info['trajectory_length'] + length = info['trajectory_length'] + assert length == env.spec.max_episode_steps + + tau_time_steps = int(np.round(tau / env.dt)) + + pos = info['positions'].flatten() + vel = info['velocities'].flatten() + + # Check end is all same (only true for linear basis) + assert np.all(pos[tau_time_steps:] == pos[-1]) + assert np.all(vel[tau_time_steps:] == vel[-1]) + + # Check active trajectory section is different to end values + assert np.all(pos[:tau_time_steps - 1] != pos[-1]) + assert np.all(vel[:tau_time_steps - 2] != vel[-1]) + + +@pytest.mark.parametrize('mp_type', ['promp']) +@pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75]) +def test_learn_delay(mp_type: str, delay: float): + env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2}, + {'trajectory_generator_type': mp_type, + }, + {'controller_type': 'motor'}, + {'phase_generator_type': 'linear', + 'learn_tau': False, + 'learn_delay': True + }, + {'basis_generator_type': 'rbf', + }, seed=SEED) + + d = True + for i in range(5): + if d: + env.reset() + action = env.action_space.sample() + action[0] = delay + + obs, r, d, info = env.step(action) + + length = 
info['trajectory_length'] + assert length == env.spec.max_episode_steps + + delay_time_steps = int(np.round(delay / env.dt)) + + pos = info['positions'].flatten() + vel = info['velocities'].flatten() + + # Check beginning is all same (only true for linear basis) + assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0]) + assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0]) + + # Check active trajectory section is different to beginning values + assert np.all(pos[max(1, delay_time_steps):] != pos[0]) + assert np.all(vel[max(1, delay_time_steps)] != vel[0]) + + +@pytest.mark.parametrize('mp_type', ['promp']) +@pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1]) +@pytest.mark.parametrize('delay', [0.25, 0.5, 0.75, 1]) +def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float): + env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2}, + {'trajectory_generator_type': mp_type, + }, + {'controller_type': 'motor'}, + {'phase_generator_type': 'linear', + 'learn_tau': True, + 'learn_delay': True + }, + {'basis_generator_type': 'rbf', + }, seed=SEED) + + if env.spec.max_episode_steps * env.dt < delay + tau: + return + + d = True + for i in range(5): + if d: + env.reset() + action = env.action_space.sample() + action[0] = tau + action[1] = delay + + obs, r, d, info = env.step(action) + + length = info['trajectory_length'] + assert length == env.spec.max_episode_steps + + tau_time_steps = int(np.round(tau / env.dt)) + delay_time_steps = int(np.round(delay / env.dt)) + joint_time_steps = delay_time_steps + tau_time_steps + + pos = info['positions'].flatten() + vel = info['velocities'].flatten() + + # Check end is all same (only true for linear basis) + assert np.all(pos[joint_time_steps:] == pos[-1]) + assert np.all(vel[joint_time_steps:] == vel[-1]) + + # Check beginning is all same (only true for linear basis) + assert np.all(pos[:delay_time_steps - 1] == pos[0]) + assert np.all(vel[:delay_time_steps - 2] == vel[0]) + + # Check active trajectory section is different to beginning and end values + active_pos = pos[delay_time_steps: joint_time_steps - 1] + active_vel = vel[delay_time_steps: joint_time_steps - 2] + assert np.all(active_pos != pos[-1]) and np.all(active_pos != pos[0]) + assert np.all(active_vel != vel[-1]) and np.all(active_vel != vel[0]) - factor = 1 / env.dt - assert np.allclose(length * env.dt, np.round(factor * action[0]) / factor) - assert np.allclose(length * env.dt, np.round(factor * env.traj_gen.tau.numpy()) / factor) diff --git a/test/test_replanning_sequencing.py b/test/test_replanning_sequencing.py new file mode 100644 index 0000000..64045a5 --- /dev/null +++ b/test/test_replanning_sequencing.py @@ -0,0 +1,143 @@ +from itertools import chain +from typing import Tuple, Type, Union, Optional + +import gym +import numpy as np +import pytest +from gym import register +from gym.core import ActType, ObsType + +import fancy_gym +from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper +from fancy_gym.utils.time_aware_observation import TimeAwareObservation + +SEED = 1 +ENV_IDS = ['Reacher5d-v0', 'dmc:ball_in_cup-catch', 'metaworld:reach-v2', 'Reacher-v2'] +WRAPPERS = [fancy_gym.envs.mujoco.reacher.MPWrapper, fancy_gym.dmc.suite.ball_in_cup.MPWrapper, + fancy_gym.meta.goal_object_change_mp_wrapper.MPWrapper, fancy_gym.open_ai.mujoco.reacher_v2.MPWrapper] +ALL_MP_ENVS = chain(*fancy_gym.ALL_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values()) + + +class ToyEnv(gym.Env): + observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64) 
+ action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64) + dt = 0.02 + + def reset(self, *, seed: Optional[int] = None, return_info: bool = False, + options: Optional[dict] = None) -> Union[ObsType, Tuple[ObsType, dict]]: + return np.array([-1]) + + def step(self, action: ActType) -> Tuple[ObsType, float, bool, dict]: + return np.array([-1]), 1, False, {} + + def render(self, mode="human"): + pass + + +class ToyWrapper(RawInterfaceWrapper): + + @property + def current_pos(self) -> Union[float, int, np.ndarray, Tuple]: + return np.ones(self.action_space.shape) + + @property + def current_vel(self) -> Union[float, int, np.ndarray, Tuple]: + return np.zeros(self.action_space.shape) + + +@pytest.fixture(scope="session", autouse=True) +def setup(): + register( + id=f'toy-v0', + entry_point='test.test_black_box:ToyEnv', + max_episode_steps=50, + ) + + +@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) +@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) +@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False]) +def test_learn_sub_trajectories(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]], + add_time_aware_wrapper_before: bool): + env_id, wrapper_class = env_wrap + env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED)) + wrappers = [wrapper_class] + + # has time aware wrapper + if add_time_aware_wrapper_before: + wrappers += [TimeAwareObservation] + + env = fancy_gym.make_bb(env_id, [wrapper_class], {'learn_sub_trajectories': True, 'verbose': 2}, + {'trajectory_generator_type': mp_type}, + {'controller_type': 'motor'}, + {'phase_generator_type': 'exp'}, + {'basis_generator_type': 'rbf'}, seed=SEED) + + assert env.learn_sub_trajectories + assert env.traj_gen.learn_tau + # This also verifies we are not adding the TimeAwareObservationWrapper twice + assert env.observation_space == env_step.observation_space + + d = True + + for i in range(25): + if d: + env.reset() + action = env.action_space.sample() + obs, r, d, info = env.step(action) + + length = info['trajectory_length'] + + if not d: + assert length == np.round(action[0] / env.dt) + assert length == np.round(env.traj_gen.tau.numpy() / env.dt) + else: + # When done trajectory could be shorter due to termination. 
+ assert length <= np.round(action[0] / env.dt) + assert length <= np.round(env.traj_gen.tau.numpy() / env.dt) + + +@pytest.mark.parametrize('mp_type', ['promp', 'dmp']) +@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) +@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False]) +@pytest.mark.parametrize('replanning_time', [10, 100, 1000]) +def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]], + add_time_aware_wrapper_before: bool, replanning_time: int): + env_id, wrapper_class = env_wrap + env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED)) + wrappers = [wrapper_class] + + # has time aware wrapper + if add_time_aware_wrapper_before: + wrappers += [TimeAwareObservation] + + replanning_schedule = lambda c_pos, c_vel, obs, c_action, t: t % replanning_time == 0 + + env = fancy_gym.make_bb(env_id, [wrapper_class], {'replanning_schedule': replanning_schedule, 'verbose': 2}, + {'trajectory_generator_type': mp_type}, + {'controller_type': 'motor'}, + {'phase_generator_type': 'exp'}, + {'basis_generator_type': 'rbf'}, seed=SEED) + + assert env.do_replanning + assert env.replanning_schedule + # This also verifies we are not adding the TimeAwareObservationWrapper twice + assert env.observation_space == env_step.observation_space + + env.reset() + + episode_steps = env_step.spec.max_episode_steps // replanning_time + # Make 3 episodes, total steps depend on the replanning steps + for i in range(3 * episode_steps): + action = env.action_space.sample() + obs, r, d, info = env.step(action) + + length = info['trajectory_length'] + + if d: + # Check if number of steps until termination match the replanning interval + print(d, (i + 1), episode_steps) + assert (i + 1) % episode_steps == 0 + env.reset() + + assert replanning_schedule(None, None, None, None, length) From 5ebd4225ccac57598cde1dd20d71cb3ec858c576 Mon Sep 17 00:00:00 2001 From: Fabian Date: Mon, 24 Oct 2022 09:24:12 +0200 Subject: [PATCH 4/4] Adjusted Callable type hint --- fancy_gym/black_box/black_box_wrapper.py | 7 ++++--- test/test_black_box.py | 5 ++--- test/test_replanning_sequencing.py | 3 ++- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/fancy_gym/black_box/black_box_wrapper.py b/fancy_gym/black_box/black_box_wrapper.py index 7652bd5..16e4017 100644 --- a/fancy_gym/black_box/black_box_wrapper.py +++ b/fancy_gym/black_box/black_box_wrapper.py @@ -1,4 +1,4 @@ -from typing import Tuple, Optional +from typing import Tuple, Optional, Callable import gym import numpy as np @@ -19,8 +19,9 @@ class BlackBoxWrapper(gym.ObservationWrapper): duration: float, verbose: int = 1, learn_sub_trajectories: bool = False, - replanning_schedule: Optional[callable] = None, - reward_aggregation: callable = np.sum + replanning_schedule: Optional[ + Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray, int], bool]] = None, + reward_aggregation: Callable[[np.ndarray], float] = np.sum ): """ gym.Wrapper for leveraging a black box approach with a trajectory generator. 
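[Note, not part of the diff] With the tightened hint, a replanning schedule is any callable taking the current position, current velocity, step observation, last step action and step count, and returning a bool, matching the lambda parametrized in test_replanning_time. A minimal sketch, assuming the Reacher5d-v0/MPWrapper pairing used by the tests (replan_every_25_steps is an illustrative name, not library API):

import numpy as np

import fancy_gym


def replan_every_25_steps(current_pos: np.ndarray, current_vel: np.ndarray,
                          obs: np.ndarray, action: np.ndarray, t: int) -> bool:
    # Trigger a new trajectory on a fixed step grid, as test_replanning_time does.
    return t % 25 == 0


env = fancy_gym.make_bb('Reacher5d-v0', [fancy_gym.envs.mujoco.reacher.MPWrapper],
                        {'replanning_schedule': replan_every_25_steps, 'verbose': 2},
                        {'trajectory_generator_type': 'promp'},
                        {'controller_type': 'motor'},
                        {'phase_generator_type': 'exp'},
                        {'basis_generator_type': 'rbf'}, seed=1)
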
diff --git a/test/test_black_box.py b/test/test_black_box.py index f1b360b..d5e3a88 100644 --- a/test/test_black_box.py +++ b/test/test_black_box.py @@ -1,5 +1,5 @@ from itertools import chain -from typing import Tuple, Type, Union, Optional +from typing import Tuple, Type, Union, Optional, Callable import gym import numpy as np @@ -123,7 +123,7 @@ def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): @pytest.mark.parametrize('mp_type', ['promp', 'dmp']) @pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])]) -def test_aggregation(mp_type: str, reward_aggregation: callable): +def test_aggregation(mp_type: str, reward_aggregation: Callable[[np.ndarray], float]): env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation}, {'trajectory_generator_type': mp_type}, {'controller_type': 'motor'}, @@ -327,4 +327,3 @@ def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float): active_vel = vel[delay_time_steps: joint_time_steps - 2] assert np.all(active_pos != pos[-1]) and np.all(active_pos != pos[0]) assert np.all(active_vel != vel[-1]) and np.all(active_vel != vel[0]) - diff --git a/test/test_replanning_sequencing.py b/test/test_replanning_sequencing.py index 64045a5..a42bb65 100644 --- a/test/test_replanning_sequencing.py +++ b/test/test_replanning_sequencing.py @@ -1,4 +1,5 @@ from itertools import chain +from types import FunctionType from typing import Tuple, Type, Union, Optional import gym @@ -120,7 +121,7 @@ def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWra {'basis_generator_type': 'rbf'}, seed=SEED) assert env.do_replanning - assert env.replanning_schedule + assert callable(env.replanning_schedule) # This also verifies we are not adding the TimeAwareObservationWrapper twice assert env.observation_space == env_step.observation_space
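[Note, not part of the diff] reward_aggregation got the same treatment in this patch: any Callable[[np.ndarray], float] that maps the per-step rewards of one rollout to a scalar return is accepted, with np.sum as the default. A minimal sketch using a discounted return instead, on the same Reacher5d-v0 setup as the tests (discounted_return is an illustrative name, not library API):

import numpy as np

import fancy_gym


def discounted_return(step_rewards: np.ndarray, gamma: float = 0.99) -> float:
    # Collapse the per-step rewards of one trajectory into a single scalar.
    return float(np.sum(step_rewards * gamma ** np.arange(len(step_rewards))))


env = fancy_gym.make_bb('Reacher5d-v0', [fancy_gym.envs.mujoco.reacher.MPWrapper],
                        {'reward_aggregation': discounted_return},
                        {'trajectory_generator_type': 'promp'},
                        {'controller_type': 'motor'},
                        {'phase_generator_type': 'exp'},
                        {'basis_generator_type': 'rbf'}, seed=1)

env.reset()
obs, trajectory_return, done, info = env.step(env.action_space.sample())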