fancy_gym/alr_envs/utils/make_env_helpers.py

277 lines
11 KiB
Python
Raw Normal View History

2021-11-30 16:11:32 +01:00
import warnings
2022-06-30 14:08:54 +02:00
from copy import deepcopy
from typing import Iterable, Type, Union, MutableMapping
2021-04-21 10:45:34 +02:00
import gym
import numpy as np
2022-06-30 14:08:54 +02:00
from gym.envs.registration import EnvSpec, registry
from gym.wrappers import TimeAwareObservation
2022-05-03 19:51:54 +02:00
from alr_envs.black_box.black_box_wrapper import BlackBoxWrapper
2022-07-07 10:47:04 +02:00
from alr_envs.black_box.factory.controller_factory import get_controller
from alr_envs.black_box.factory.basis_generator_factory import get_basis_generator
from alr_envs.black_box.factory.phase_generator_factory import get_phase_generator
from alr_envs.black_box.factory.trajectory_generator_factory import get_trajectory_generator
from alr_envs.black_box.raw_interface_wrapper import RawInterfaceWrapper
2022-06-30 14:08:54 +02:00
from alr_envs.utils.utils import nested_update
def make_rank(env_id: str, seed: int, rank: int = 0, return_callable=True, **kwargs):
"""
TODO: Do we need this?
Generate a callable to create a new gym environment with a given seed.
The rank is added to the seed and can be used for example when using vector environments.
E.g. [make_rank("my_env_name-v0", 123, i) for i in range(8)] creates a list of 8 environments
with seeds 123 through 130.
Hence, testing environments should be seeded with a value which is offset by the number of training environments.
Here e.g. [make_rank("my_env_name-v0", 123 + 8, i) for i in range(5)] for 5 testing environmetns
Args:
env_id: name of the environment
seed: seed for deterministic behaviour
rank: environment rank for deterministic over multiple seeds behaviour
return_callable: If True returns a callable to create the environment instead of the environment itself.
2021-04-21 10:45:34 +02:00
Returns:
"""
def f():
return make(env_id, seed + rank, **kwargs)
return f if return_callable else f()
2021-04-21 10:45:34 +02:00
2022-06-30 14:08:54 +02:00
def make(env_id, seed, **kwargs):
2022-07-07 10:47:04 +02:00
return _make(env_id, seed, **kwargs)
2022-06-30 14:08:54 +02:00
def _make(env_id: str, seed, **kwargs):
"""
Converts an env_id to an environment with the gym API.
This also works for DeepMind Control Suite interface_wrappers
for which domain name and task name are expected to be separated by "-".
Args:
env_id: gym name or env_id of the form "domain_name-task_name" for DMC tasks
**kwargs: Additional kwargs for the constructor such as pixel observations, etc.
Returns: Gym environment
"""
2022-07-07 10:47:04 +02:00
# 'dmc:domain-task'
# 'gym:name-vX'
# 'meta:name-vX'
# 'meta:bb:name-vX'
# 'hand:name-vX'
# 'name-vX'
# 'bb:name-vX'
#
# env_id.split(':')
# if 'dmc' :
2021-11-30 16:11:32 +01:00
try:
2022-07-07 10:47:04 +02:00
# This access is required to allow for nested dict updates for BB envs
spec = registry.get(env_id)
all_kwargs = deepcopy(spec.kwargs)
nested_update(all_kwargs, kwargs)
kwargs = all_kwargs
# Add seed to kwargs in case it is a predefined gym+dmc hybrid environment.
if env_id.startswith("dmc"):
kwargs.update({"seed": seed})
# Gym
env = gym.make(env_id, **kwargs)
env.seed(seed)
env.action_space.seed(seed)
env.observation_space.seed(seed)
2022-07-07 10:47:04 +02:00
except (gym.error.Error, AttributeError):
# MetaWorld env
import metaworld
if env_id in metaworld.ML1.ENV_NAMES:
env = metaworld.envs.ALL_V2_ENVIRONMENTS_GOAL_OBSERVABLE[env_id + "-goal-observable"](seed=seed, **kwargs)
2022-07-07 10:47:04 +02:00
# setting this avoids generating the same initialization after each reset
env._freeze_rand_vec = False
2022-07-07 10:47:04 +02:00
env.seeded_rand_vec = True
# Manually set spec, as metaworld environments are not registered via gym
env.unwrapped.spec = EnvSpec(env_id)
# Set Timelimit based on the maximum allowed path length of the environment
env = gym.wrappers.TimeLimit(env, max_episode_steps=env.max_path_length)
2022-07-07 10:47:04 +02:00
# env.seed(seed)
# env.action_space.seed(seed)
# env.observation_space.seed(seed)
# env.goal_space.seed(seed)
else:
# DMC
from alr_envs import make_dmc
env = make_dmc(env_id, seed=seed, **kwargs)
if not env.base_step_limit == env.spec.max_episode_steps:
raise ValueError(f"The specified 'episode_length' of {env.spec.max_episode_steps} steps for gym "
f"is different from the DMC environment specification of {env.base_step_limit} steps.")
return env
2021-04-21 10:45:34 +02:00
def _make_wrapped_env(
2022-06-29 09:37:18 +02:00
env_id: str, wrappers: Iterable[Type[gym.Wrapper]], seed=1, **kwargs
):
"""
Helper function for creating a wrapped gym environment using MPs.
2022-06-30 14:08:54 +02:00
It adds all provided wrappers to the specified environment and verifies at least one RawInterfaceWrapper is
provided to expose the interface for MPs.
Args:
env_id: name of the environment
2022-06-30 14:08:54 +02:00
wrappers: list of wrappers (at least an RawInterfaceWrapper),
seed: seed of environment
Returns: gym environment with all specified wrappers applied
"""
# _env = gym.make(env_id)
_env = make(env_id, seed, **kwargs)
2022-06-29 09:37:18 +02:00
has_black_box_wrapper = False
for w in wrappers:
2022-06-29 09:37:18 +02:00
# only wrap the environment if not BlackBoxWrapper, e.g. for vision
if issubclass(w, RawInterfaceWrapper):
has_black_box_wrapper = True
_env = w(_env)
if not has_black_box_wrapper:
raise ValueError("An RawInterfaceWrapper is required in order to leverage movement primitive environments.")
2022-05-03 19:51:54 +02:00
return _env
2022-06-30 14:08:54 +02:00
def make_bb(
env_id: str, wrappers: Iterable, black_box_kwargs: MutableMapping, traj_gen_kwargs: MutableMapping,
2022-05-03 19:51:54 +02:00
controller_kwargs: MutableMapping, phase_kwargs: MutableMapping, basis_kwargs: MutableMapping, seed=1,
2022-06-30 14:08:54 +02:00
**kwargs):
2022-05-03 19:51:54 +02:00
"""
This can also be used standalone for manually building a custom DMP environment.
Args:
2022-06-30 14:08:54 +02:00
black_box_kwargs: kwargs for the black-box wrapper
2022-06-29 09:37:18 +02:00
basis_kwargs: kwargs for the basis generator
phase_kwargs: kwargs for the phase generator
controller_kwargs: kwargs for the tracking controller
2022-05-03 19:51:54 +02:00
env_id: base_env_name,
2022-07-06 09:05:35 +02:00
wrappers: list of wrappers (at least an RawInterfaceWrapper),
2022-05-03 19:51:54 +02:00
seed: seed of environment
2022-06-29 09:37:18 +02:00
traj_gen_kwargs: dict of at least {num_dof: int, num_basis: int} for DMP
2022-05-03 19:51:54 +02:00
Returns: DMP wrapped gym env
"""
2022-06-29 09:37:18 +02:00
_verify_time_limit(traj_gen_kwargs.get("duration", None), kwargs.get("time_limit", None))
_env = _make_wrapped_env(env_id=env_id, wrappers=wrappers, seed=seed, **kwargs)
2022-06-30 14:08:54 +02:00
learn_sub_trajs = black_box_kwargs.get('learn_sub_trajectories')
do_replanning = black_box_kwargs.get('replanning_schedule')
if learn_sub_trajs and do_replanning:
raise ValueError('Cannot used sub-trajectory learning and replanning together.')
if learn_sub_trajs or do_replanning:
# add time_step observation when replanning
kwargs['wrappers'].append(TimeAwareObservation)
2022-06-29 09:37:18 +02:00
traj_gen_kwargs['action_dim'] = traj_gen_kwargs.get('action_dim', np.prod(_env.action_space.shape).item())
2022-06-30 14:08:54 +02:00
if black_box_kwargs.get('duration') is None:
black_box_kwargs['duration'] = _env.spec.max_episode_steps * _env.dt
if phase_kwargs.get('tau') is None:
phase_kwargs['tau'] = black_box_kwargs['duration']
if learn_sub_trajs is not None:
# We have to learn the length when learning sub_trajectories trajectories
phase_kwargs['learn_tau'] = True
2022-05-03 19:51:54 +02:00
phase_gen = get_phase_generator(**phase_kwargs)
basis_gen = get_basis_generator(phase_generator=phase_gen, **basis_kwargs)
controller = get_controller(**controller_kwargs)
2022-06-29 09:37:18 +02:00
traj_gen = get_trajectory_generator(basis_generator=basis_gen, **traj_gen_kwargs)
bb_env = BlackBoxWrapper(_env, trajectory_generator=traj_gen, tracking_controller=controller,
2022-06-30 14:08:54 +02:00
**black_box_kwargs)
2022-06-29 09:37:18 +02:00
return bb_env
2022-06-29 09:37:18 +02:00
def make_bb_env_helper(**kwargs):
2022-05-03 19:51:54 +02:00
"""
2022-06-29 09:37:18 +02:00
Helper function for registering a black box gym environment.
2022-05-03 19:51:54 +02:00
Args:
**kwargs: expects at least the following:
{
"name": base environment name.
2022-06-29 09:37:18 +02:00
"wrappers": list of wrappers (at least an BlackBoxWrapper is required),
"traj_gen_kwargs": {
"trajectory_generator_type": type_of_your_movement_primitive,
2022-05-03 19:51:54 +02:00
non default arguments for the movement primitive instance
...
}
"controller_kwargs": {
"controller_type": type_of_your_controller,
2022-06-29 09:37:18 +02:00
non default arguments for the tracking_controller instance
2022-05-03 19:51:54 +02:00
...
},
"basis_generator_kwargs": {
"basis_generator_type": type_of_your_basis_generator,
non default arguments for the basis generator instance
...
},
"phase_generator_kwargs": {
"phase_generator_type": type_of_your_phase_generator,
non default arguments for the phase generator instance
...
},
}
Returns: MP wrapped gym env
"""
seed = kwargs.pop("seed", None)
wrappers = kwargs.pop("wrappers")
traj_gen_kwargs = kwargs.pop("trajectory_generator_kwargs", {})
black_box_kwargs = kwargs.pop('black_box_kwargs', {})
2022-06-29 09:37:18 +02:00
contr_kwargs = kwargs.pop("controller_kwargs", {})
phase_kwargs = kwargs.pop("phase_generator_kwargs", {})
basis_kwargs = kwargs.pop("basis_generator_kwargs", {})
2021-11-15 09:10:03 +01:00
2022-06-30 14:08:54 +02:00
return make_bb(env_id=kwargs.pop("name"), wrappers=wrappers,
black_box_kwargs=black_box_kwargs,
traj_gen_kwargs=traj_gen_kwargs, controller_kwargs=contr_kwargs,
phase_kwargs=phase_kwargs,
basis_kwargs=basis_kwargs, **kwargs, seed=seed)
2021-11-15 09:10:03 +01:00
def _verify_time_limit(mp_time_limit: Union[None, float], env_time_limit: Union[None, float]):
"""
When using DMC check if a manually specified time limit matches the trajectory duration the MP receives.
Mostly, the time_limit for DMC is not specified and the default values from DMC are taken.
This check, however, can only been done after instantiating the environment.
It can be found in the BaseMP class.
Args:
2022-06-30 14:08:54 +02:00
mp_time_limit: max trajectory length of traj_gen in seconds
env_time_limit: max trajectory length of DMC environment in seconds
Returns:
"""
if mp_time_limit is not None and env_time_limit is not None:
assert mp_time_limit == env_time_limit, \
f"The specified 'time_limit' of {env_time_limit}s does not match " \
f"the duration of {mp_time_limit}s for the MP."
def _verify_dof(base_env: gym.Env, dof: int):
action_shape = np.prod(base_env.action_space.shape)
assert dof == action_shape, \
f"The specified degrees of freedom ('num_dof') {dof} do not match " \
f"the action space of {action_shape} the base environments"