From 104b90929608f1299f8c559452c457a43fcb498d Mon Sep 17 00:00:00 2001
From: Hongyi Zhou
Date: Sun, 13 Nov 2022 16:59:13 +0100
Subject: [PATCH] delete hacky experimental code & add tests to test_black_box

---
 fancy_gym/black_box/black_box_wrapper.py | 20 +----
 fancy_gym/envs/__init__.py               |  2 +
 test/test_black_box.py                   | 97 +++++++++++++++++++-----
 3 files changed, 86 insertions(+), 33 deletions(-)

diff --git a/fancy_gym/black_box/black_box_wrapper.py b/fancy_gym/black_box/black_box_wrapper.py
index ea28ef7..88f8a32 100644
--- a/fancy_gym/black_box/black_box_wrapper.py
+++ b/fancy_gym/black_box/black_box_wrapper.py
@@ -24,7 +24,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
                      Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray, int], bool]] = None,
                  reward_aggregation: Callable[[np.ndarray], float] = np.sum,
                  max_planning_times: int = 1,
-                 desired_conditioning: bool = False
+                 desired_traj_bc: bool = False
                  ):
         """
         gym.Wrapper for leveraging a black box approach with a trajectory generator.
@@ -59,18 +59,11 @@ class BlackBoxWrapper(gym.ObservationWrapper):
         # reward computation
         self.reward_aggregation = reward_aggregation
 
-        # self.traj_gen.basis_gn.show_basis(plot=True)
         # spaces
         self.return_context_observation = not (learn_sub_trajectories or self.do_replanning)
-        # self.return_context_observation = True
         self.traj_gen_action_space = self._get_traj_gen_action_space()
         self.action_space = self._get_action_space()
-
-        # no goal learning
-        # tricky_action_upperbound = [np.inf] * (self.traj_gen_action_space.shape[0] - 7)
-        # tricky_action_lowerbound = [-np.inf] * (self.traj_gen_action_space.shape[0] - 7)
-        # self.action_space = spaces.Box(np.array(tricky_action_lowerbound), np.array(tricky_action_upperbound), dtype=np.float32)
 
         self.observation_space = self._get_observation_space()
 
         # rendering
@@ -78,7 +71,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
         self.verbose = verbose
 
         # condition value
-        self.desired_conditioning = True
+        self.desired_traj_bc = desired_traj_bc
         self.condition_pos = None
         self.condition_vel = None
 
@@ -157,11 +150,6 @@ class BlackBoxWrapper(gym.ObservationWrapper):
 
     def step(self, action: np.ndarray):
         """ This function generates a trajectory based on a MP and then does the usual loop over reset and step"""
-        ## tricky part, only use weights basis
-        # basis_weights = action.reshape(7, -1)
-        # goal_weights = np.zeros((7, 1))
-        # action = np.concatenate((basis_weights, goal_weights), axis=1).flatten()
-
         # TODO remove this part, right now only needed for beer pong
         mp_params, env_spec_params = self.env.episode_callback(action, self.traj_gen)
         position, velocity = self.get_trajectory(mp_params)
@@ -201,8 +189,8 @@ class BlackBoxWrapper(gym.ObservationWrapper):
                 if self.max_planning_times is not None and self.plan_counts >= self.max_planning_times:
                     continue
 
-                self.condition_pos = pos if self.desired_conditioning else self.current_pos
-                self.condition_vel = vel if self.desired_conditioning else self.current_vel
+                self.condition_pos = pos if self.desired_traj_bc else self.current_pos
+                self.condition_vel = vel if self.desired_traj_bc else self.current_vel
 
                 break
 
diff --git a/fancy_gym/envs/__init__.py b/fancy_gym/envs/__init__.py
index 4483637..eb44d9f 100644
--- a/fancy_gym/envs/__init__.py
+++ b/fancy_gym/envs/__init__.py
@@ -88,6 +88,7 @@ DEFAULT_BB_DICT_ProDMP = {
     "black_box_kwargs": {
         'replanning_schedule': None,
         'max_planning_times': None,
+        'desired_traj_bc': False,
         'verbose': 2
     }
 }
@@ -509,6 +510,7 @@ for _v in _versions:
     kwargs_dict_box_pushing_prodmp['phase_generator_kwargs']['alpha_phase'] = 3
     kwargs_dict_box_pushing_prodmp['black_box_kwargs']['max_planning_times'] = 2
     kwargs_dict_box_pushing_prodmp['black_box_kwargs']['replanning_schedule'] = lambda pos, vel, obs, action, t : t % 25 == 0
+    kwargs_dict_box_pushing_prodmp['black_box_kwargs']['desired_traj_bc'] = True
     register(
         id=_env_id,
         entry_point='fancy_gym.utils.make_env_helpers:make_bb_env_helper',
diff --git a/test/test_black_box.py b/test/test_black_box.py
index 69c0088..fa1cd01 100644
--- a/test/test_black_box.py
+++ b/test/test_black_box.py
@@ -205,18 +205,20 @@ def test_change_env_kwargs(mp_type: str, a: int, b: float, c: list, d: dict, e:
     assert e is env.e
 
 
-@pytest.mark.parametrize('mp_type', ['promp'])
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
 def test_learn_tau(mp_type: str, tau: float):
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
                             {'trajectory_generator_type': mp_type,
                              },
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear',
+                            {'phase_generator_type': phase_generator_type,
                              'learn_tau': True,
                              'learn_delay': False
                              },
-                            {'basis_generator_type': 'rbf',
+                            {'basis_generator_type': basis_generator_type,
                              },
                             seed=SEED)
     d = True
@@ -237,26 +239,29 @@ def test_learn_tau(mp_type: str, tau: float):
     vel = info['velocities'].flatten()
 
     # Check end is all same (only true for linear basis)
-    assert np.all(pos[tau_time_steps:] == pos[-1])
-    assert np.all(vel[tau_time_steps:] == vel[-1])
+    if phase_generator_type == "linear":
+        assert np.all(pos[tau_time_steps:] == pos[-1])
+        assert np.all(vel[tau_time_steps:] == vel[-1])
 
     # Check active trajectory section is different to end values
     assert np.all(pos[:tau_time_steps - 1] != pos[-1])
     assert np.all(vel[:tau_time_steps - 2] != vel[-1])
-
-
-@pytest.mark.parametrize('mp_type', ['promp'])
+#
+#
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
 def test_learn_delay(mp_type: str, delay: float):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
                             {'trajectory_generator_type': mp_type,
                              },
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear',
+                            {'phase_generator_type': phase_generator_type,
                              'learn_tau': False,
                              'learn_delay': True
                              },
-                            {'basis_generator_type': 'rbf',
+                            {'basis_generator_type': basis_generator_type,
                              },
                             seed=SEED)
     d = True
@@ -283,21 +288,23 @@ def test_learn_delay(mp_type: str, delay: float):
     # Check active trajectory section is different to beginning values
     assert np.all(pos[max(1, delay_time_steps):] != pos[0])
     assert np.all(vel[max(1, delay_time_steps)] != vel[0])
-
-
-@pytest.mark.parametrize('mp_type', ['promp'])
+#
+#
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
 @pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
 @pytest.mark.parametrize('delay', [0.25, 0.5, 0.75, 1])
 def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
     env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
                             {'trajectory_generator_type': mp_type,
                              },
                             {'controller_type': 'motor'},
-                            {'phase_generator_type': 'linear',
+                            {'phase_generator_type': phase_generator_type,
                              'learn_tau': True,
                              'learn_delay': True
                              },
-                            {'basis_generator_type': 'rbf',
+                            {'basis_generator_type': basis_generator_type,
                              },
                             seed=SEED)
     if env.spec.max_episode_steps * env.dt < delay + tau:
@@ -324,8 +331,9 @@ def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
     vel = info['velocities'].flatten()
 
     # Check end is all same (only true for linear basis)
-    assert np.all(pos[joint_time_steps:] == pos[-1])
-    assert np.all(vel[joint_time_steps:] == vel[-1])
+    if phase_generator_type == "linear":
+        assert np.all(pos[joint_time_steps:] == pos[-1])
+        assert np.all(vel[joint_time_steps:] == vel[-1])
 
     # Check beginning is all same (only true for linear basis)
     assert np.all(pos[:delay_time_steps - 1] == pos[0])
@@ -336,3 +344,58 @@ def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
     active_vel = vel[delay_time_steps: joint_time_steps - 2]
     assert np.all(active_pos != pos[-1]) and np.all(active_pos != pos[0])
     assert np.all(active_vel != vel[-1]) and np.all(active_vel != vel[0])
+
+
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10])
+def test_replanning_schedule(mp_type: str, max_planning_times: int, sub_segment_steps: int):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'max_planning_times': max_planning_times,
+                             'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': False,
+                             'learn_delay': False
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    for i in range(max_planning_times):
+        _, _, d, _ = env.step(env.action_space.sample())
+    assert d
+
+@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
+@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
+@pytest.mark.parametrize('sub_segment_steps', [5, 10])
+def test_max_planning_times(mp_type: str, max_planning_times: int, sub_segment_steps: int):
+    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
+    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
+    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
+                            {'max_planning_times': max_planning_times,
+                             'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
+                             'verbose': 2},
+                            {'trajectory_generator_type': mp_type,
+                             },
+                            {'controller_type': 'motor'},
+                            {'phase_generator_type': phase_generator_type,
+                             'learn_tau': False,
+                             'learn_delay': False
+                             },
+                            {'basis_generator_type': basis_generator_type,
+                             },
+                            seed=SEED)
+    _ = env.reset()
+    d = False
+    planning_times = 0
+    while not d:
+        _, _, d, _ = env.step(env.action_space.sample())
+        planning_times += 1
+    assert planning_times == max_planning_times
\ No newline at end of file
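
A minimal usage sketch of the options exercised above, outside of pytest. It assumes fancy_gym is installed and that ToyWrapper, SEED and the 'toy-v0' registration come from test/test_black_box.py (those names are defined in that test module, not here, and the import path may need adjusting to your checkout):

# Hedged sketch, not part of the patch: mirrors what test_max_planning_times checks.
import fancy_gym
from test_black_box import SEED, ToyWrapper  # assumption: executed from the test/ directory

max_planning_times = 2
sub_segment_steps = 10

env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
                        # black_box_kwargs: the keys this patch adds/uses
                        {'max_planning_times': max_planning_times,
                         'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
                         'desired_traj_bc': True,  # new flag from this patch; condition replanning on the desired trajectory
                         'verbose': 2},
                        {'trajectory_generator_type': 'prodmp'},
                        {'controller_type': 'motor'},
                        {'phase_generator_type': 'exp',
                         'learn_tau': False,
                         'learn_delay': False},
                        {'basis_generator_type': 'prodmp'},
                        seed=SEED)

env.reset()
done = False
planning_times = 0
while not done:
    # Each BlackBoxWrapper.step() call rolls out one planned segment; after
    # max_planning_times segments the episode runs to completion, so this loop
    # body executes exactly max_planning_times times (the new test asserts this).
    _obs, _reward, done, _info = env.step(env.action_space.sample())
    planning_times += 1
assert planning_times == max_planning_times

These are the same three black_box_kwargs the Box Pushing ProDMP registration above sets; desired_traj_bc=True switches the replanning boundary condition from the measured state (current_pos/current_vel) to the desired trajectory values, as implemented in BlackBoxWrapper.step.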