Merge remote-tracking branch 'origin/master'

Conflicts:
	fancy_gym/black_box/black_box_wrapper.py
This commit is contained in:
Fabian 2022-09-26 09:52:45 +02:00
commit 932c5d6aea
12 changed files with 106 additions and 117 deletions

View File

@ -1,10 +1,10 @@
# Fancy Gym
`fancy_gym` offers a large variety of reinforcement learning environments under the unifying interface
of [OpenAI gym](https://gym.openai.com/). We provide support (under the OpenAI gym interface) for the benchmark suites
of [OpenAI gym](https://gymlibrary.dev/). We provide support (under the OpenAI gym interface) for the benchmark suites
[DeepMind Control](https://deepmind.com/research/publications/2020/dm-control-Software-and-Tasks-for-Continuous-Control)
(DMC) and [Metaworld](https://meta-world.github.io/). If those are not sufficient and you want to create your own custom
gym environments, use [this guide](https://www.gymlibrary.ml/content/environment_creation/). We highly appreciate it, if
gym environments, use [this guide](https://www.gymlibrary.dev/content/environment_creation/). We highly appreciate it, if
you would then submit a PR for this environment to become part of `fancy_gym`.
In comparison to existing libraries, we additionally support to control agents with movement primitives, such as Dynamic
Movement Primitives (DMPs) and Probabilistic Movement Primitives (ProMP).

View File

@ -67,23 +67,24 @@ class BlackBoxWrapper(gym.ObservationWrapper):
def observation(self, observation):
# return context space if we are
obs = observation[self.env.context_mask] if self.return_context_observation else observation
if self.return_context_observation:
observation = observation[self.env.context_mask]
# cast dtype because metaworld returns incorrect that throws gym error
return obs.astype(self.observation_space.dtype)
return observation.astype(self.observation_space.dtype)
def get_trajectory(self, action: np.ndarray) -> Tuple:
clipped_params = np.clip(action, self.traj_gen_action_space.low, self.traj_gen_action_space.high)
self.traj_gen.set_params(clipped_params)
# TODO: is this correct for replanning? Do we need to adjust anything here?
bc_time = np.array(0 if not self.do_replanning else self.current_traj_steps * self.dt)
# TODO we could think about initializing with the previous desired value in order to have a smooth transition
# at least from the planning point of view.
self.traj_gen.set_boundary_conditions(bc_time, self.current_pos, self.current_vel)
self.traj_gen.set_duration(None if self.learn_sub_trajectories else self.duration, self.dt)
duration = None if self.learn_sub_trajectories else self.duration
self.traj_gen.set_duration(duration, self.dt)
# traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True)
trajectory = get_numpy(self.traj_gen.get_traj_pos())
velocity = get_numpy(self.traj_gen.get_traj_vel())
print(len(trajectory))
if self.do_replanning:
# Remove first part of trajectory as this is already over
trajectory = trajectory[self.current_traj_steps:]

View File

@ -1,4 +1,3 @@
from abc import abstractmethod
from typing import Union, Tuple
import gym
@ -23,7 +22,6 @@ class RawInterfaceWrapper(gym.Wrapper):
return np.ones(self.env.observation_space.shape[0], dtype=bool)
@property
@abstractmethod
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
"""
Returns the current position of the action/control dimension.
@ -32,10 +30,9 @@ class RawInterfaceWrapper(gym.Wrapper):
it should, however, be implemented regardless.
E.g. The joint positions that are directly or indirectly controlled by the action.
"""
raise NotImplementedError()
raise NotImplementedError
@property
@abstractmethod
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
"""
Returns the current velocity of the action/control dimension.
@ -44,7 +41,7 @@ class RawInterfaceWrapper(gym.Wrapper):
it should, however, be implemented regardless.
E.g. The joint velocities that are directly or indirectly controlled by the action.
"""
raise NotImplementedError()
raise NotImplementedError
@property
def dt(self) -> float:

View File

@ -126,7 +126,7 @@ for _dims in [5, 7]:
register(
id=f'Reacher{_dims}dSparse-v0',
entry_point='fancy_gym.envs.mujoco:ReacherEnv',
max_episode_steps=200,
max_episode_steps=MAX_EPISODE_STEPS_REACHER,
kwargs={
"sparse": True,
'reward_weight': 200,

View File

@ -1,4 +1,3 @@
from abc import ABC, abstractmethod
from typing import Union, Tuple, Optional
import gym
@ -10,7 +9,7 @@ from gym.utils import seeding
from fancy_gym.envs.classic_control.utils import intersect
class BaseReacherEnv(gym.Env, ABC):
class BaseReacherEnv(gym.Env):
"""
Base class for all reaching environments.
"""
@ -87,13 +86,6 @@ class BaseReacherEnv(gym.Env, ABC):
return self._get_obs().copy()
@abstractmethod
def step(self, action: np.ndarray):
"""
A single step with action in angular velocity space
"""
raise NotImplementedError
def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.
@ -120,27 +112,24 @@ class BaseReacherEnv(gym.Env, ABC):
return True
return False
@abstractmethod
def _get_reward(self, action: np.ndarray) -> (float, dict):
pass
raise NotImplementedError
@abstractmethod
def _get_obs(self) -> np.ndarray:
pass
raise NotImplementedError
@abstractmethod
def _check_collisions(self) -> bool:
pass
raise NotImplementedError
@abstractmethod
def _terminate(self, info) -> bool:
return False
raise NotImplementedError
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self):
super(BaseReacherEnv, self).close()
del self.fig
@property

View File

@ -1,12 +1,10 @@
from abc import ABC
import numpy as np
from gym import spaces
from fancy_gym.envs.classic_control.base_reacher.base_reacher import BaseReacherEnv
class BaseReacherDirectEnv(BaseReacherEnv, ABC):
class BaseReacherDirectEnv(BaseReacherEnv):
"""
Base class for directly controlled reaching environments
"""

View File

@ -1,12 +1,10 @@
from abc import ABC
import numpy as np
from gym import spaces
from fancy_gym.envs.classic_control.base_reacher.base_reacher import BaseReacherEnv
class BaseReacherTorqueEnv(BaseReacherEnv, ABC):
class BaseReacherTorqueEnv(BaseReacherEnv):
"""
Base class for torque controlled reaching environments
"""

View File

@ -98,7 +98,7 @@ class HopperJumpEnv(HopperEnv):
if not self.sparse or (self.sparse and self._steps >= MAX_EPISODE_STEPS_HOPPERJUMP):
healthy_reward = self.healthy_reward
distance_reward = -goal_dist * self._dist_weight
height_reward = (self.max_height if self.sparse else self.get_body_com("torso")[2]) * self._height_weight
height_reward = (self.max_height if self.sparse else height_after) * self._height_weight
contact_reward = -(self.contact_dist or 5) * self._contact_weight
rewards = self._forward_reward_weight * (distance_reward + height_reward + contact_reward + healthy_reward)
@ -123,7 +123,7 @@ class HopperJumpEnv(HopperEnv):
return np.concatenate((super(HopperJumpEnv, self)._get_obs(), goal_dist.copy(), self.goal[:1]))
def reset_model(self):
super(HopperJumpEnv, self).reset_model()
# super(HopperJumpEnv, self).reset_model()
# self.goal = self.np_random.uniform(0.3, 1.35, 1)[0]
self.goal = np.concatenate([self.np_random.uniform(0.3, 1.35, 1), np.zeros(2, )])
@ -176,76 +176,76 @@ class HopperJumpEnv(HopperEnv):
return True
return False
# TODO is that needed? if so test it
class HopperJumpStepEnv(HopperJumpEnv):
def __init__(self,
xml_file='hopper_jump.xml',
forward_reward_weight=1.0,
ctrl_cost_weight=1e-3,
healthy_reward=1.0,
height_weight=3,
dist_weight=3,
terminate_when_unhealthy=False,
healthy_state_range=(-100.0, 100.0),
healthy_z_range=(0.5, float('inf')),
healthy_angle_range=(-float('inf'), float('inf')),
reset_noise_scale=5e-3,
exclude_current_positions_from_observation=False
):
self._height_weight = height_weight
self._dist_weight = dist_weight
super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, terminate_when_unhealthy,
healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale,
exclude_current_positions_from_observation)
def step(self, action):
self._steps += 1
self.do_simulation(action, self.frame_skip)
height_after = self.get_body_com("torso")[2]
site_pos_after = self.data.site('foot_site').xpos.copy()
self.max_height = max(height_after, self.max_height)
ctrl_cost = self.control_cost(action)
healthy_reward = self.healthy_reward
height_reward = self._height_weight * height_after
goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
goal_dist_reward = -self._dist_weight * goal_dist
dist_reward = self._forward_reward_weight * (goal_dist_reward + height_reward)
rewards = dist_reward + healthy_reward
costs = ctrl_cost
done = False
# This is only for logging the distance to goal when first having the contact
has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
if not self.init_floor_contact:
self.init_floor_contact = has_floor_contact
if self.init_floor_contact and not self.has_left_floor:
self.has_left_floor = not has_floor_contact
if not self.contact_with_floor and self.has_left_floor:
self.contact_with_floor = has_floor_contact
if self.contact_dist is None and self.contact_with_floor:
self.contact_dist = goal_dist
##############################################################
observation = self._get_obs()
reward = rewards - costs
info = {
'height': height_after,
'x_pos': site_pos_after,
'max_height': copy.copy(self.max_height),
'goal': copy.copy(self.goal),
'goal_dist': goal_dist,
'height_rew': height_reward,
'healthy_reward': healthy_reward,
'healthy': copy.copy(self.is_healthy),
'contact_dist': copy.copy(self.contact_dist) or 0
}
return observation, reward, done, info
# # TODO is that needed? if so test it
# class HopperJumpStepEnv(HopperJumpEnv):
#
# def __init__(self,
# xml_file='hopper_jump.xml',
# forward_reward_weight=1.0,
# ctrl_cost_weight=1e-3,
# healthy_reward=1.0,
# height_weight=3,
# dist_weight=3,
# terminate_when_unhealthy=False,
# healthy_state_range=(-100.0, 100.0),
# healthy_z_range=(0.5, float('inf')),
# healthy_angle_range=(-float('inf'), float('inf')),
# reset_noise_scale=5e-3,
# exclude_current_positions_from_observation=False
# ):
#
# self._height_weight = height_weight
# self._dist_weight = dist_weight
# super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, terminate_when_unhealthy,
# healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale,
# exclude_current_positions_from_observation)
#
# def step(self, action):
# self._steps += 1
#
# self.do_simulation(action, self.frame_skip)
#
# height_after = self.get_body_com("torso")[2]
# site_pos_after = self.data.site('foot_site').xpos.copy()
# self.max_height = max(height_after, self.max_height)
#
# ctrl_cost = self.control_cost(action)
# healthy_reward = self.healthy_reward
# height_reward = self._height_weight * height_after
# goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
# goal_dist_reward = -self._dist_weight * goal_dist
# dist_reward = self._forward_reward_weight * (goal_dist_reward + height_reward)
#
# rewards = dist_reward + healthy_reward
# costs = ctrl_cost
# done = False
#
# # This is only for logging the distance to goal when first having the contact
# has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
#
# if not self.init_floor_contact:
# self.init_floor_contact = has_floor_contact
# if self.init_floor_contact and not self.has_left_floor:
# self.has_left_floor = not has_floor_contact
# if not self.contact_with_floor and self.has_left_floor:
# self.contact_with_floor = has_floor_contact
#
# if self.contact_dist is None and self.contact_with_floor:
# self.contact_dist = goal_dist
#
# ##############################################################
#
# observation = self._get_obs()
# reward = rewards - costs
# info = {
# 'height': height_after,
# 'x_pos': site_pos_after,
# 'max_height': copy.copy(self.max_height),
# 'goal': copy.copy(self.goal),
# 'goal_dist': goal_dist,
# 'height_rew': height_reward,
# 'healthy_reward': healthy_reward,
# 'healthy': copy.copy(self.is_healthy),
# 'contact_dist': copy.copy(self.contact_dist) or 0
# }
# return observation, reward, done, info

View File

@ -14,7 +14,7 @@ class MPWrapper(RawInterfaceWrapper):
[False] * (2 + int(not self.exclude_current_positions_from_observation)), # position
[True] * 3, # set to true if randomize initial pos
[False] * 6, # velocity
[True] * 3, # goal distance
[False] * 3, # goal distance
[True] # goal
])

View File

@ -1,4 +1,3 @@
from abc import ABC
from typing import Tuple, Union
import numpy as np
@ -6,7 +5,7 @@ import numpy as np
from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper
class BaseMetaworldMPWrapper(RawInterfaceWrapper, ABC):
class BaseMetaworldMPWrapper(RawInterfaceWrapper):
@property
def current_pos(self) -> Union[float, int, np.ndarray]:
r_close = self.env.data.get_joint_qpos("r_close")

View File

@ -1,3 +1,4 @@
import logging
import re
import uuid
from collections.abc import MutableMapping
@ -310,7 +311,11 @@ def make_gym(env_id, seed, **kwargs):
"""
# Getting the existing keywords to allow for nested dict updates for BB envs
# gym only allows for non nested updates.
try:
all_kwargs = deepcopy(registry.get(env_id).kwargs)
except AttributeError as e:
logging.error(f'The gym environment with id {env_id} could not been found.')
raise e
nested_update(all_kwargs, kwargs)
kwargs = all_kwargs

View File

@ -6,7 +6,9 @@ from setuptools import setup, find_packages
extras = {
"dmc": ["dm_control>=1.0.1"],
"metaworld": ["metaworld @ git+https://github.com/rlworkgroup/metaworld.git@master#egg=metaworld",
'mujoco-py<2.2,>=2.1'],
'mujoco-py<2.2,>=2.1',
'scipy'
],
}
# All dependencies