from itertools import chain
from types import FunctionType
from typing import Tuple, Type, Union, Optional

import gymnasium as gym
import numpy as np
import pytest
from gymnasium import register, make
from gymnasium.core import ActType, ObsType
from gymnasium import spaces

import fancy_gym
from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper
from fancy_gym.utils.wrappers import TimeAwareObservation
from fancy_gym.utils.make_env_helpers import ensure_finite_time

SEED = 1
ENV_IDS = ['fancy/Reacher5d-v0', 'dm_control/ball_in_cup-catch-v0', 'metaworld/reach-v2', 'Reacher-v2']
WRAPPERS = [fancy_gym.envs.mujoco.reacher.MPWrapper, fancy_gym.dmc.suite.ball_in_cup.MPWrapper,
            fancy_gym.meta.goal_object_change_mp_wrapper.MPWrapper, fancy_gym.open_ai.mujoco.reacher_v2.MPWrapper]
ALL_MP_ENVS = fancy_gym.ALL_MOVEMENT_PRIMITIVE_ENVIRONMENTS['all']

MAX_STEPS_FALLBACK = 50
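
# A minimal sketch of the black-box interface exercised by the tests below (assuming only the
# fancy_gym API imported above): make_bb wraps a step-based env so that a single env.step()
# consumes one movement-primitive parameter vector and rolls out the resulting trajectory
# internally. The five dicts configure, in order, the black-box wrapper, the trajectory
# generator, the controller, the phase generator, and the basis generator.
#
#   env = fancy_gym.make_bb('fancy/Reacher5d-v0', [fancy_gym.envs.mujoco.reacher.MPWrapper],
#                           {}, {'trajectory_generator_type': 'promp'},
#                           {'controller_type': 'motor'}, {'phase_generator_type': 'linear'},
#                           {'basis_generator_type': 'rbf'}, fallback_max_steps=MAX_STEPS_FALLBACK)
#   obs, info = env.reset(seed=SEED)
#   obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
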
class ToyEnv(gym.Env):
    """Minimal deterministic environment used as a lightweight test double."""

    observation_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
    action_space = gym.spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float64)
    dt = 0.02

    def reset(self, *, seed: Optional[int] = None,
              options: Optional[dict] = None) -> Tuple[ObsType, dict]:
        obs, info = np.array([-1.0]), {}
        return obs, info

    def step(self, action: ActType) -> Tuple[ObsType, float, bool, bool, dict]:
        obs, reward, terminated, truncated, info = np.array([-1.0]), 1, False, False, {}
        return obs, reward, terminated, truncated, info

    def render(self):
        pass
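
# RawInterfaceWrapper is the interface the black-box wrapper queries for the robot state
# during rollout; the toy wrapper below just reports a constant position and velocity of
# matching shape.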
class ToyWrapper(RawInterfaceWrapper):

    @property
    def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
        return np.ones(self.action_space.shape)

    @property
    def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
        return np.zeros(self.action_space.shape)


@pytest.fixture(scope="session", autouse=True)
def setup():
    register(
        id='toy-v0',
        entry_point='test.test_black_box:ToyEnv',
        max_episode_steps=50,
    )
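
# With learn_sub_trajectories the trajectory length (tau) itself becomes part of the action:
# action[0] is interpreted as tau, so the executed trajectory should span round(tau / dt)
# steps unless the episode terminates early.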
@pytest.mark.parametrize('mp_type', ['promp', 'dmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
def test_learn_sub_trajectories(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]],
                                add_time_aware_wrapper_before: bool):
    env_id, wrapper_class = env_wrap
    # Plain step-based version of the env for comparing specs and observation spaces.
    env_step = TimeAwareObservation(ensure_finite_time(make(env_id), MAX_STEPS_FALLBACK))
    wrappers = [wrapper_class]

    # Optionally add the time-aware wrapper up front; make_bb must not add it a second time.
    if add_time_aware_wrapper_before:
        wrappers += [TimeAwareObservation]

    env = fancy_gym.make_bb(env_id, wrappers, {'learn_sub_trajectories': True, 'verbose': 2},
                            {'trajectory_generator_type': mp_type},
                            {'controller_type': 'motor'},
                            {'phase_generator_type': 'exp'},
                            {'basis_generator_type': 'rbf'}, fallback_max_steps=MAX_STEPS_FALLBACK)
    env.reset(seed=SEED)

    assert env.learn_sub_trajectories
    assert env.spec.max_episode_steps
    assert env_step.spec.max_episode_steps
    assert env.traj_gen.learn_tau
    # This also verifies we are not adding the TimeAwareObservation wrapper twice
    assert spaces.flatten_space(env_step.observation_space) == spaces.flatten_space(env.observation_space)

    done = True

    for i in range(25):
        if done:
            env.reset(seed=SEED)

        action = env.action_space.sample()
        _obs, _reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        length = info['trajectory_length']

        if not done:
            assert length == np.round(action[0] / env.dt)
            assert length == np.round(env.traj_gen.tau.numpy() / env.dt)
        else:
            # When done, the trajectory can be shorter due to early termination.
            assert length <= np.round(action[0] / env.dt)
            assert length <= np.round(env.traj_gen.tau.numpy() / env.dt)
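
# Replanning: instead of learning tau, the wrapper re-queries the policy whenever the
# schedule fires (here every replanning_time environment steps), so each env.step()
# executes exactly one sub-segment of the trajectory.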
@pytest.mark.parametrize('mp_type', ['promp', 'dmp', 'prodmp'])
@pytest.mark.parametrize('env_wrap', zip(ENV_IDS, WRAPPERS))
@pytest.mark.parametrize('add_time_aware_wrapper_before', [True, False])
@pytest.mark.parametrize('replanning_time', [10, 100, 1000])
def test_replanning_time(mp_type: str, env_wrap: Tuple[str, Type[RawInterfaceWrapper]],
                         add_time_aware_wrapper_before: bool, replanning_time: int):
    env_id, wrapper_class = env_wrap
    # Plain step-based version of the env for comparing specs and observation spaces.
    env_step = TimeAwareObservation(ensure_finite_time(make(env_id), MAX_STEPS_FALLBACK))
    wrappers = [wrapper_class]

    # Optionally add the time-aware wrapper up front; make_bb must not add it a second time.
    if add_time_aware_wrapper_before:
        wrappers += [TimeAwareObservation]

    def replanning_schedule(c_pos, c_vel, obs, c_action, t):
        return t % replanning_time == 0

    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
    phase_generator_type = 'exp' if 'dmp' in mp_type else 'linear'

    env = fancy_gym.make_bb(env_id, wrappers, {'replanning_schedule': replanning_schedule, 'verbose': 2},
                            {'trajectory_generator_type': mp_type},
                            {'controller_type': 'motor'},
                            {'phase_generator_type': phase_generator_type},
                            {'basis_generator_type': basis_generator_type}, fallback_max_steps=MAX_STEPS_FALLBACK)
    env.reset(seed=SEED)

    assert env.do_replanning
    assert env.spec.max_episode_steps
    assert env_step.spec.max_episode_steps
    assert callable(env.replanning_schedule)
    # This also verifies we are not adding the TimeAwareObservation wrapper twice
    assert spaces.flatten_space(env_step.observation_space) == spaces.flatten_space(env.observation_space)

    env.reset(seed=SEED)

    episode_steps = env_step.spec.max_episode_steps // replanning_time
    # Run 3 episodes; the total number of steps depends on the replanning interval.
    for i in range(3 * episode_steps):
        action = env.action_space.sample()
        _obs, _reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        length = info['trajectory_length']

        if done:
            # The number of steps until termination must match the replanning interval.
            assert (i + 1) % episode_steps == 0
            env.reset(seed=SEED)

        assert replanning_schedule(None, None, None, None, length)
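
# max_planning_times caps how many sub-segments may be replanned; once the cap is reached
# the wrapper ends the episode, so the loop below must run exactly that many times.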
@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
@pytest.mark.parametrize('sub_segment_steps', [5, 10])
def test_max_planning_times(mp_type: str, max_planning_times: int, sub_segment_steps: int):
    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
                            {'max_planning_times': max_planning_times,
                             'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
                             'verbose': 2},
                            {'trajectory_generator_type': mp_type},
                            {'controller_type': 'motor'},
                            {'phase_generator_type': phase_generator_type,
                             'learn_tau': False,
                             'learn_delay': False},
                            {'basis_generator_type': basis_generator_type},
                            fallback_max_steps=MAX_STEPS_FALLBACK)

    _ = env.reset(seed=SEED)
    done = False
    planning_times = 0
    while not done:
        action = env.action_space.sample()
        _obs, _reward, terminated, truncated, _info = env.step(action)
        done = terminated or truncated
        planning_times += 1
    assert planning_times == max_planning_times
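
# When tau is learned on top of replanning, the first action entry carries tau;
# the planning-time cap must still hold for any fixed tau value.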
@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
@pytest.mark.parametrize('sub_segment_steps', [5, 10])
@pytest.mark.parametrize('tau', [0.5, 1.0, 1.5, 2.0])
def test_replanning_with_learn_tau(mp_type: str, max_planning_times: int, sub_segment_steps: int, tau: float):
    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
                            {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
                             'max_planning_times': max_planning_times,
                             'verbose': 2},
                            {'trajectory_generator_type': mp_type},
                            {'controller_type': 'motor'},
                            {'phase_generator_type': phase_generator_type,
                             'learn_tau': True,
                             'learn_delay': False},
                            {'basis_generator_type': basis_generator_type},
                            fallback_max_steps=MAX_STEPS_FALLBACK)

    _ = env.reset(seed=SEED)
    done = False
    planning_times = 0
    while not done:
        action = env.action_space.sample()
        action[0] = tau  # learned tau occupies the first action dimension
        _obs, _reward, terminated, truncated, _info = env.step(action)
        done = terminated or truncated
        planning_times += 1
    assert planning_times == max_planning_times
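
# With learn_delay, action[0] carries the delay instead: the trajectory should hold its
# initial position for round(delay / dt) steps before moving. verbose=2 exposes the executed
# 'positions'/'velocities' in info, which the assertions below inspect.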
@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
@pytest.mark.parametrize('sub_segment_steps', [5, 10])
@pytest.mark.parametrize('delay', [0.1, 0.25, 0.5, 0.75])
def test_replanning_with_learn_delay(mp_type: str, max_planning_times: int, sub_segment_steps: int, delay: float):
    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
                            {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
                             'max_planning_times': max_planning_times,
                             'verbose': 2},
                            {'trajectory_generator_type': mp_type},
                            {'controller_type': 'motor'},
                            {'phase_generator_type': phase_generator_type,
                             'learn_tau': False,
                             'learn_delay': True},
                            {'basis_generator_type': basis_generator_type},
                            fallback_max_steps=MAX_STEPS_FALLBACK)

    _ = env.reset(seed=SEED)
    done = False
    planning_times = 0
    while not done:
        action = env.action_space.sample()
        action[0] = delay  # learned delay occupies the first action dimension
        _obs, _reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        delay_time_steps = int(np.round(delay / env.dt))
        pos = info['positions'].flatten()
        vel = info['velocities'].flatten()

        # Check the beginning is all identical (only true for linear basis)
        if planning_times == 0:
            assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0])
            assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0])

        # Only valid when the delay is shorter than a sub-segment
        elif delay_time_steps < sub_segment_steps:
            assert np.all(pos[1:max(1, delay_time_steps - 1)] != pos[0])
            assert np.all(vel[1:max(1, delay_time_steps - 2)] != vel[0])

        # Check the active trajectory section differs from the initial values
        assert np.all(pos[max(1, delay_time_steps):] != pos[0])
        assert np.all(vel[max(1, delay_time_steps):] != vel[0])

        planning_times += 1

    assert planning_times == max_planning_times
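
# Combined case: action[0] carries tau and action[1] carries the delay. The delay-hold
# checks only apply to the first planned segment; later segments start mid-trajectory.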
@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
@pytest.mark.parametrize('max_planning_times', [1, 2, 3])
@pytest.mark.parametrize('sub_segment_steps', [5, 10, 15])
@pytest.mark.parametrize('delay', [0, 0.25, 0.5, 0.75])
@pytest.mark.parametrize('tau', [0.5, 0.75, 1.0])
def test_replanning_with_learn_delay_and_tau(mp_type: str, max_planning_times: int, sub_segment_steps: int,
                                             delay: float, tau: float):
    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
                            {'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
                             'max_planning_times': max_planning_times,
                             'verbose': 2},
                            {'trajectory_generator_type': mp_type},
                            {'controller_type': 'motor'},
                            {'phase_generator_type': phase_generator_type,
                             'learn_tau': True,
                             'learn_delay': True},
                            {'basis_generator_type': basis_generator_type},
                            fallback_max_steps=MAX_STEPS_FALLBACK)

    _ = env.reset(seed=SEED)
    done = False
    planning_times = 0
    while not done:
        action = env.action_space.sample()
        action[0] = tau    # learned tau occupies the first action dimension
        action[1] = delay  # learned delay occupies the second action dimension
        _obs, _reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        delay_time_steps = int(np.round(delay / env.dt))

        pos = info['positions'].flatten()
        vel = info['velocities'].flatten()

        # The delay only applies to the first planned segment
        if planning_times == 0:
            # Check the delay is applied
            assert np.all(pos[:max(1, delay_time_steps - 1)] == pos[0])
            assert np.all(vel[:max(1, delay_time_steps - 2)] == vel[0])
            # Check the active trajectory section differs from the initial values
            assert np.all(pos[max(1, delay_time_steps):] != pos[0])
            assert np.all(vel[max(1, delay_time_steps):] != vel[0])

        planning_times += 1

    assert planning_times == max_planning_times
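
# Sanity check of the schedule itself: with a fixed-interval schedule and no learned
# tau/delay, exactly max_planning_times steps should finish the episode.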
@pytest.mark.parametrize('mp_type', ['promp', 'prodmp'])
@pytest.mark.parametrize('max_planning_times', [1, 2, 3, 4])
@pytest.mark.parametrize('sub_segment_steps', [5, 10])
def test_replanning_schedule(mp_type: str, max_planning_times: int, sub_segment_steps: int):
    basis_generator_type = 'prodmp' if mp_type == 'prodmp' else 'rbf'
    phase_generator_type = 'exp' if mp_type == 'prodmp' else 'linear'
    env = fancy_gym.make_bb('toy-v0', [ToyWrapper],
                            {'max_planning_times': max_planning_times,
                             'replanning_schedule': lambda pos, vel, obs, action, t: t % sub_segment_steps == 0,
                             'verbose': 2},
                            {'trajectory_generator_type': mp_type},
                            {'controller_type': 'motor'},
                            {'phase_generator_type': phase_generator_type,
                             'learn_tau': False,
                             'learn_delay': False},
                            {'basis_generator_type': basis_generator_type},
                            fallback_max_steps=MAX_STEPS_FALLBACK)

    _ = env.reset(seed=SEED)
    done = False
    for i in range(max_planning_times):
        action = env.action_space.sample()
        _obs, _reward, terminated, truncated, _info = env.step(action)
        done = terminated or truncated
    assert done