updated for gym 0.25.1

Fabian 2022-08-18 09:04:38 +02:00
parent a5cac2dd0d
commit bdea3badff
10 changed files with 93 additions and 110 deletions

View File

@@ -86,8 +86,8 @@ class BlackBoxWrapper(gym.ObservationWrapper):
         velocity = get_numpy(self.traj_gen.get_traj_vel())
         # Remove first element of trajectory as this is the current position and velocity
-        trajectory = trajectory[1:]
-        velocity = velocity[1:]
+        # trajectory = trajectory[1:]
+        # velocity = velocity[1:]
         return trajectory, velocity
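Note: this change keeps the first trajectory step instead of dropping it. A minimal numpy sketch of what the disabled slicing did (shapes and values are illustrative only, not taken from the wrapper):

import numpy as np

trajectory = np.arange(10, dtype=float).reshape(5, 2)  # hypothetical 5 steps x 2 DoF
velocity = np.zeros_like(trajectory)

trimmed = trajectory[1:]                # old behaviour: drop the current state, shape (4, 2)
print(trajectory.shape, trimmed.shape)  # new behaviour returns the full (5, 2) trajectory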

View File

@@ -1,4 +1,3 @@
-from abc import abstractmethod
 from typing import Union, Tuple

 import gym
@@ -23,7 +22,6 @@ class RawInterfaceWrapper(gym.Wrapper):
         return np.ones(self.env.observation_space.shape[0], dtype=bool)

     @property
-    @abstractmethod
     def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
         """
         Returns the current position of the action/control dimension.
@@ -32,10 +30,9 @@ class RawInterfaceWrapper(gym.Wrapper):
         it should, however, be implemented regardless.
         E.g. The joint positions that are directly or indirectly controlled by the action.
         """
-        raise NotImplementedError()
+        raise NotImplementedError

     @property
-    @abstractmethod
     def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
         """
         Returns the current velocity of the action/control dimension.
@@ -44,7 +41,7 @@ class RawInterfaceWrapper(gym.Wrapper):
         it should, however, be implemented regardless.
         E.g. The joint velocities that are directly or indirectly controlled by the action.
         """
-        raise NotImplementedError()
+        raise NotImplementedError

     @property
     def dt(self) -> float:
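With @abstractmethod gone, current_pos and current_vel are plain properties that raise NotImplementedError when called. A minimal sketch of a concrete wrapper overriding them; the env attributes used here (data.qpos/qvel and the 7-joint slice) are assumptions for illustration, not part of this diff:

import numpy as np

from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper


class MyTaskWrapper(RawInterfaceWrapper):
    @property
    def current_pos(self) -> np.ndarray:
        # joint positions driven by the action (hypothetical attribute layout)
        return self.env.data.qpos[:7].copy()

    @property
    def current_vel(self) -> np.ndarray:
        # joint velocities driven by the action (hypothetical attribute layout)
        return self.env.data.qvel[:7].copy()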

View File

@@ -126,7 +126,7 @@ for _dims in [5, 7]:
     register(
         id=f'Reacher{_dims}dSparse-v0',
         entry_point='fancy_gym.envs.mujoco:ReacherEnv',
-        max_episode_steps=200,
+        max_episode_steps=MAX_EPISODE_STEPS_REACHER,
         kwargs={
             "sparse": True,
             'reward_weight': 200,
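The hard-coded 200 is replaced by the MAX_EPISODE_STEPS_REACHER constant. A hedged usage sketch, assuming the register() calls run when fancy_gym is imported and that the constant keeps the value 200 it replaces:

import gym
import fancy_gym  # noqa: F401 -- importing the package executes the register() calls

env = gym.make('Reacher5dSparse-v0')   # one id produced by the loop above
print(env.spec.max_episode_steps)      # expected: value of MAX_EPISODE_STEPS_REACHER
obs = env.reset()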

View File

@@ -1,4 +1,3 @@
-from abc import ABC, abstractmethod
 from typing import Union, Tuple, Optional

 import gym
@@ -10,7 +9,7 @@ from gym.utils import seeding
 from fancy_gym.envs.classic_control.utils import intersect


-class BaseReacherEnv(gym.Env, ABC):
+class BaseReacherEnv(gym.Env):
     """
     Base class for all reaching environments.
     """
@@ -87,13 +86,6 @@ class BaseReacherEnv(gym.Env, ABC):
         return self._get_obs().copy()

-    @abstractmethod
-    def step(self, action: np.ndarray):
-        """
-        A single step with action in angular velocity space
-        """
-        raise NotImplementedError
-
     def _update_joints(self):
         """
         update joints to get new end-effector position. The other links are only required for rendering.
@@ -120,27 +112,24 @@ class BaseReacherEnv(gym.Env, ABC):
             return True
         return False

-    @abstractmethod
     def _get_reward(self, action: np.ndarray) -> (float, dict):
-        pass
+        raise NotImplementedError

-    @abstractmethod
     def _get_obs(self) -> np.ndarray:
-        pass
+        raise NotImplementedError

-    @abstractmethod
     def _check_collisions(self) -> bool:
-        pass
+        raise NotImplementedError

-    @abstractmethod
     def _terminate(self, info) -> bool:
-        return False
+        raise NotImplementedError

     def seed(self, seed=None):
         self.np_random, seed = seeding.np_random(seed)
         return [seed]

     def close(self):
-        super(BaseReacherEnv, self).close()
         del self.fig

     @property
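Dropping ABC shifts when a missing override is noticed: an abstract base class fails at instantiation, while the plain NotImplementedError hooks used above only fail when the method is first called. A self-contained toy illustration of that difference (not fancy_gym code):

from abc import ABC, abstractmethod


class AbstractBase(ABC):
    @abstractmethod
    def _get_obs(self):
        raise NotImplementedError


class PlainBase:
    def _get_obs(self):
        raise NotImplementedError


class A(AbstractBase):
    pass


class B(PlainBase):
    pass


# A() raises TypeError immediately (abstract method not overridden),
# B() instantiates fine and only B()._get_obs() raises NotImplementedError.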

View File

@@ -1,12 +1,10 @@
-from abc import ABC
-
 import numpy as np
 from gym import spaces

 from fancy_gym.envs.classic_control.base_reacher.base_reacher import BaseReacherEnv


-class BaseReacherDirectEnv(BaseReacherEnv, ABC):
+class BaseReacherDirectEnv(BaseReacherEnv):
     """
     Base class for directly controlled reaching environments
     """

View File

@@ -1,12 +1,10 @@
-from abc import ABC
-
 import numpy as np
 from gym import spaces

 from fancy_gym.envs.classic_control.base_reacher.base_reacher import BaseReacherEnv


-class BaseReacherTorqueEnv(BaseReacherEnv, ABC):
+class BaseReacherTorqueEnv(BaseReacherEnv):
     """
     Base class for torque controlled reaching environments
     """

View File

@@ -98,7 +98,7 @@ class HopperJumpEnv(HopperEnv):
         if not self.sparse or (self.sparse and self._steps >= MAX_EPISODE_STEPS_HOPPERJUMP):
             healthy_reward = self.healthy_reward
             distance_reward = -goal_dist * self._dist_weight
-            height_reward = (self.max_height if self.sparse else self.get_body_com("torso")[2]) * self._height_weight
+            height_reward = (self.max_height if self.sparse else height_after) * self._height_weight
             contact_reward = -(self.contact_dist or 5) * self._contact_weight
             rewards = self._forward_reward_weight * (distance_reward + height_reward + contact_reward + healthy_reward)
@@ -123,7 +123,7 @@ class HopperJumpEnv(HopperEnv):
         return np.concatenate((super(HopperJumpEnv, self)._get_obs(), goal_dist.copy(), self.goal[:1]))

     def reset_model(self):
-        super(HopperJumpEnv, self).reset_model()
+        # super(HopperJumpEnv, self).reset_model()
         # self.goal = self.np_random.uniform(0.3, 1.35, 1)[0]
         self.goal = np.concatenate([self.np_random.uniform(0.3, 1.35, 1), np.zeros(2, )])
@@ -176,76 +176,76 @@ class HopperJumpEnv(HopperEnv):
             return True
         return False

-# TODO is that needed? if so test it
-class HopperJumpStepEnv(HopperJumpEnv):
-
-    def __init__(self,
-                 xml_file='hopper_jump.xml',
-                 forward_reward_weight=1.0,
-                 ctrl_cost_weight=1e-3,
-                 healthy_reward=1.0,
-                 height_weight=3,
-                 dist_weight=3,
-                 terminate_when_unhealthy=False,
-                 healthy_state_range=(-100.0, 100.0),
-                 healthy_z_range=(0.5, float('inf')),
-                 healthy_angle_range=(-float('inf'), float('inf')),
-                 reset_noise_scale=5e-3,
-                 exclude_current_positions_from_observation=False
-                 ):
-
-        self._height_weight = height_weight
-        self._dist_weight = dist_weight
-        super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, terminate_when_unhealthy,
-                         healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale,
-                         exclude_current_positions_from_observation)
-
-    def step(self, action):
-        self._steps += 1
-
-        self.do_simulation(action, self.frame_skip)
-
-        height_after = self.get_body_com("torso")[2]
-        site_pos_after = self.data.site('foot_site').xpos.copy()
-        self.max_height = max(height_after, self.max_height)
-
-        ctrl_cost = self.control_cost(action)
-        healthy_reward = self.healthy_reward
-        height_reward = self._height_weight * height_after
-        goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
-        goal_dist_reward = -self._dist_weight * goal_dist
-        dist_reward = self._forward_reward_weight * (goal_dist_reward + height_reward)
-
-        rewards = dist_reward + healthy_reward
-        costs = ctrl_cost
-        done = False
-
-        # This is only for logging the distance to goal when first having the contact
-        has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
-
-        if not self.init_floor_contact:
-            self.init_floor_contact = has_floor_contact
-        if self.init_floor_contact and not self.has_left_floor:
-            self.has_left_floor = not has_floor_contact
-        if not self.contact_with_floor and self.has_left_floor:
-            self.contact_with_floor = has_floor_contact
-
-        if self.contact_dist is None and self.contact_with_floor:
-            self.contact_dist = goal_dist
-
-        ##############################################################
-
-        observation = self._get_obs()
-        reward = rewards - costs
-        info = {
-            'height': height_after,
-            'x_pos': site_pos_after,
-            'max_height': copy.copy(self.max_height),
-            'goal': copy.copy(self.goal),
-            'goal_dist': goal_dist,
-            'height_rew': height_reward,
-            'healthy_reward': healthy_reward,
-            'healthy': copy.copy(self.is_healthy),
-            'contact_dist': copy.copy(self.contact_dist) or 0
-        }
-        return observation, reward, done, info
+# # TODO is that needed? if so test it
+# class HopperJumpStepEnv(HopperJumpEnv):
+#
+#     def __init__(self,
+#                  xml_file='hopper_jump.xml',
+#                  forward_reward_weight=1.0,
+#                  ctrl_cost_weight=1e-3,
+#                  healthy_reward=1.0,
+#                  height_weight=3,
+#                  dist_weight=3,
+#                  terminate_when_unhealthy=False,
+#                  healthy_state_range=(-100.0, 100.0),
+#                  healthy_z_range=(0.5, float('inf')),
+#                  healthy_angle_range=(-float('inf'), float('inf')),
+#                  reset_noise_scale=5e-3,
+#                  exclude_current_positions_from_observation=False
+#                  ):
+#
+#         self._height_weight = height_weight
+#         self._dist_weight = dist_weight
+#         super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, terminate_when_unhealthy,
+#                          healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale,
+#                          exclude_current_positions_from_observation)
+#
+#     def step(self, action):
+#         self._steps += 1
+#
+#         self.do_simulation(action, self.frame_skip)
+#
+#         height_after = self.get_body_com("torso")[2]
+#         site_pos_after = self.data.site('foot_site').xpos.copy()
+#         self.max_height = max(height_after, self.max_height)
+#
+#         ctrl_cost = self.control_cost(action)
+#         healthy_reward = self.healthy_reward
+#         height_reward = self._height_weight * height_after
+#         goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
+#         goal_dist_reward = -self._dist_weight * goal_dist
+#         dist_reward = self._forward_reward_weight * (goal_dist_reward + height_reward)
+#
+#         rewards = dist_reward + healthy_reward
+#         costs = ctrl_cost
+#         done = False
+#
+#         # This is only for logging the distance to goal when first having the contact
+#         has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
+#
+#         if not self.init_floor_contact:
+#             self.init_floor_contact = has_floor_contact
+#         if self.init_floor_contact and not self.has_left_floor:
+#             self.has_left_floor = not has_floor_contact
+#         if not self.contact_with_floor and self.has_left_floor:
+#             self.contact_with_floor = has_floor_contact
+#
+#         if self.contact_dist is None and self.contact_with_floor:
+#             self.contact_dist = goal_dist
+#
+#         ##############################################################
+#
+#         observation = self._get_obs()
+#         reward = rewards - costs
+#         info = {
+#             'height': height_after,
+#             'x_pos': site_pos_after,
+#             'max_height': copy.copy(self.max_height),
+#             'goal': copy.copy(self.goal),
+#             'goal_dist': goal_dist,
+#             'height_rew': height_reward,
+#             'healthy_reward': healthy_reward,
+#             'healthy': copy.copy(self.is_healthy),
+#             'contact_dist': copy.copy(self.contact_dist) or 0
+#         }
+#         return observation, reward, done, info

View File

@@ -14,7 +14,7 @@ class MPWrapper(RawInterfaceWrapper):
             [False] * (2 + int(not self.exclude_current_positions_from_observation)),  # position
             [True] * 3,  # set to true if randomize initial pos
             [False] * 6,  # velocity
-            [True] * 3,  # goal distance
+            [False] * 3,  # goal distance
             [True]  # goal
         ])
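This flips the three goal-distance entries out of the context mask, leaving only the goal itself. A hedged sketch of how such a boolean mask selects context entries from a flat observation (the layout follows the comments above, sizes are illustrative):

import numpy as np

context_mask = np.hstack([
    [False] * 2,   # position (assuming current positions are excluded)
    [True] * 3,    # randomized initial position
    [False] * 6,   # velocity
    [False] * 3,   # goal distance, no longer exposed as context
    [True],        # goal
])

obs = np.arange(context_mask.size, dtype=float)  # dummy observation
context = obs[context_mask]                      # 4 entries remain
print(context)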

View File

@@ -1,4 +1,3 @@
-from abc import ABC
 from typing import Tuple, Union

 import numpy as np
@@ -6,7 +5,7 @@ import numpy as np
 from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper


-class BaseMetaworldMPWrapper(RawInterfaceWrapper, ABC):
+class BaseMetaworldMPWrapper(RawInterfaceWrapper):
     @property
     def current_pos(self) -> Union[float, int, np.ndarray]:
         r_close = self.env.data.get_joint_qpos("r_close")

View File

@@ -6,7 +6,9 @@ from setuptools import setup, find_packages
 extras = {
     "dmc": ["dm_control>=1.0.1"],
     "metaworld": ["metaworld @ git+https://github.com/rlworkgroup/metaworld.git@master#egg=metaworld",
-                  'mujoco-py<2.2,>=2.1'],
+                  'mujoco-py<2.2,>=2.1',
+                  'scipy'
+                  ],
 }

 # All dependencies
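With this addition, installing the metaworld extra also pulls in scipy alongside mujoco-py, e.g. via pip install -e ".[metaworld]" from a local checkout (invocation shown for illustration only; adjust to your install workflow).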