Delete hacky experimental code & add tests to test_black_box

This commit is contained in:
Hongyi Zhou 2022-11-13 16:59:13 +01:00
parent 60e18d2964
commit 104b909296
3 changed files with 86 additions and 33 deletions

View File

@ -24,7 +24,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray, int], bool]] = None,
reward_aggregation: Callable[[np.ndarray], float] = np.sum,
max_planning_times: int = 1,
desired_conditioning: bool = False
desired_traj_bc: bool = False
):
"""
gym.Wrapper for leveraging a black box approach with a trajectory generator.
@ -59,18 +59,11 @@ class BlackBoxWrapper(gym.ObservationWrapper):
# reward computation
self.reward_aggregation = reward_aggregation
# self.traj_gen.basis_gn.show_basis(plot=True)
# spaces
self.return_context_observation = not (learn_sub_trajectories or self.do_replanning)
# self.return_context_observation = True
self.traj_gen_action_space = self._get_traj_gen_action_space()
self.action_space = self._get_action_space()
# no goal learning
# tricky_action_upperbound = [np.inf] * (self.traj_gen_action_space.shape[0] - 7)
# tricky_action_lowerbound = [-np.inf] * (self.traj_gen_action_space.shape[0] - 7)
# self.action_space = spaces.Box(np.array(tricky_action_lowerbound), np.array(tricky_action_upperbound), dtype=np.float32)
self.observation_space = self._get_observation_space()
# rendering
@ -78,7 +71,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
self.verbose = verbose
# condition value
self.desired_conditioning = True
self.desired_traj_bc = desired_traj_bc
self.condition_pos = None
self.condition_vel = None
@ -157,11 +150,6 @@ class BlackBoxWrapper(gym.ObservationWrapper):
def step(self, action: np.ndarray):
""" This function generates a trajectory based on a MP and then does the usual loop over reset and step"""
## tricky part, only use weights basis
# basis_weights = action.reshape(7, -1)
# goal_weights = np.zeros((7, 1))
# action = np.concatenate((basis_weights, goal_weights), axis=1).flatten()
# TODO remove this part, right now only needed for beer pong
mp_params, env_spec_params = self.env.episode_callback(action, self.traj_gen)
position, velocity = self.get_trajectory(mp_params)
@ -201,8 +189,8 @@ class BlackBoxWrapper(gym.ObservationWrapper):
if self.max_planning_times is not None and self.plan_counts >= self.max_planning_times:
continue
self.condition_pos = pos if self.desired_conditioning else self.current_pos
self.condition_vel = vel if self.desired_conditioning else self.current_vel
self.condition_pos = pos if self.desired_traj_bc else self.current_pos
self.condition_vel = vel if self.desired_traj_bc else self.current_vel
break

View File

@ -88,6 +88,7 @@ DEFAULT_BB_DICT_ProDMP = {
"black_box_kwargs": {
'replanning_schedule': None,
'max_planning_times': None,
'desired_traj_bc': False,
'verbose': 2
}
}
@ -509,6 +510,7 @@ for _v in _versions:
kwargs_dict_box_pushing_prodmp['phase_generator_kwargs']['alpha_phase'] = 3
kwargs_dict_box_pushing_prodmp['black_box_kwargs']['max_planning_times'] = 2
kwargs_dict_box_pushing_prodmp['black_box_kwargs']['replanning_schedule'] = lambda pos, vel, obs, action, t : t % 25 == 0
kwargs_dict_box_pushing_prodmp['black_box_kwargs']['desired_traj_bc'] = True
register(
id=_env_id,
entry_point='fancy_gym.utils.make_env_helpers:make_bb_env_helper',

View File

@ -205,18 +205,20 @@ def test_change_env_kwargs(mp_type: str, a: int, b: float, c: list, d: dict, e:
assert e is env.e
@pytest.mark.parametrize('mp_type', ['promp'])
@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
@pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
def test_learn_tau(mp_type: str, tau: float):
phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
{'trajectory_generator_type': mp_type,
},
{'controller_type': 'motor'},
{'phase_generator_type': 'linear',
{'phase_generator_type': phase_generator_type,
'learn_tau': True,
'learn_delay': False
},
{'basis_generator_type': 'rbf',
{'basis_generator_type': basis_generator_type,
}, seed=SEED)
d = True
@ -237,26 +239,29 @@ def test_learn_tau(mp_type: str, tau: float):
vel = info['velocities'].flatten()
# Check that the tail of the trajectory is constant (only true for the linear phase generator)
assert np.all(pos[tau_time_steps:] == pos[-1])
assert np.all(vel[tau_time_steps:] == vel[-1])
if phase_generator_type == "linear":
assert np.all(pos[tau_time_steps:] == pos[-1])
assert np.all(vel[tau_time_steps:] == vel[-1])
# Check active trajectory section is different to end values
assert np.all(pos[:tau_time_steps - 1] != pos[-1])
assert np.all(vel[:tau_time_steps - 2] != vel[-1])
@pytest.mark.parametrize('mp_type', ['promp'])
#
#
@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
@pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
def test_learn_delay(mp_type: str, delay: float):
basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
{'trajectory_generator_type': mp_type,
},
{'controller_type': 'motor'},
{'phase_generator_type': 'linear',
{'phase_generator_type': phase_generator_type,
'learn_tau': False,
'learn_delay': True
},
{'basis_generator_type': 'rbf',
{'basis_generator_type': basis_generator_type,
}, seed=SEED)
d = True
@ -283,21 +288,23 @@ def test_learn_delay(mp_type: str, delay: float):
# Check active trajectory section is different to beginning values
assert np.all(pos[max(1, delay_time_steps):] != pos[0])
assert np.all(vel[max(1, delay_time_steps)] != vel[0])
@pytest.mark.parametrize('mp_type', ['promp'])
#
#
@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
@pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
@pytest.mark.parametrize('delay', [0.25, 0.5, 0.75, 1])
def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
{'trajectory_generator_type': mp_type,
},
{'controller_type': 'motor'},
{'phase_generator_type': 'linear',
{'phase_generator_type': phase_generator_type,
'learn_tau': True,
'learn_delay': True
},
{'basis_generator_type': 'rbf',
{'basis_generator_type': basis_generator_type,
}, seed=SEED)
if env.spec.max_episode_steps * env.dt < delay + tau:
@ -324,8 +331,9 @@ def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
vel = info['velocities'].flatten()
# Check that the tail of the trajectory is constant (only true for the linear phase generator)
assert np.all(pos[joint_time_steps:] == pos[-1])
assert np.all(vel[joint_time_steps:] == vel[-1])
if phase_generator_type == "linear":
assert np.all(pos[joint_time_steps:] == pos[-1])
assert np.all(vel[joint_time_steps:] == vel[-1])
# Check beginning is all same (only true for linear basis)
assert np.all(pos[:delay_time_steps - 1] == pos[0])
@ -336,3 +344,58 @@ def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
active_vel = vel[delay_time_steps: joint_time_steps - 2]
assert np.all(active_pos != pos[-1]) and np.all(active_pos != pos[0])
assert np.all(active_vel != vel[-1]) and np.all(active_vel != vel[0])
@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
@pytest.mark.parametrize('sub_segment_steps', [5, 10])
def test_replanning_schedule(mp_type: str, max_planning_times: int, sub_segment_steps: int):
    """Check that the episode is done after ``max_planning_times`` wrapper steps.

    The toy environment is built with a replanning schedule that fires whenever
    the step counter ``t`` is a multiple of ``sub_segment_steps``. The wrapper
    is then stepped exactly ``max_planning_times`` times with sampled actions,
    and the done flag returned by the last step must be truthy.
    """
    uses_prodmp = mp_type == 'prodmp'

    # ProDMP is paired with the 'exp' phase and 'prodmp' basis; every other
    # trajectory generator type falls back to 'linear' phase and 'rbf' basis.
    basis_generator_type = 'prodmp' if uses_prodmp else 'rbf'
    phase_generator_type = 'exp' if uses_prodmp else 'linear'

    black_box_kwargs = {
        'max_planning_times': max_planning_times,
        'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
        'verbose': 2,
    }
    traj_gen_kwargs = {'trajectory_generator_type': mp_type}
    controller_kwargs = {'controller_type': 'motor'}
    phase_kwargs = {
        'phase_generator_type': phase_generator_type,
        'learn_tau': False,
        'learn_delay': False,
    }
    basis_kwargs = {'basis_generator_type': basis_generator_type}

    env = fancy_gym.make_bb(
        'toy-v0',
        [ToyWrapper],
        black_box_kwargs,
        traj_gen_kwargs,
        controller_kwargs,
        phase_kwargs,
        basis_kwargs,
        seed=SEED,
    )

    env.reset()

    done = False
    for _ in range(max_planning_times):
        _, _, done, _ = env.step(env.action_space.sample())

    # Only the flag from the final step is inspected.
    assert done
@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
@pytest.mark.parametrize('sub_segment_steps', [5, 10])
def test_max_planning_times(mp_type: str, max_planning_times: int, sub_segment_steps: int):
    """Check that an episode takes exactly ``max_planning_times`` wrapper steps.

    The toy environment is built with a replanning schedule that fires whenever
    the step counter ``t`` is a multiple of ``sub_segment_steps``. The wrapper
    is stepped with sampled actions until it reports done, and the number of
    steps taken must equal ``max_planning_times``.
    """
    uses_prodmp = mp_type == 'prodmp'

    # ProDMP is paired with the 'exp' phase and 'prodmp' basis; every other
    # trajectory generator type falls back to 'linear' phase and 'rbf' basis.
    basis_generator_type = 'prodmp' if uses_prodmp else 'rbf'
    phase_generator_type = 'exp' if uses_prodmp else 'linear'

    black_box_kwargs = {
        'max_planning_times': max_planning_times,
        'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
        'verbose': 2,
    }
    traj_gen_kwargs = {'trajectory_generator_type': mp_type}
    controller_kwargs = {'controller_type': 'motor'}
    phase_kwargs = {
        'phase_generator_type': phase_generator_type,
        'learn_tau': False,
        'learn_delay': False,
    }
    basis_kwargs = {'basis_generator_type': basis_generator_type}

    env = fancy_gym.make_bb(
        'toy-v0',
        [ToyWrapper],
        black_box_kwargs,
        traj_gen_kwargs,
        controller_kwargs,
        phase_kwargs,
        basis_kwargs,
        seed=SEED,
    )

    env.reset()

    # Count how many wrapper steps it takes until the episode reports done.
    steps_taken = 0
    done = False
    while not done:
        _, _, done, _ = env.step(env.action_space.sample())
        steps_taken += 1

    assert steps_taken == max_planning_times