Merge pull request #48 from ALRhub/44-added-tests-for-the-black-box-wrapper
44 added tests for the black box wrapper
This commit is contained in:
commit
14615c10a6
@ -1,4 +1,4 @@
|
|||||||
from typing import Tuple, Optional
|
from typing import Tuple, Optional, Callable
|
||||||
|
|
||||||
import gym
|
import gym
|
||||||
import numpy as np
|
import numpy as np
|
||||||
@ -19,8 +19,9 @@ class BlackBoxWrapper(gym.ObservationWrapper):
|
|||||||
duration: float,
|
duration: float,
|
||||||
verbose: int = 1,
|
verbose: int = 1,
|
||||||
learn_sub_trajectories: bool = False,
|
learn_sub_trajectories: bool = False,
|
||||||
replanning_schedule: Optional[callable] = None,
|
replanning_schedule: Optional[
|
||||||
reward_aggregation: callable = np.sum
|
Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray, int], bool]] = None,
|
||||||
|
reward_aggregation: Callable[[np.ndarray], float] = np.sum
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
gym.Wrapper for leveraging a black box approach with a trajectory generator.
|
gym.Wrapper for leveraging a black box approach with a trajectory generator.
|
||||||
@ -73,24 +74,30 @@ class BlackBoxWrapper(gym.ObservationWrapper):
|
|||||||
return observation.astype(self.observation_space.dtype)
|
return observation.astype(self.observation_space.dtype)
|
||||||
|
|
||||||
def get_trajectory(self, action: np.ndarray) -> Tuple:
|
def get_trajectory(self, action: np.ndarray) -> Tuple:
|
||||||
|
duration = self.duration
|
||||||
|
if self.learn_sub_trajectories:
|
||||||
|
duration = None
|
||||||
|
# reset with every new call as we need to set all arguments, such as tau, delay, again.
|
||||||
|
# If we do not do this, the traj_gen assumes we are continuing the trajectory.
|
||||||
|
self.traj_gen.reset()
|
||||||
|
|
||||||
clipped_params = np.clip(action, self.traj_gen_action_space.low, self.traj_gen_action_space.high)
|
clipped_params = np.clip(action, self.traj_gen_action_space.low, self.traj_gen_action_space.high)
|
||||||
self.traj_gen.set_params(clipped_params)
|
self.traj_gen.set_params(clipped_params)
|
||||||
bc_time = np.array(0 if not self.do_replanning else self.current_traj_steps * self.dt)
|
bc_time = np.array(0 if not self.do_replanning else self.current_traj_steps * self.dt)
|
||||||
# TODO we could think about initializing with the previous desired value in order to have a smooth transition
|
# TODO we could think about initializing with the previous desired value in order to have a smooth transition
|
||||||
# at least from the planning point of view.
|
# at least from the planning point of view.
|
||||||
self.traj_gen.set_boundary_conditions(bc_time, self.current_pos, self.current_vel)
|
self.traj_gen.set_boundary_conditions(bc_time, self.current_pos, self.current_vel)
|
||||||
duration = None if self.learn_sub_trajectories else self.duration
|
|
||||||
self.traj_gen.set_duration(duration, self.dt)
|
self.traj_gen.set_duration(duration, self.dt)
|
||||||
# traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True)
|
# traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True)
|
||||||
trajectory = get_numpy(self.traj_gen.get_traj_pos())
|
position = get_numpy(self.traj_gen.get_traj_pos())
|
||||||
velocity = get_numpy(self.traj_gen.get_traj_vel())
|
velocity = get_numpy(self.traj_gen.get_traj_vel())
|
||||||
|
|
||||||
if self.do_replanning:
|
# if self.do_replanning:
|
||||||
# Remove first part of trajectory as this is already over
|
# # Remove first part of trajectory as this is already over
|
||||||
trajectory = trajectory[self.current_traj_steps:]
|
# position = position[self.current_traj_steps:]
|
||||||
velocity = velocity[self.current_traj_steps:]
|
# velocity = velocity[self.current_traj_steps:]
|
||||||
|
|
||||||
return trajectory, velocity
|
return position, velocity
|
||||||
|
|
||||||
def _get_traj_gen_action_space(self):
|
def _get_traj_gen_action_space(self):
|
||||||
"""This function can be used to set up an individual space for the parameters of the traj_gen."""
|
"""This function can be used to set up an individual space for the parameters of the traj_gen."""
|
||||||
@ -125,9 +132,9 @@ class BlackBoxWrapper(gym.ObservationWrapper):
|
|||||||
|
|
||||||
# TODO remove this part, right now only needed for beer pong
|
# TODO remove this part, right now only needed for beer pong
|
||||||
mp_params, env_spec_params = self.env.episode_callback(action, self.traj_gen)
|
mp_params, env_spec_params = self.env.episode_callback(action, self.traj_gen)
|
||||||
trajectory, velocity = self.get_trajectory(mp_params)
|
position, velocity = self.get_trajectory(mp_params)
|
||||||
|
|
||||||
trajectory_length = len(trajectory)
|
trajectory_length = len(position)
|
||||||
rewards = np.zeros(shape=(trajectory_length,))
|
rewards = np.zeros(shape=(trajectory_length,))
|
||||||
if self.verbose >= 2:
|
if self.verbose >= 2:
|
||||||
actions = np.zeros(shape=(trajectory_length,) + self.env.action_space.shape)
|
actions = np.zeros(shape=(trajectory_length,) + self.env.action_space.shape)
|
||||||
@ -137,7 +144,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
|
|||||||
infos = dict()
|
infos = dict()
|
||||||
done = False
|
done = False
|
||||||
|
|
||||||
for t, (pos, vel) in enumerate(zip(trajectory, velocity)):
|
for t, (pos, vel) in enumerate(zip(position, velocity)):
|
||||||
step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel)
|
step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel)
|
||||||
c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high)
|
c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high)
|
||||||
obs, c_reward, done, info = self.env.step(c_action)
|
obs, c_reward, done, info = self.env.step(c_action)
|
||||||
@ -163,7 +170,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
|
|||||||
self.current_traj_steps += t + 1
|
self.current_traj_steps += t + 1
|
||||||
|
|
||||||
if self.verbose >= 2:
|
if self.verbose >= 2:
|
||||||
infos['positions'] = trajectory
|
infos['positions'] = position
|
||||||
infos['velocities'] = velocity
|
infos['velocities'] = velocity
|
||||||
infos['step_actions'] = actions[:t + 1]
|
infos['step_actions'] = actions[:t + 1]
|
||||||
infos['step_observations'] = observations[:t + 1]
|
infos['step_observations'] = observations[:t + 1]
|
||||||
@ -180,4 +187,5 @@ class BlackBoxWrapper(gym.ObservationWrapper):
|
|||||||
|
|
||||||
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None):
|
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None):
|
||||||
self.current_traj_steps = 0
|
self.current_traj_steps = 0
|
||||||
|
self.traj_gen.reset()
|
||||||
return super(BlackBoxWrapper, self).reset()
|
return super(BlackBoxWrapper, self).reset()
|
||||||
|
@ -2,3 +2,6 @@ class BaseController:
|
|||||||
|
|
||||||
def get_action(self, des_pos, des_vel, c_pos, c_vel):
|
def get_action(self, des_pos, des_vel, c_pos, c_vel):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def __call__(self, des_pos, des_vel, c_pos, c_vel):
|
||||||
|
return self.get_action(des_pos, des_vel, c_pos, c_vel)
|
||||||
|
@ -18,7 +18,8 @@ class MetaWorldController(BaseController):
|
|||||||
cur_pos = c_pos[:-1]
|
cur_pos = c_pos[:-1]
|
||||||
xyz_pos = des_pos[:-1]
|
xyz_pos = des_pos[:-1]
|
||||||
|
|
||||||
assert xyz_pos.shape == cur_pos.shape, \
|
if xyz_pos.shape != cur_pos.shape:
|
||||||
f"Mismatch in dimension between desired position {xyz_pos.shape} and current position {cur_pos.shape}"
|
raise ValueError(f"Mismatch in dimension between desired position"
|
||||||
|
f" {xyz_pos.shape} and current position {cur_pos.shape}")
|
||||||
trq = np.hstack([(xyz_pos - cur_pos), gripper_pos])
|
trq = np.hstack([(xyz_pos - cur_pos), gripper_pos])
|
||||||
return trq
|
return trq
|
||||||
|
@ -8,7 +8,6 @@ class PDController(BaseController):
|
|||||||
A PD-Controller. Using position and velocity information from a provided environment,
|
A PD-Controller. Using position and velocity information from a provided environment,
|
||||||
the tracking_controller calculates a response based on the desired position and velocity
|
the tracking_controller calculates a response based on the desired position and velocity
|
||||||
|
|
||||||
:param env: A position environment
|
|
||||||
:param p_gains: Factors for the proportional gains
|
:param p_gains: Factors for the proportional gains
|
||||||
:param d_gains: Factors for the differential gains
|
:param d_gains: Factors for the differential gains
|
||||||
"""
|
"""
|
||||||
@ -20,9 +19,11 @@ class PDController(BaseController):
|
|||||||
self.d_gains = d_gains
|
self.d_gains = d_gains
|
||||||
|
|
||||||
def get_action(self, des_pos, des_vel, c_pos, c_vel):
|
def get_action(self, des_pos, des_vel, c_pos, c_vel):
|
||||||
assert des_pos.shape == c_pos.shape, \
|
if des_pos.shape != c_pos.shape:
|
||||||
f"Mismatch in dimension between desired position {des_pos.shape} and current position {c_pos.shape}"
|
raise ValueError(f"Mismatch in dimension between desired position "
|
||||||
assert des_vel.shape == c_vel.shape, \
|
f"{des_pos.shape} and current position {c_pos.shape}")
|
||||||
f"Mismatch in dimension between desired velocity {des_vel.shape} and current velocity {c_vel.shape}"
|
if des_vel.shape != c_vel.shape:
|
||||||
|
raise ValueError(f"Mismatch in dimension between desired velocity"
|
||||||
|
f" {des_vel.shape} and current velocity {c_vel.shape}")
|
||||||
trq = self.p_gains * (des_pos - c_pos) + self.d_gains * (des_vel - c_vel)
|
trq = self.p_gains * (des_pos - c_pos) + self.d_gains * (des_vel - c_vel)
|
||||||
return trq
|
return trq
|
||||||
|
@ -32,7 +32,7 @@ class MPWrapper(BaseMetaworldMPWrapper):
|
|||||||
# Current observation
|
# Current observation
|
||||||
[False] * 3, # end-effector position
|
[False] * 3, # end-effector position
|
||||||
[False] * 1, # normalized gripper open distance
|
[False] * 1, # normalized gripper open distance
|
||||||
[False] * 3, # main object position
|
[True] * 3, # main object position
|
||||||
[False] * 4, # main object quaternion
|
[False] * 4, # main object quaternion
|
||||||
[False] * 3, # secondary object position
|
[False] * 3, # secondary object position
|
||||||
[False] * 4, # secondary object quaternion
|
[False] * 4, # secondary object quaternion
|
||||||
|
@ -141,7 +141,7 @@ def make_bb(
|
|||||||
Returns: DMP wrapped gym env
|
Returns: DMP wrapped gym env
|
||||||
|
|
||||||
"""
|
"""
|
||||||
_verify_time_limit(traj_gen_kwargs.get("duration", None), kwargs.get("time_limit", None))
|
_verify_time_limit(traj_gen_kwargs.get("duration"), kwargs.get("time_limit"))
|
||||||
|
|
||||||
learn_sub_trajs = black_box_kwargs.get('learn_sub_trajectories')
|
learn_sub_trajs = black_box_kwargs.get('learn_sub_trajectories')
|
||||||
do_replanning = black_box_kwargs.get('replanning_schedule')
|
do_replanning = black_box_kwargs.get('replanning_schedule')
|
||||||
@ -166,6 +166,15 @@ def make_bb(
|
|||||||
# We have to learn the length when learning sub_trajectories trajectories
|
# We have to learn the length when learning sub_trajectories trajectories
|
||||||
phase_kwargs['learn_tau'] = True
|
phase_kwargs['learn_tau'] = True
|
||||||
|
|
||||||
|
# set tau bounds to minimum of two env steps otherwise computing the velocity is not possible.
|
||||||
|
# maximum is full duration of one episode.
|
||||||
|
if phase_kwargs.get('learn_tau'):
|
||||||
|
phase_kwargs["tau_bound"] = [env.dt * 2, black_box_kwargs['duration']]
|
||||||
|
|
||||||
|
# Max delay is full duration minus two steps due to above reason
|
||||||
|
if phase_kwargs.get('learn_delay'):
|
||||||
|
phase_kwargs["delay_bound"] = [0, black_box_kwargs['duration'] - env.dt * 2]
|
||||||
|
|
||||||
phase_gen = get_phase_generator(**phase_kwargs)
|
phase_gen = get_phase_generator(**phase_kwargs)
|
||||||
basis_gen = get_basis_generator(phase_generator=phase_gen, **basis_kwargs)
|
basis_gen = get_basis_generator(phase_generator=phase_gen, **basis_kwargs)
|
||||||
controller = get_controller(**controller_kwargs)
|
controller = get_controller(**controller_kwargs)
|
||||||
|
329
test/test_black_box.py
Normal file
329
test/test_black_box.py
Normal file
@ -0,0 +1,329 @@
|
|||||||
|
from itertools import chain
|
||||||
|
from typing import Tuple, Type, Union, Optional, Callable
|
||||||
|
|
||||||
|
import gym
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
from gym import register
|
||||||
|
from gym.core import ActType, ObsType
|
||||||
|
|
||||||
|
import fancy_gym
|
||||||
|
from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper
|
||||||
|
from fancy_gym.utils.time_aware_observation import TimeAwareObservation
|
||||||
|
|
||||||
|
SEED = 1
|
||||||
|
ENV_IDS = ['Reacher5d-v0', 'dmc:ball_in_cup-catch', 'metaworld:reach-v2', 'Reacher-v2']
|
||||||
|
WRAPPERS = [fancy_gym.envs.mujoco.reacher.MPWrapper, fancy_gym.dmc.suite.ball_in_cup.MPWrapper,
|
||||||
|
fancy_gym.meta.goal_object_change_mp_wrapper.MPWrapper, fancy_gym.open_ai.mujoco.reacher_v2.MPWrapper]
|
||||||
|
ALL_MP_ENVS = chain(*fancy_gym.ALL_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values())
|
||||||
|
|
||||||
|
|
||||||
|
class Object(object):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ToyEnv(gym.Env):
|
||||||
|
observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
|
||||||
|
action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
|
||||||
|
dt = 0.02
|
||||||
|
|
||||||
|
def __init__(self, a: int = 0, b: float = 0.0, c: list = [], d: dict = {}, e: Object = Object()):
|
||||||
|
self.a, self.b, self.c, self.d, self.e = a, b, c, d, e
|
||||||
|
|
||||||
|
def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
|
||||||
|
options: Optional[dict] = None) -> Union[ObsType, Tuple[ObsType, dict]]:
|
||||||
|
return np.array([-1])
|
||||||
|
|
||||||
|
def step(self, action: ActType) -> Tuple[ObsType, float, bool, dict]:
|
||||||
|
return np.array([-1]), 1, False, {}
|
||||||
|
|
||||||
|
def render(self, mode="human"):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ToyWrapper(RawInterfaceWrapper):
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
|
||||||
|
return np.ones(self.action_space.shape)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
|
||||||
|
return np.zeros(self.action_space.shape)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session", autouse=True)
|
||||||
|
def setup():
|
||||||
|
register(
|
||||||
|
id=f'toy-v0',
|
||||||
|
entry_point='test.test_black_box:ToyEnv',
|
||||||
|
max_episode_steps=50,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('env_id', ENV_IDS)
|
||||||
|
def test_missing_wrapper(env_id: str):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
fancy_gym.make_bb(env_id, [], {}, {}, {}, {}, {})
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
|
||||||
|
def test_missing_local_state(mp_type: str):
|
||||||
|
env = fancy_gym.make_bb('toy-v0', [RawInterfaceWrapper], {},
|
||||||
|
{'trajectory_generator_type': mp_type},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'exp'},
|
||||||
|
{'basis_generator_type': 'rbf'})
|
||||||
|
env.reset()
|
||||||
|
with pytest.raises(NotImplementedError):
|
||||||
|
env.step(env.action_space.sample())
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
|
||||||
|
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
|
||||||
|
@pytest.mark.parametrize('verbose', [1, 2])
|
||||||
|
def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int):
|
||||||
|
env_id, wrapper_class = env_wrap
|
||||||
|
env = fancy_gym.make_bb(env_id, [wrapper_class], {'verbose': verbose},
|
||||||
|
{'trajectory_generator_type': mp_type},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'exp'},
|
||||||
|
{'basis_generator_type': 'rbf'})
|
||||||
|
env.reset()
|
||||||
|
info_keys = list(env.step(env.action_space.sample())[3].keys())
|
||||||
|
|
||||||
|
env_step = fancy_gym.make(env_id, SEED)
|
||||||
|
env_step.reset()
|
||||||
|
info_keys_step = env_step.step(env_step.action_space.sample())[3].keys()
|
||||||
|
|
||||||
|
assert all(e in info_keys for e in info_keys_step)
|
||||||
|
assert 'trajectory_length' in info_keys
|
||||||
|
|
||||||
|
if verbose >= 2:
|
||||||
|
mp_keys = ['positions', 'velocities', 'step_actions', 'step_observations', 'step_rewards']
|
||||||
|
assert all(e in info_keys for e in mp_keys)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
|
||||||
|
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
|
||||||
|
def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
|
||||||
|
env_id, wrapper_class = env_wrap
|
||||||
|
env = fancy_gym.make_bb(env_id, [wrapper_class], {},
|
||||||
|
{'trajectory_generator_type': mp_type},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'exp'},
|
||||||
|
{'basis_generator_type': 'rbf'})
|
||||||
|
|
||||||
|
for _ in range(5):
|
||||||
|
env.reset()
|
||||||
|
length = env.step(env.action_space.sample())[3]['trajectory_length']
|
||||||
|
|
||||||
|
assert length == env.spec.max_episode_steps
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
|
||||||
|
@pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])])
|
||||||
|
def test_aggregation(mp_type: str, reward_aggregation: Callable[[np.ndarray], float]):
|
||||||
|
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation},
|
||||||
|
{'trajectory_generator_type': mp_type},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'exp'},
|
||||||
|
{'basis_generator_type': 'rbf'})
|
||||||
|
env.reset()
|
||||||
|
# ToyEnv only returns 1 as reward
|
||||||
|
assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, ))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
|
||||||
|
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
|
||||||
|
def test_context_space(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
|
||||||
|
env_id, wrapper_class = env_wrap
|
||||||
|
env = fancy_gym.make_bb(env_id, [wrapper_class], {},
|
||||||
|
{'trajectory_generator_type': mp_type},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'exp'},
|
||||||
|
{'basis_generator_type': 'rbf'})
|
||||||
|
# check if observation space matches with the specified mask values which are true
|
||||||
|
env_step = fancy_gym.make(env_id, SEED)
|
||||||
|
wrapper = wrapper_class(env_step)
|
||||||
|
assert env.observation_space.shape == wrapper.context_mask[wrapper.context_mask].shape
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
|
||||||
|
@pytest.mark.parametrize('num_dof', [0, 1, 2, 5])
|
||||||
|
@pytest.mark.parametrize('num_basis', [0, 1, 2, 5])
|
||||||
|
@pytest.mark.parametrize('learn_tau', [True, False])
|
||||||
|
@pytest.mark.parametrize('learn_delay', [True, False])
|
||||||
|
def test_action_space(mp_type: str, num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool):
|
||||||
|
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
|
||||||
|
{'trajectory_generator_type': mp_type,
|
||||||
|
'action_dim': num_dof
|
||||||
|
},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'exp',
|
||||||
|
'learn_tau': learn_tau,
|
||||||
|
'learn_delay': learn_delay
|
||||||
|
},
|
||||||
|
{'basis_generator_type': 'rbf',
|
||||||
|
'num_basis': num_basis
|
||||||
|
})
|
||||||
|
|
||||||
|
base_dims = num_dof * num_basis
|
||||||
|
additional_dims = num_dof if mp_type == 'dmp' else 0
|
||||||
|
traj_modification_dims = int(learn_tau) + int(learn_delay)
|
||||||
|
assert env.action_space.shape[0] == base_dims + traj_modification_dims + additional_dims
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
|
||||||
|
@pytest.mark.parametrize('a', [1])
|
||||||
|
@pytest.mark.parametrize('b', [1.0])
|
||||||
|
@pytest.mark.parametrize('c', [[1], [1.0], ['str'], [{'a': 'b'}], [np.ones(3, )]])
|
||||||
|
@pytest.mark.parametrize('d', [{'a': 1}, {1: 2.0}, {'a': [1.0]}, {'a': np.ones(3, )}, {'a': {'a': 'b'}}])
|
||||||
|
@pytest.mark.parametrize('e', [Object()])
|
||||||
|
def test_change_env_kwargs(mp_type: str, a: int, b: float, c: list, d: dict, e: Object):
|
||||||
|
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
|
||||||
|
{'trajectory_generator_type': mp_type},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'exp'},
|
||||||
|
{'basis_generator_type': 'rbf'},
|
||||||
|
a=a, b=b, c=c, d=d, e=e
|
||||||
|
)
|
||||||
|
assert a is env.a
|
||||||
|
assert b is env.b
|
||||||
|
assert c is env.c
|
||||||
|
# Due to how gym works dict kwargs need to be copied and hence can only be checked to have the same content
|
||||||
|
assert d == env.d
|
||||||
|
assert e is env.e
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp'])
|
||||||
|
@pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
|
||||||
|
def test_learn_tau(mp_type: str, tau: float):
|
||||||
|
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
|
||||||
|
{'trajectory_generator_type': mp_type,
|
||||||
|
},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'linear',
|
||||||
|
'learn_tau': True,
|
||||||
|
'learn_delay': False
|
||||||
|
},
|
||||||
|
{'basis_generator_type': 'rbf',
|
||||||
|
}, seed=SEED)
|
||||||
|
|
||||||
|
d = True
|
||||||
|
for i in range(5):
|
||||||
|
if d:
|
||||||
|
env.reset()
|
||||||
|
action = env.action_space.sample()
|
||||||
|
action[0] = tau
|
||||||
|
|
||||||
|
obs, r, d, info = env.step(action)
|
||||||
|
|
||||||
|
length = info['trajectory_length']
|
||||||
|
assert length == env.spec.max_episode_steps
|
||||||
|
|
||||||
|
tau_time_steps = int(np.round(tau / env.dt))
|
||||||
|
|
||||||
|
pos = info['positions'].flatten()
|
||||||
|
vel = info['velocities'].flatten()
|
||||||
|
|
||||||
|
# Check end is all same (only true for linear basis)
|
||||||
|
assert np.all(pos[tau_time_steps:] == pos[-1])
|
||||||
|
assert np.all(vel[tau_time_steps:] == vel[-1])
|
||||||
|
|
||||||
|
# Check active trajectory section is different to end values
|
||||||
|
assert np.all(pos[:tau_time_steps - 1] != pos[-1])
|
||||||
|
assert np.all(vel[:tau_time_steps - 2] != vel[-1])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp'])
|
||||||
|
@pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
|
||||||
|
def test_learn_delay(mp_type: str, delay: float):
|
||||||
|
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
|
||||||
|
{'trajectory_generator_type': mp_type,
|
||||||
|
},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'linear',
|
||||||
|
'learn_tau': False,
|
||||||
|
'learn_delay': True
|
||||||
|
},
|
||||||
|
{'basis_generator_type': 'rbf',
|
||||||
|
}, seed=SEED)
|
||||||
|
|
||||||
|
d = True
|
||||||
|
for i in range(5):
|
||||||
|
if d:
|
||||||
|
env.reset()
|
||||||
|
action = env.action_space.sample()
|
||||||
|
action[0] = delay
|
||||||
|
|
||||||
|
obs, r, d, info = env.step(action)
|
||||||
|
|
||||||
|
length = info['trajectory_length']
|
||||||
|
assert length == env.spec.max_episode_steps
|
||||||
|
|
||||||
|
delay_time_steps = int(np.round(delay / env.dt))
|
||||||
|
|
||||||
|
pos = info['positions'].flatten()
|
||||||
|
vel = info['velocities'].flatten()
|
||||||
|
|
||||||
|
# Check beginning is all same (only true for linear basis)
|
||||||
|
assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0])
|
||||||
|
assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0])
|
||||||
|
|
||||||
|
# Check active trajectory section is different to beginning values
|
||||||
|
assert np.all(pos[max(1, delay_time_steps):] != pos[0])
|
||||||
|
assert np.all(vel[max(1, delay_time_steps)] != vel[0])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp'])
|
||||||
|
@pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
|
||||||
|
@pytest.mark.parametrize('delay', [0.25, 0.5, 0.75, 1])
|
||||||
|
def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
|
||||||
|
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
|
||||||
|
{'trajectory_generator_type': mp_type,
|
||||||
|
},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'linear',
|
||||||
|
'learn_tau': True,
|
||||||
|
'learn_delay': True
|
||||||
|
},
|
||||||
|
{'basis_generator_type': 'rbf',
|
||||||
|
}, seed=SEED)
|
||||||
|
|
||||||
|
if env.spec.max_episode_steps * env.dt < delay + tau:
|
||||||
|
return
|
||||||
|
|
||||||
|
d = True
|
||||||
|
for i in range(5):
|
||||||
|
if d:
|
||||||
|
env.reset()
|
||||||
|
action = env.action_space.sample()
|
||||||
|
action[0] = tau
|
||||||
|
action[1] = delay
|
||||||
|
|
||||||
|
obs, r, d, info = env.step(action)
|
||||||
|
|
||||||
|
length = info['trajectory_length']
|
||||||
|
assert length == env.spec.max_episode_steps
|
||||||
|
|
||||||
|
tau_time_steps = int(np.round(tau / env.dt))
|
||||||
|
delay_time_steps = int(np.round(delay / env.dt))
|
||||||
|
joint_time_steps = delay_time_steps + tau_time_steps
|
||||||
|
|
||||||
|
pos = info['positions'].flatten()
|
||||||
|
vel = info['velocities'].flatten()
|
||||||
|
|
||||||
|
# Check end is all same (only true for linear basis)
|
||||||
|
assert np.all(pos[joint_time_steps:] == pos[-1])
|
||||||
|
assert np.all(vel[joint_time_steps:] == vel[-1])
|
||||||
|
|
||||||
|
# Check beginning is all same (only true for linear basis)
|
||||||
|
assert np.all(pos[:delay_time_steps - 1] == pos[0])
|
||||||
|
assert np.all(vel[:delay_time_steps - 2] == vel[0])
|
||||||
|
|
||||||
|
# Check active trajectory section is different to beginning and end values
|
||||||
|
active_pos = pos[delay_time_steps: joint_time_steps - 1]
|
||||||
|
active_vel = vel[delay_time_steps: joint_time_steps - 2]
|
||||||
|
assert np.all(active_pos != pos[-1]) and np.all(active_pos != pos[0])
|
||||||
|
assert np.all(active_vel != vel[-1]) and np.all(active_vel != vel[0])
|
73
test/test_controller.py
Normal file
73
test/test_controller.py
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
from typing import Tuple, Union
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from fancy_gym.black_box.factory import controller_factory
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('ctrl_type', controller_factory.ALL_TYPES)
|
||||||
|
def test_initialization(ctrl_type: str):
|
||||||
|
controller_factory.get_controller(ctrl_type)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
@pytest.mark.parametrize('velocity', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
def test_velocity(position: np.ndarray, velocity: np.ndarray):
|
||||||
|
ctrl = controller_factory.get_controller('velocity')
|
||||||
|
a = ctrl(position, velocity, None, None)
|
||||||
|
assert np.array_equal(a, velocity)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
@pytest.mark.parametrize('velocity', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
def test_position(position: np.ndarray, velocity: np.ndarray):
|
||||||
|
ctrl = controller_factory.get_controller('position')
|
||||||
|
a = ctrl(position, velocity, None, None)
|
||||||
|
assert np.array_equal(a, position)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
@pytest.mark.parametrize('velocity', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
@pytest.mark.parametrize('current_position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
@pytest.mark.parametrize('current_velocity', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
@pytest.mark.parametrize('p_gains', [0, 1, 0.5, np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
@pytest.mark.parametrize('d_gains', [0, 1, 0.5, np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
def test_pd(position: np.ndarray, velocity: np.ndarray, current_position: np.ndarray, current_velocity: np.ndarray,
|
||||||
|
p_gains: Union[float, Tuple], d_gains: Union[float, Tuple]):
|
||||||
|
ctrl = controller_factory.get_controller('motor', p_gains=p_gains, d_gains=d_gains)
|
||||||
|
assert np.array_equal(ctrl.p_gains, p_gains)
|
||||||
|
assert np.array_equal(ctrl.d_gains, d_gains)
|
||||||
|
|
||||||
|
a = ctrl(position, velocity, current_position, current_velocity)
|
||||||
|
pd = p_gains * (position - current_position) + d_gains * (velocity - current_velocity)
|
||||||
|
assert np.array_equal(a, pd)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('pos_vel', [(np.ones(3, ), np.ones(4, )),
|
||||||
|
(np.ones(4, ), np.ones(3, )),
|
||||||
|
(np.ones(4, ), np.ones(4, ))])
|
||||||
|
def test_pd_invalid_shapes(pos_vel: Tuple[np.ndarray, np.ndarray]):
|
||||||
|
position, velocity = pos_vel
|
||||||
|
ctrl = controller_factory.get_controller('motor')
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
ctrl(position, velocity, np.ones(3, ), np.ones(3, ))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
@pytest.mark.parametrize('current_position', [np.zeros(3, ), np.ones(3, ), np.arange(0, 3)])
|
||||||
|
@pytest.mark.parametrize('gripper_pos', [0, 1, 0.5])
|
||||||
|
def test_metaworld(position: np.ndarray, current_position: np.ndarray, gripper_pos: float):
|
||||||
|
ctrl = controller_factory.get_controller('metaworld')
|
||||||
|
|
||||||
|
position_grip = np.append(position, gripper_pos)
|
||||||
|
c_position_grip = np.append(current_position, -1)
|
||||||
|
a = ctrl(position_grip, None, c_position_grip, None)
|
||||||
|
assert a[-1] == gripper_pos
|
||||||
|
assert np.array_equal(a[:-1], position - current_position)
|
||||||
|
|
||||||
|
|
||||||
|
def test_metaworld_invalid_shapes():
|
||||||
|
ctrl = controller_factory.get_controller('metaworld')
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
ctrl(np.ones(4, ), None, np.ones(3, ), None)
|
144
test/test_replanning_sequencing.py
Normal file
144
test/test_replanning_sequencing.py
Normal file
@ -0,0 +1,144 @@
|
|||||||
|
from itertools import chain
|
||||||
|
from types import FunctionType
|
||||||
|
from typing import Tuple, Type, Union, Optional
|
||||||
|
|
||||||
|
import gym
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
from gym import register
|
||||||
|
from gym.core import ActType, ObsType
|
||||||
|
|
||||||
|
import fancy_gym
|
||||||
|
from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper
|
||||||
|
from fancy_gym.utils.time_aware_observation import TimeAwareObservation
|
||||||
|
|
||||||
|
SEED = 1
|
||||||
|
ENV_IDS = ['Reacher5d-v0', 'dmc:ball_in_cup-catch', 'metaworld:reach-v2', 'Reacher-v2']
|
||||||
|
WRAPPERS = [fancy_gym.envs.mujoco.reacher.MPWrapper, fancy_gym.dmc.suite.ball_in_cup.MPWrapper,
|
||||||
|
fancy_gym.meta.goal_object_change_mp_wrapper.MPWrapper, fancy_gym.open_ai.mujoco.reacher_v2.MPWrapper]
|
||||||
|
ALL_MP_ENVS = chain(*fancy_gym.ALL_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values())
|
||||||
|
|
||||||
|
|
||||||
|
class ToyEnv(gym.Env):
|
||||||
|
observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
|
||||||
|
action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
|
||||||
|
dt = 0.02
|
||||||
|
|
||||||
|
def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
|
||||||
|
options: Optional[dict] = None) -> Union[ObsType, Tuple[ObsType, dict]]:
|
||||||
|
return np.array([-1])
|
||||||
|
|
||||||
|
def step(self, action: ActType) -> Tuple[ObsType, float, bool, dict]:
|
||||||
|
return np.array([-1]), 1, False, {}
|
||||||
|
|
||||||
|
def render(self, mode="human"):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ToyWrapper(RawInterfaceWrapper):
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
|
||||||
|
return np.ones(self.action_space.shape)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
|
||||||
|
return np.zeros(self.action_space.shape)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session", autouse=True)
|
||||||
|
def setup():
|
||||||
|
register(
|
||||||
|
id=f'toy-v0',
|
||||||
|
entry_point='test.test_black_box:ToyEnv',
|
||||||
|
max_episode_steps=50,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
|
||||||
|
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
|
||||||
|
@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
|
||||||
|
def test_learn_sub_trajectories(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]],
|
||||||
|
add_time_aware_wrapper_before: bool):
|
||||||
|
env_id, wrapper_class = env_wrap
|
||||||
|
env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED))
|
||||||
|
wrappers = [wrapper_class]
|
||||||
|
|
||||||
|
# has time aware wrapper
|
||||||
|
if add_time_aware_wrapper_before:
|
||||||
|
wrappers += [TimeAwareObservation]
|
||||||
|
|
||||||
|
env = fancy_gym.make_bb(env_id, [wrapper_class], {'learn_sub_trajectories': True, 'verbose': 2},
|
||||||
|
{'trajectory_generator_type': mp_type},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'exp'},
|
||||||
|
{'basis_generator_type': 'rbf'}, seed=SEED)
|
||||||
|
|
||||||
|
assert env.learn_sub_trajectories
|
||||||
|
assert env.traj_gen.learn_tau
|
||||||
|
# This also verifies we are not adding the TimeAwareObservationWrapper twice
|
||||||
|
assert env.observation_space == env_step.observation_space
|
||||||
|
|
||||||
|
d = True
|
||||||
|
|
||||||
|
for i in range(25):
|
||||||
|
if d:
|
||||||
|
env.reset()
|
||||||
|
action = env.action_space.sample()
|
||||||
|
obs, r, d, info = env.step(action)
|
||||||
|
|
||||||
|
length = info['trajectory_length']
|
||||||
|
|
||||||
|
if not d:
|
||||||
|
assert length == np.round(action[0] / env.dt)
|
||||||
|
assert length == np.round(env.traj_gen.tau.numpy() / env.dt)
|
||||||
|
else:
|
||||||
|
# When done trajectory could be shorter due to termination.
|
||||||
|
assert length <= np.round(action[0] / env.dt)
|
||||||
|
assert length <= np.round(env.traj_gen.tau.numpy() / env.dt)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
|
||||||
|
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
|
||||||
|
@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
|
||||||
|
@pytest.mark.parametrize('replanning_time', [10, 100, 1000])
|
||||||
|
def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]],
|
||||||
|
add_time_aware_wrapper_before: bool, replanning_time: int):
|
||||||
|
env_id, wrapper_class = env_wrap
|
||||||
|
env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED))
|
||||||
|
wrappers = [wrapper_class]
|
||||||
|
|
||||||
|
# has time aware wrapper
|
||||||
|
if add_time_aware_wrapper_before:
|
||||||
|
wrappers += [TimeAwareObservation]
|
||||||
|
|
||||||
|
replanning_schedule = lambda c_pos, c_vel, obs, c_action, t: t % replanning_time == 0
|
||||||
|
|
||||||
|
env = fancy_gym.make_bb(env_id, [wrapper_class], {'replanning_schedule': replanning_schedule, 'verbose': 2},
|
||||||
|
{'trajectory_generator_type': mp_type},
|
||||||
|
{'controller_type': 'motor'},
|
||||||
|
{'phase_generator_type': 'exp'},
|
||||||
|
{'basis_generator_type': 'rbf'}, seed=SEED)
|
||||||
|
|
||||||
|
assert env.do_replanning
|
||||||
|
assert callable(env.replanning_schedule)
|
||||||
|
# This also verifies we are not adding the TimeAwareObservationWrapper twice
|
||||||
|
assert env.observation_space == env_step.observation_space
|
||||||
|
|
||||||
|
env.reset()
|
||||||
|
|
||||||
|
episode_steps = env_step.spec.max_episode_steps // replanning_time
|
||||||
|
# Make 3 episodes, total steps depend on the replanning steps
|
||||||
|
for i in range(3 * episode_steps):
|
||||||
|
action = env.action_space.sample()
|
||||||
|
obs, r, d, info = env.step(action)
|
||||||
|
|
||||||
|
length = info['trajectory_length']
|
||||||
|
|
||||||
|
if d:
|
||||||
|
# Check if number of steps until termination match the replanning interval
|
||||||
|
print(d, (i + 1), episode_steps)
|
||||||
|
assert (i + 1) % episode_steps == 0
|
||||||
|
env.reset()
|
||||||
|
|
||||||
|
assert replanning_schedule(None, None, None, None, length)
|
Loading…
Reference in New Issue
Block a user