Merge remote-tracking branch 'alr/master' into mujoco_binding

# Conflicts:
#	fancy_gym/black_box/black_box_wrapper.py
This commit is contained in:
Hongyi Zhou 2022-10-24 09:53:41 +02:00
commit e3509f8be3
5 changed files with 349 additions and 80 deletions

View File

@ -1,4 +1,4 @@
from typing import Tuple, Optional from typing import Tuple, Optional, Callable
import gym import gym
import numpy as np import numpy as np
@ -9,7 +9,6 @@ from fancy_gym.black_box.controller.base_controller import BaseController
from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper
from fancy_gym.utils.utils import get_numpy from fancy_gym.utils.utils import get_numpy
import torch
class BlackBoxWrapper(gym.ObservationWrapper): class BlackBoxWrapper(gym.ObservationWrapper):
@ -20,8 +19,9 @@ class BlackBoxWrapper(gym.ObservationWrapper):
duration: float, duration: float,
verbose: int = 1, verbose: int = 1,
learn_sub_trajectories: bool = False, learn_sub_trajectories: bool = False,
replanning_schedule: Optional[callable] = None, replanning_schedule: Optional[
reward_aggregation: callable = np.sum Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray, int], bool]] = None,
reward_aggregation: Callable[[np.ndarray], float] = np.sum
): ):
""" """
gym.Wrapper for leveraging a black box approach with a trajectory generator. gym.Wrapper for leveraging a black box approach with a trajectory generator.
@ -48,12 +48,10 @@ class BlackBoxWrapper(gym.ObservationWrapper):
# trajectory generation # trajectory generation
self.traj_gen = trajectory_generator self.traj_gen = trajectory_generator
self.tracking_controller = tracking_controller self.tracking_controller = tracking_controller
# self.time_steps = np.linspace(0, self.duration, self.traj_steps) # self.time_steps = np.linspace(0, self.duration, self.traj_steps)
# self.traj_gen.set_mp_times(self.time_steps) # self.traj_gen.set_mp_times(self.time_steps)
self.traj_gen.set_duration(self.duration, self.dt) self.traj_gen.set_duration(self.duration, self.dt)
# self.traj_gen.basis_gn.show_basis(plot=True)
# reward computation # reward computation
self.reward_aggregation = reward_aggregation self.reward_aggregation = reward_aggregation
@ -76,27 +74,30 @@ class BlackBoxWrapper(gym.ObservationWrapper):
return observation.astype(self.observation_space.dtype) return observation.astype(self.observation_space.dtype)
def get_trajectory(self, action: np.ndarray) -> Tuple: def get_trajectory(self, action: np.ndarray) -> Tuple:
duration = self.duration
if self.learn_sub_trajectories:
duration = None
# reset with every new call as we need to set all arguments, such as tau, delay, again.
# If we do not do this, the traj_gen assumes we are continuing the trajectory.
self.traj_gen.reset()
clipped_params = np.clip(action, self.traj_gen_action_space.low, self.traj_gen_action_space.high) clipped_params = np.clip(action, self.traj_gen_action_space.low, self.traj_gen_action_space.high)
self.traj_gen.set_params(clipped_params) self.traj_gen.set_params(clipped_params)
bc_time = np.array(0 if not self.do_replanning else self.current_traj_steps * self.dt) bc_time = np.array(0 if not self.do_replanning else self.current_traj_steps * self.dt)
# TODO we could think about initializing with the previous desired value in order to have a smooth transition # TODO we could think about initializing with the previous desired value in order to have a smooth transition
# at least from the planning point of view. # at least from the planning point of view.
self.traj_gen.set_boundary_conditions(bc_time, self.current_pos, self.current_vel)
self.traj_gen.set_boundary_conditions(bc_time,
self.current_pos,
self.current_vel)
duration = None if self.learn_sub_trajectories else self.duration
self.traj_gen.set_duration(duration, self.dt) self.traj_gen.set_duration(duration, self.dt)
# traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True) # traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True)
trajectory = get_numpy(self.traj_gen.get_traj_pos()) position = get_numpy(self.traj_gen.get_traj_pos())
velocity = get_numpy(self.traj_gen.get_traj_vel()) velocity = get_numpy(self.traj_gen.get_traj_vel())
if self.do_replanning: # if self.do_replanning:
# Remove first part of trajectory as this is already over # # Remove first part of trajectory as this is already over
trajectory = trajectory[self.current_traj_steps:] # position = position[self.current_traj_steps:]
velocity = velocity[self.current_traj_steps:] # velocity = velocity[self.current_traj_steps:]
return trajectory, velocity return position, velocity
def _get_traj_gen_action_space(self): def _get_traj_gen_action_space(self):
"""This function can be used to set up an individual space for the parameters of the traj_gen.""" """This function can be used to set up an individual space for the parameters of the traj_gen."""
@ -131,9 +132,9 @@ class BlackBoxWrapper(gym.ObservationWrapper):
# TODO remove this part, right now only needed for beer pong # TODO remove this part, right now only needed for beer pong
mp_params, env_spec_params = self.env.episode_callback(action, self.traj_gen) mp_params, env_spec_params = self.env.episode_callback(action, self.traj_gen)
trajectory, velocity = self.get_trajectory(mp_params) position, velocity = self.get_trajectory(mp_params)
trajectory_length = len(trajectory) trajectory_length = len(position)
rewards = np.zeros(shape=(trajectory_length,)) rewards = np.zeros(shape=(trajectory_length,))
if self.verbose >= 2: if self.verbose >= 2:
actions = np.zeros(shape=(trajectory_length,) + self.env.action_space.shape) actions = np.zeros(shape=(trajectory_length,) + self.env.action_space.shape)
@ -143,7 +144,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
infos = dict() infos = dict()
done = False done = False
for t, (pos, vel) in enumerate(zip(trajectory, velocity)): for t, (pos, vel) in enumerate(zip(position, velocity)):
step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel) step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel)
c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high) c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high)
obs, c_reward, done, info = self.env.step(c_action) obs, c_reward, done, info = self.env.step(c_action)
@ -169,7 +170,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
self.current_traj_steps += t + 1 self.current_traj_steps += t + 1
if self.verbose >= 2: if self.verbose >= 2:
infos['positions'] = trajectory infos['positions'] = position
infos['velocities'] = velocity infos['velocities'] = velocity
infos['step_actions'] = actions[:t + 1] infos['step_actions'] = actions[:t + 1]
infos['step_observations'] = observations[:t + 1] infos['step_observations'] = observations[:t + 1]
@ -186,4 +187,5 @@ class BlackBoxWrapper(gym.ObservationWrapper):
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None): def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None):
self.current_traj_steps = 0 self.current_traj_steps = 0
self.traj_gen.reset()
return super(BlackBoxWrapper, self).reset() return super(BlackBoxWrapper, self).reset()

View File

@ -32,7 +32,7 @@ class MPWrapper(BaseMetaworldMPWrapper):
# Current observation # Current observation
[False] * 3, # end-effector position [False] * 3, # end-effector position
[False] * 1, # normalized gripper open distance [False] * 1, # normalized gripper open distance
[False] * 3, # main object position [True] * 3, # main object position
[False] * 4, # main object quaternion [False] * 4, # main object quaternion
[False] * 3, # secondary object position [False] * 3, # secondary object position
[False] * 4, # secondary object quaternion [False] * 4, # secondary object quaternion

View File

@ -166,6 +166,15 @@ def make_bb(
# We have to learn the length when learning sub_trajectories trajectories # We have to learn the length when learning sub_trajectories trajectories
phase_kwargs['learn_tau'] = True phase_kwargs['learn_tau'] = True
# set tau bounds to minimum of two env steps otherwise computing the velocity is not possible.
# maximum is full duration of one episode.
if phase_kwargs.get('learn_tau'):
phase_kwargs["tau_bound"] = [env.dt * 2, black_box_kwargs['duration']]
# Max delay is full duration minus two steps due to above reason
if phase_kwargs.get('learn_delay'):
phase_kwargs["delay_bound"] = [0, black_box_kwargs['duration'] - env.dt * 2]
phase_gen = get_phase_generator(**phase_kwargs) phase_gen = get_phase_generator(**phase_kwargs)
basis_gen = get_basis_generator(phase_generator=phase_gen, **basis_kwargs) basis_gen = get_basis_generator(phase_generator=phase_gen, **basis_kwargs)
controller = get_controller(**controller_kwargs) controller = get_controller(**controller_kwargs)

View File

@ -1,5 +1,5 @@
from itertools import chain from itertools import chain
from typing import Tuple, Type, Union, Optional from typing import Tuple, Type, Union, Optional, Callable
import gym import gym
import numpy as np import numpy as np
@ -25,7 +25,7 @@ class Object(object):
class ToyEnv(gym.Env): class ToyEnv(gym.Env):
observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64) observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64) action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
dt = 0.01 dt = 0.02
def __init__(self, a: int = 0, b: float = 0.0, c: list = [], d: dict = {}, e: Object = Object()): def __init__(self, a: int = 0, b: float = 0.0, c: list = [], d: dict = {}, e: Object = Object()):
self.a, self.b, self.c, self.d, self.e = a, b, c, d, e self.a, self.b, self.c, self.d, self.e = a, b, c, d, e
@ -49,7 +49,7 @@ class ToyWrapper(RawInterfaceWrapper):
@property @property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]: def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return np.ones(self.action_space.shape) return np.zeros(self.action_space.shape)
@pytest.fixture(scope="session", autouse=True) @pytest.fixture(scope="session", autouse=True)
@ -67,74 +67,81 @@ def test_missing_wrapper(env_id: str):
fancy_gym.make_bb(env_id, [], {}, {}, {}, {}, {}) fancy_gym.make_bb(env_id, [], {}, {}, {}, {}, {})
def test_missing_local_state(): @pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
def test_missing_local_state(mp_type: str):
env = fancy_gym.make_bb('toy-v0', [RawInterfaceWrapper], {}, env = fancy_gym.make_bb('toy-v0', [RawInterfaceWrapper], {},
{'trajectory_generator_type': 'promp'}, {'trajectory_generator_type': mp_type},
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'linear'}, {'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}) {'basis_generator_type': 'rbf'})
env.reset() env.reset()
with pytest.raises(NotImplementedError): with pytest.raises(NotImplementedError):
env.step(env.action_space.sample()) env.step(env.action_space.sample())
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
@pytest.mark.parametrize('verbose', [1, 2]) @pytest.mark.parametrize('verbose', [1, 2])
def test_verbosity(env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int): def test_verbosity(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]], verbose: int):
env_id, wrapper_class = env_wrap env_id, wrapper_class = env_wrap
env = fancy_gym.make_bb(env_id, [wrapper_class], {}, env = fancy_gym.make_bb(env_id, [wrapper_class], {'verbose': verbose},
{'trajectory_generator_type': 'promp'}, {'trajectory_generator_type': mp_type},
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'linear'}, {'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}) {'basis_generator_type': 'rbf'})
env.reset() env.reset()
info_keys = env.step(env.action_space.sample())[3].keys() info_keys = list(env.step(env.action_space.sample())[3].keys())
env_step = fancy_gym.make(env_id, SEED) env_step = fancy_gym.make(env_id, SEED)
env_step.reset() env_step.reset()
info_keys_step = env_step.step(env_step.action_space.sample())[3].keys() info_keys_step = env_step.step(env_step.action_space.sample())[3].keys()
assert info_keys_step in info_keys assert all(e in info_keys for e in info_keys_step)
assert 'trajectory_length' in info_keys assert 'trajectory_length' in info_keys
if verbose >= 2: if verbose >= 2:
mp_keys = ['position', 'velocities', 'step_actions', 'step_observations', 'step_rewards'] mp_keys = ['positions', 'velocities', 'step_actions', 'step_observations', 'step_rewards']
assert mp_keys in info_keys assert all(e in info_keys for e in mp_keys)
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
def test_length(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): def test_length(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
env_id, wrapper_class = env_wrap env_id, wrapper_class = env_wrap
env = fancy_gym.make_bb(env_id, [wrapper_class], {}, env = fancy_gym.make_bb(env_id, [wrapper_class], {},
{'trajectory_generator_type': 'promp'}, {'trajectory_generator_type': mp_type},
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'linear'}, {'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}) {'basis_generator_type': 'rbf'})
env.reset()
length = env.step(env.action_space.sample())[3]['trajectory_length']
assert length == env.spec.max_episode_steps for _ in range(5):
env.reset()
length = env.step(env.action_space.sample())[3]['trajectory_length']
assert length == env.spec.max_episode_steps
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])]) @pytest.mark.parametrize('reward_aggregation', [np.sum, np.mean, np.median, lambda x: np.mean(x[::2])])
def test_aggregation(reward_aggregation: callable): def test_aggregation(mp_type: str, reward_aggregation: Callable[[np.ndarray], float]):
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation}, env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'reward_aggregation': reward_aggregation},
{'trajectory_generator_type': 'promp'}, {'trajectory_generator_type': mp_type},
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'linear'}, {'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}) {'basis_generator_type': 'rbf'})
env.reset() env.reset()
# ToyEnv only returns 1 as reward # ToyEnv only returns 1 as reward
assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, )) assert env.step(env.action_space.sample())[1] == reward_aggregation(np.ones(50, ))
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) @pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
def test_context_space(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]): def test_context_space(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
env_id, wrapper_class = env_wrap env_id, wrapper_class = env_wrap
env = fancy_gym.make_bb(env_id, [wrapper_class], {}, env = fancy_gym.make_bb(env_id, [wrapper_class], {},
{'trajectory_generator_type': 'promp'}, {'trajectory_generator_type': mp_type},
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'linear'}, {'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}) {'basis_generator_type': 'rbf'})
# check if observation space matches with the specified mask values which are true # check if observation space matches with the specified mask values which are true
env_step = fancy_gym.make(env_id, SEED) env_step = fancy_gym.make(env_id, SEED)
@ -142,36 +149,42 @@ def test_context_space(env_wrap: Tuple[str, Type[RawInterfaceWrapper]]):
assert env.observation_space.shape == wrapper.context_mask[wrapper.context_mask].shape assert env.observation_space.shape == wrapper.context_mask[wrapper.context_mask].shape
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('num_dof', [0, 1, 2, 5]) @pytest.mark.parametrize('num_dof', [0, 1, 2, 5])
@pytest.mark.parametrize('num_basis', [0, 1, 2, 5]) @pytest.mark.parametrize('num_basis', [0, 1, 2, 5])
@pytest.mark.parametrize('learn_tau', [True, False]) @pytest.mark.parametrize('learn_tau', [True, False])
@pytest.mark.parametrize('learn_delay', [True, False]) @pytest.mark.parametrize('learn_delay', [True, False])
def test_action_space(num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool): def test_action_space(mp_type: str, num_dof: int, num_basis: int, learn_tau: bool, learn_delay: bool):
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {}, env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
{'trajectory_generator_type': 'promp', {'trajectory_generator_type': mp_type,
'action_dim': num_dof 'action_dim': num_dof
}, },
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'linear', {'phase_generator_type': 'exp',
'learn_tau': learn_tau, 'learn_tau': learn_tau,
'learn_delay': learn_delay 'learn_delay': learn_delay
}, },
{'basis_generator_type': 'rbf', {'basis_generator_type': 'rbf',
'num_basis': num_basis 'num_basis': num_basis
}) })
assert env.action_space.shape[0] == num_dof * num_basis + int(learn_tau) + int(learn_delay)
base_dims = num_dof * num_basis
additional_dims = num_dof if mp_type == 'dmp' else 0
traj_modification_dims = int(learn_tau) + int(learn_delay)
assert env.action_space.shape[0] == base_dims + traj_modification_dims + additional_dims
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('a', [1]) @pytest.mark.parametrize('a', [1])
@pytest.mark.parametrize('b', [1.0]) @pytest.mark.parametrize('b', [1.0])
@pytest.mark.parametrize('c', [[1], [1.0], ['str'], [{'a': 'b'}], [np.ones(3, )]]) @pytest.mark.parametrize('c', [[1], [1.0], ['str'], [{'a': 'b'}], [np.ones(3, )]])
@pytest.mark.parametrize('d', [{'a': 1}, {1: 2.0}, {'a': [1.0]}, {'a': np.ones(3, )}, {'a': {'a': 'b'}}]) @pytest.mark.parametrize('d', [{'a': 1}, {1: 2.0}, {'a': [1.0]}, {'a': np.ones(3, )}, {'a': {'a': 'b'}}])
@pytest.mark.parametrize('e', [Object()]) @pytest.mark.parametrize('e', [Object()])
def test_change_env_kwargs(a: int, b: float, c: list, d: dict, e: Object): def test_change_env_kwargs(mp_type: str, a: int, b: float, c: list, d: dict, e: Object):
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {}, env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {},
{'trajectory_generator_type': 'promp'}, {'trajectory_generator_type': mp_type},
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'linear'}, {'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}, {'basis_generator_type': 'rbf'},
a=a, b=b, c=c, d=d, e=e a=a, b=b, c=c, d=d, e=e
) )
@ -183,33 +196,134 @@ def test_change_env_kwargs(a: int, b: float, c: list, d: dict, e: Object):
assert e is env.e assert e is env.e
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS)) @pytest.mark.parametrize('mp_type', ['promp'])
@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False]) @pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
def test_learn_sub_trajectories(env_wrap: Tuple[str, Type[RawInterfaceWrapper]], add_time_aware_wrapper_before: bool): def test_learn_tau(mp_type: str, tau: float):
env_id, wrapper_class = env_wrap env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED)) {'trajectory_generator_type': mp_type,
wrappers = [wrapper_class] },
# has time aware wrapper
if add_time_aware_wrapper_before:
wrappers += [TimeAwareObservation]
env = fancy_gym.make_bb(env_id, [wrapper_class], {'learn_sub_trajectories': True},
{'trajectory_generator_type': 'promp'},
{'controller_type': 'motor'}, {'controller_type': 'motor'},
{'phase_generator_type': 'linear'}, {'phase_generator_type': 'linear',
{'basis_generator_type': 'rbf'}) 'learn_tau': True,
'learn_delay': False
},
{'basis_generator_type': 'rbf',
}, seed=SEED)
assert env.learn_sub_trajectories d = True
assert env.traj_gen.learn_tau for i in range(5):
assert env.observation_space == env_step.observation_space if d:
env.reset()
action = env.action_space.sample()
action[0] = tau
env.reset() obs, r, d, info = env.step(action)
action = env.action_space.sample()
obs, r, d, info = env.step(action)
length = info['trajectory_length'] length = info['trajectory_length']
assert length == env.spec.max_episode_steps
factor = 1 / env.dt tau_time_steps = int(np.round(tau / env.dt))
assert np.allclose(length * env.dt, np.round(factor * action[0]) / factor)
assert np.allclose(length * env.dt, np.round(factor * env.traj_gen.tau.numpy()) / factor) pos = info['positions'].flatten()
vel = info['velocities'].flatten()
# Check end is all same (only true for linear basis)
assert np.all(pos[tau_time_steps:] == pos[-1])
assert np.all(vel[tau_time_steps:] == vel[-1])
# Check active trajectory section is different to end values
assert np.all(pos[:tau_time_steps - 1] != pos[-1])
assert np.all(vel[:tau_time_steps - 2] != vel[-1])
@pytest.mark.parametrize('mp_type', ['promp'])
@pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
def test_learn_delay(mp_type: str, delay: float):
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
{'trajectory_generator_type': mp_type,
},
{'controller_type': 'motor'},
{'phase_generator_type': 'linear',
'learn_tau': False,
'learn_delay': True
},
{'basis_generator_type': 'rbf',
}, seed=SEED)
d = True
for i in range(5):
if d:
env.reset()
action = env.action_space.sample()
action[0] = delay
obs, r, d, info = env.step(action)
length = info['trajectory_length']
assert length == env.spec.max_episode_steps
delay_time_steps = int(np.round(delay / env.dt))
pos = info['positions'].flatten()
vel = info['velocities'].flatten()
# Check beginning is all same (only true for linear basis)
assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0])
assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0])
# Check active trajectory section is different to beginning values
assert np.all(pos[max(1, delay_time_steps):] != pos[0])
assert np.all(vel[max(1, delay_time_steps)] != vel[0])
@pytest.mark.parametrize('mp_type', ['promp'])
@pytest.mark.parametrize('tau', [0.25, 0.5, 0.75, 1])
@pytest.mark.parametrize('delay', [0.25, 0.5, 0.75, 1])
def test_learn_tau_and_delay(mp_type: str, tau: float, delay: float):
env = fancy_gym.make_bb('toy-v0', [ToyWrapper], {'verbose': 2},
{'trajectory_generator_type': mp_type,
},
{'controller_type': 'motor'},
{'phase_generator_type': 'linear',
'learn_tau': True,
'learn_delay': True
},
{'basis_generator_type': 'rbf',
}, seed=SEED)
if env.spec.max_episode_steps * env.dt < delay + tau:
return
d = True
for i in range(5):
if d:
env.reset()
action = env.action_space.sample()
action[0] = tau
action[1] = delay
obs, r, d, info = env.step(action)
length = info['trajectory_length']
assert length == env.spec.max_episode_steps
tau_time_steps = int(np.round(tau / env.dt))
delay_time_steps = int(np.round(delay / env.dt))
joint_time_steps = delay_time_steps + tau_time_steps
pos = info['positions'].flatten()
vel = info['velocities'].flatten()
# Check end is all same (only true for linear basis)
assert np.all(pos[joint_time_steps:] == pos[-1])
assert np.all(vel[joint_time_steps:] == vel[-1])
# Check beginning is all same (only true for linear basis)
assert np.all(pos[:delay_time_steps - 1] == pos[0])
assert np.all(vel[:delay_time_steps - 2] == vel[0])
# Check active trajectory section is different to beginning and end values
active_pos = pos[delay_time_steps: joint_time_steps - 1]
active_vel = vel[delay_time_steps: joint_time_steps - 2]
assert np.all(active_pos != pos[-1]) and np.all(active_pos != pos[0])
assert np.all(active_vel != vel[-1]) and np.all(active_vel != vel[0])

View File

@ -0,0 +1,144 @@
from itertools import chain
from types import FunctionType
from typing import Tuple, Type, Union, Optional
import gym
import numpy as np
import pytest
from gym import register
from gym.core import ActType, ObsType
import fancy_gym
from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper
from fancy_gym.utils.time_aware_observation import TimeAwareObservation
SEED = 1
ENV_IDS = ['Reacher5d-v0', 'dmc:ball_in_cup-catch', 'metaworld:reach-v2', 'Reacher-v2']
WRAPPERS = [fancy_gym.envs.mujoco.reacher.MPWrapper, fancy_gym.dmc.suite.ball_in_cup.MPWrapper,
fancy_gym.meta.goal_object_change_mp_wrapper.MPWrapper, fancy_gym.open_ai.mujoco.reacher_v2.MPWrapper]
ALL_MP_ENVS = chain(*fancy_gym.ALL_MOVEMENT_PRIMITIVE_ENVIRONMENTS.values())
class ToyEnv(gym.Env):
observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
dt = 0.02
def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
options: Optional[dict] = None) -> Union[ObsType, Tuple[ObsType, dict]]:
return np.array([-1])
def step(self, action: ActType) -> Tuple[ObsType, float, bool, dict]:
return np.array([-1]), 1, False, {}
def render(self, mode="human"):
pass
class ToyWrapper(RawInterfaceWrapper):
@property
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
return np.ones(self.action_space.shape)
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return np.zeros(self.action_space.shape)
@pytest.fixture(scope="session", autouse=True)
def setup():
register(
id=f'toy-v0',
entry_point='test.test_black_box:ToyEnv',
max_episode_steps=50,
)
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
def test_learn_sub_trajectories(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]],
add_time_aware_wrapper_before: bool):
env_id, wrapper_class = env_wrap
env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED))
wrappers = [wrapper_class]
# has time aware wrapper
if add_time_aware_wrapper_before:
wrappers += [TimeAwareObservation]
env = fancy_gym.make_bb(env_id, [wrapper_class], {'learn_sub_trajectories': True, 'verbose': 2},
{'trajectory_generator_type': mp_type},
{'controller_type': 'motor'},
{'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}, seed=SEED)
assert env.learn_sub_trajectories
assert env.traj_gen.learn_tau
# This also verifies we are not adding the TimeAwareObservationWrapper twice
assert env.observation_space == env_step.observation_space
d = True
for i in range(25):
if d:
env.reset()
action = env.action_space.sample()
obs, r, d, info = env.step(action)
length = info['trajectory_length']
if not d:
assert length == np.round(action[0] / env.dt)
assert length == np.round(env.traj_gen.tau.numpy() / env.dt)
else:
# When done trajectory could be shorter due to termination.
assert length <= np.round(action[0] / env.dt)
assert length <= np.round(env.traj_gen.tau.numpy() / env.dt)
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
@pytest.mark.parametrize('replanning_time', [10, 100, 1000])
def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]],
add_time_aware_wrapper_before: bool, replanning_time: int):
env_id, wrapper_class = env_wrap
env_step = TimeAwareObservation(fancy_gym.make(env_id, SEED))
wrappers = [wrapper_class]
# has time aware wrapper
if add_time_aware_wrapper_before:
wrappers += [TimeAwareObservation]
replanning_schedule = lambda c_pos, c_vel, obs, c_action, t: t % replanning_time == 0
env = fancy_gym.make_bb(env_id, [wrapper_class], {'replanning_schedule': replanning_schedule, 'verbose': 2},
{'trajectory_generator_type': mp_type},
{'controller_type': 'motor'},
{'phase_generator_type': 'exp'},
{'basis_generator_type': 'rbf'}, seed=SEED)
assert env.do_replanning
assert callable(env.replanning_schedule)
# This also verifies we are not adding the TimeAwareObservationWrapper twice
assert env.observation_space == env_step.observation_space
env.reset()
episode_steps = env_step.spec.max_episode_steps // replanning_time
# Make 3 episodes, total steps depend on the replanning steps
for i in range(3 * episode_steps):
action = env.action_space.sample()
obs, r, d, info = env.step(action)
length = info['trajectory_length']
if d:
# Check if number of steps until termination match the replanning interval
print(d, (i + 1), episode_steps)
assert (i + 1) % episode_steps == 0
env.reset()
assert replanning_schedule(None, None, None, None, length)