Updated custom tasks to the new Gymnasium API

Fabian 2023-01-12 17:22:45 +01:00
parent fbe3ef4a4b
commit ed724046f3
23 changed files with 249 additions and 363 deletions

View File

@ -1,8 +1,9 @@
from typing import Tuple, Optional from typing import Tuple, Optional, Dict, Any
import gym import gymnasium as gym
import numpy as np import numpy as np
from gym import spaces from gymnasium import spaces
from gymnasium.core import ObsType
from mp_pytorch.mp.mp_interfaces import MPInterface from mp_pytorch.mp.mp_interfaces import MPInterface
from fancy_gym.black_box.controller.base_controller import BaseController from fancy_gym.black_box.controller.base_controller import BaseController
@ -140,7 +141,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
for t, (pos, vel) in enumerate(zip(trajectory, velocity)): for t, (pos, vel) in enumerate(zip(trajectory, velocity)):
step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel) step_action = self.tracking_controller.get_action(pos, vel, self.current_pos, self.current_vel)
c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high) c_action = np.clip(step_action, self.env.action_space.low, self.env.action_space.high)
obs, c_reward, done, info = self.env.step(c_action) obs, c_reward, terminated, truncated, info = self.env.step(c_action)
rewards[t] = c_reward rewards[t] = c_reward
if self.verbose >= 2: if self.verbose >= 2:
@ -155,7 +156,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
if self.render_kwargs: if self.render_kwargs:
self.env.render(**self.render_kwargs) self.env.render(**self.render_kwargs)
if done or self.replanning_schedule(self.current_pos, self.current_vel, obs, c_action, if terminated or truncated or self.replanning_schedule(self.current_pos, self.current_vel, obs, c_action,
t + 1 + self.current_traj_steps): t + 1 + self.current_traj_steps):
break break
@ -171,13 +172,14 @@ class BlackBoxWrapper(gym.ObservationWrapper):
infos['trajectory_length'] = t + 1 infos['trajectory_length'] = t + 1
trajectory_return = self.reward_aggregation(rewards[:t + 1]) trajectory_return = self.reward_aggregation(rewards[:t + 1])
return self.observation(obs), trajectory_return, done, infos return self.observation(obs), trajectory_return, terminated, truncated, infos
def render(self, **kwargs): def render(self, **kwargs):
"""Only set render options here, such that they can be used during the rollout. """Only set render options here, such that they can be used during the rollout.
This only needs to be called once""" This only needs to be called once"""
self.render_kwargs = kwargs self.render_kwargs = kwargs
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None): def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
-> Tuple[ObsType, Dict[str, Any]]:
self.current_traj_steps = 0 self.current_traj_steps = 0
return super(BlackBoxWrapper, self).reset() return super(BlackBoxWrapper, self).reset(seed=seed, options=options)
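
For reference, a minimal usage sketch (not part of this commit) of the Gymnasium contract the wrapper now follows: reset takes seed/options and returns (obs, info), and step returns a five-tuple with separate terminated and truncated flags. Pendulum-v1 is only a stand-in environment here, not a fancy_gym task.

import gymnasium as gym

env = gym.make("Pendulum-v1")          # stand-in env, not a fancy_gym task
obs, info = env.reset(seed=0)          # new reset contract: (obs, info)
for _ in range(200):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:        # replaces the old single `done` flag
        obs, info = env.reset()
env.close()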

View File

@ -1,6 +1,6 @@
from typing import Union, Tuple from typing import Union, Tuple
import gym import gymnasium as gym
import numpy as np import numpy as np
from mp_pytorch.mp.mp_interfaces import MPInterface from mp_pytorch.mp.mp_interfaces import MPInterface

View File

@ -1,7 +1,7 @@
from copy import deepcopy from copy import deepcopy
import numpy as np import numpy as np
from gym import register from gymnasium import register
from . import classic_control, mujoco from . import classic_control, mujoco
from .classic_control.hole_reacher.hole_reacher import HoleReacherEnv from .classic_control.hole_reacher.hole_reacher import HoleReacherEnv
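
A hedged sketch of how an environment is registered against gymnasium's registry, as this module now does. The id and max_episode_steps below are illustrative assumptions, not values taken from this commit; only the entry-point module path mirrors the import above.

from gymnasium import register

register(
    id="HoleReacher-v0",  # assumed example id
    entry_point="fancy_gym.envs.classic_control.hole_reacher.hole_reacher:HoleReacherEnv",
    max_episode_steps=200,  # assumed value for illustration
)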

View File

@ -1,10 +1,10 @@
from typing import Union, Tuple, Optional from typing import Union, Tuple, Optional, Any, Dict
import gym import gymnasium as gym
import numpy as np import numpy as np
from gym import spaces from gymnasium import spaces
from gym.core import ObsType from gymnasium.core import ObsType
from gym.utils import seeding from gymnasium.utils import seeding
from fancy_gym.envs.classic_control.utils import intersect from fancy_gym.envs.classic_control.utils import intersect
@ -69,10 +69,14 @@ class BaseReacherEnv(gym.Env):
def current_vel(self): def current_vel(self):
return self._angle_velocity.copy() return self._angle_velocity.copy()
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]: -> Tuple[ObsType, Dict[str, Any]]:
# Sample only orientation of first link, i.e. the arm is always straight. # Sample only orientation of first link, i.e. the arm is always straight.
if self.random_start: try:
random_start = options.get('random_start', self.random_start)
except AttributeError:
random_start = self.random_start
if random_start:
first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4) first_joint = self.np_random.uniform(np.pi / 4, 3 * np.pi / 4)
self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)]) self._joint_angles = np.hstack([[first_joint], np.zeros(self.n_links - 1)])
self._start_pos = self._joint_angles.copy() self._start_pos = self._joint_angles.copy()
@ -84,7 +88,7 @@ class BaseReacherEnv(gym.Env):
self._update_joints() self._update_joints()
self._steps = 0 self._steps = 0
return self._get_obs().copy() return self._get_obs().copy(), {}
def _update_joints(self): def _update_joints(self):
""" """

View File

@ -1,5 +1,5 @@
import numpy as np import numpy as np
from gym import spaces from gymnasium import spaces
from fancy_gym.envs.classic_control.base_reacher.base_reacher import BaseReacherEnv from fancy_gym.envs.classic_control.base_reacher.base_reacher import BaseReacherEnv
@ -32,6 +32,7 @@ class BaseReacherDirectEnv(BaseReacherEnv):
reward, info = self._get_reward(action) reward, info = self._get_reward(action)
self._steps += 1 self._steps += 1
done = self._terminate(info) terminated = self._terminate(info)
truncated = False
return self._get_obs().copy(), reward, done, info return self._get_obs().copy(), reward, terminated, truncated, info

View File

@ -1,5 +1,5 @@
import numpy as np import numpy as np
from gym import spaces from gymnasium import spaces
from fancy_gym.envs.classic_control.base_reacher.base_reacher import BaseReacherEnv from fancy_gym.envs.classic_control.base_reacher.base_reacher import BaseReacherEnv
@ -31,6 +31,7 @@ class BaseReacherTorqueEnv(BaseReacherEnv):
reward, info = self._get_reward(action) reward, info = self._get_reward(action)
self._steps += 1 self._steps += 1
done = False terminated = False
truncated = False
return self._get_obs().copy(), reward, done, info return self._get_obs().copy(), reward, terminated, truncated, info
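
As above, the single done flag is replaced by terminated (natural MDP end) and truncated (external cutoff such as a step limit). A hedged sketch of one simplified mapping for legacy code; split_done is a hypothetical helper, not part of this commit.

def split_done(done: bool, hit_step_limit: bool):
    # simplified mapping: anything ended by the step limit counts as truncation,
    # everything else that was `done` counts as termination
    terminated = done and not hit_step_limit
    truncated = hit_step_limit
    return terminated, truncated

assert split_done(True, False) == (True, False)
assert split_done(True, True) == (False, True)
assert split_done(False, False) == (False, False)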

View File

@ -1,9 +1,10 @@
from typing import Union, Optional, Tuple from typing import Union, Optional, Tuple, Any, Dict
import gym import gymnasium as gym
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from gym.core import ObsType from gymnasium import spaces
from gymnasium.core import ObsType
from matplotlib import patches from matplotlib import patches
from fancy_gym.envs.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv from fancy_gym.envs.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
@ -40,7 +41,7 @@ class HoleReacherEnv(BaseReacherDirectEnv):
[np.inf] # env steps, because reward start after n steps TODO: Maybe [np.inf] # env steps, because reward start after n steps TODO: Maybe
]) ])
# self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) # self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape)
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
if rew_fct == "simple": if rew_fct == "simple":
from fancy_gym.envs.classic_control.hole_reacher.hr_simple_reward import HolereacherReward from fancy_gym.envs.classic_control.hole_reacher.hr_simple_reward import HolereacherReward
@ -54,8 +55,8 @@ class HoleReacherEnv(BaseReacherDirectEnv):
else: else:
raise ValueError("Unknown reward function {}".format(rew_fct)) raise ValueError("Unknown reward function {}".format(rew_fct))
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]: -> Tuple[ObsType, Dict[str, Any]]:
self._generate_hole() self._generate_hole()
self._set_patches() self._set_patches()
self.reward_function.reset() self.reward_function.reset()
@ -225,14 +226,4 @@ class HoleReacherEnv(BaseReacherDirectEnv):
self.fig.gca().add_patch(hole_floor) self.fig.gca().add_patch(hole_floor)
if __name__ == "__main__":
env = HoleReacherEnv(5)
env.reset()
for i in range(10000):
ac = env.action_space.sample()
obs, rew, done, info = env.step(ac)
env.render()
if done:
env.reset()

View File

@ -1,9 +1,9 @@
from typing import Iterable, Union, Optional, Tuple from typing import Iterable, Union, Optional, Tuple, Any, Dict
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from gym import spaces from gymnasium import spaces
from gym.core import ObsType from gymnasium.core import ObsType
from fancy_gym.envs.classic_control.base_reacher.base_reacher_torque import BaseReacherTorqueEnv from fancy_gym.envs.classic_control.base_reacher.base_reacher_torque import BaseReacherTorqueEnv
@ -42,11 +42,10 @@ class SimpleReacherEnv(BaseReacherTorqueEnv):
# def start_pos(self): # def start_pos(self):
# return self._start_pos # return self._start_pos
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]: -> Tuple[ObsType, Dict[str, Any]]:
self._generate_goal() self._generate_goal()
return super().reset(seed=seed, options=options)
return super().reset()
def _get_reward(self, action: np.ndarray): def _get_reward(self, action: np.ndarray):
diff = self.end_effector - self._goal diff = self.end_effector - self._goal
@ -128,14 +127,3 @@ class SimpleReacherEnv(BaseReacherTorqueEnv):
self.fig.canvas.draw() self.fig.canvas.draw()
self.fig.canvas.flush_events() self.fig.canvas.flush_events()
if __name__ == "__main__":
env = SimpleReacherEnv(5)
env.reset()
for i in range(200):
ac = env.action_space.sample()
obs, rew, done, info = env.step(ac)
env.render()
if done:
break

View File

@ -1,9 +1,10 @@
from typing import Iterable, Union, Tuple, Optional from typing import Iterable, Union, Tuple, Optional, Any, Dict
import gym import gymnasium as gym
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from gym.core import ObsType from gymnasium import spaces
from gymnasium.core import ObsType
from fancy_gym.envs.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv from fancy_gym.envs.classic_control.base_reacher.base_reacher_direct import BaseReacherDirectEnv
@ -34,16 +35,16 @@ class ViaPointReacherEnv(BaseReacherDirectEnv):
[np.inf] * 2, # x-y coordinates of target distance [np.inf] * 2, # x-y coordinates of target distance
[np.inf] # env steps, because reward start after n steps [np.inf] # env steps, because reward start after n steps
]) ])
self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
# @property # @property
# def start_pos(self): # def start_pos(self):
# return self._start_pos # return self._start_pos
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]: -> Tuple[ObsType, Dict[str, Any]]:
self._generate_goal() self._generate_goal()
return super().reset() return super().reset(seed=seed, options=options)
def _generate_goal(self): def _generate_goal(self):
# TODO: Maybe improve this later, this can yield quite a lot of invalid settings # TODO: Maybe improve this later, this can yield quite a lot of invalid settings
@ -185,14 +186,3 @@ class ViaPointReacherEnv(BaseReacherDirectEnv):
plt.pause(0.01) plt.pause(0.01)
if __name__ == "__main__":
env = ViaPointReacherEnv(5)
env.reset()
for i in range(10000):
ac = env.action_space.sample()
obs, rew, done, info = env.step(ac)
env.render()
if done:
env.reset()

View File

@ -1,8 +1,8 @@
from typing import Tuple, Union, Optional from typing import Tuple, Union, Optional, Any, Dict
import numpy as np import numpy as np
from gym.core import ObsType from gymnasium.core import ObsType
from gym.envs.mujoco.ant_v4 import AntEnv from gymnasium.envs.mujoco.ant_v4 import AntEnv
MAX_EPISODE_STEPS_ANTJUMP = 200 MAX_EPISODE_STEPS_ANTJUMP = 200
@ -61,9 +61,10 @@ class AntJumpEnv(AntEnv):
costs = ctrl_cost + contact_cost costs = ctrl_cost + contact_cost
done = bool(height < 0.3) # fall over -> is the 0.3 value from healthy_z_range? TODO change 0.3 to the value of healthy z angle terminated = bool(
height < 0.3) # fall over -> is the 0.3 value from healthy_z_range? TODO change 0.3 to the value of healthy z angle
if self.current_step == MAX_EPISODE_STEPS_ANTJUMP or done: if self.current_step == MAX_EPISODE_STEPS_ANTJUMP or terminated:
# -10 for scaling the value of the distance between the max_height and the goal height; only used when context is enabled # -10 for scaling the value of the distance between the max_height and the goal height; only used when context is enabled
# height_reward = -10 * (np.linalg.norm(self.max_height - self.goal)) # height_reward = -10 * (np.linalg.norm(self.max_height - self.goal))
height_reward = -10 * np.linalg.norm(self.max_height - self.goal) height_reward = -10 * np.linalg.norm(self.max_height - self.goal)
@ -80,19 +81,20 @@ class AntJumpEnv(AntEnv):
'max_height': self.max_height, 'max_height': self.max_height,
'goal': self.goal 'goal': self.goal
} }
truncated = False
return obs, reward, done, info return obs, reward, terminated, truncated, info
def _get_obs(self): def _get_obs(self):
return np.append(super()._get_obs(), self.goal) return np.append(super()._get_obs(), self.goal)
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]: -> Tuple[ObsType, Dict[str, Any]]:
self.current_step = 0 self.current_step = 0
self.max_height = 0 self.max_height = 0
# goal heights from 1.0 to 2.5; can be increased, but didnt work well with CMORE # goal heights from 1.0 to 2.5; can be increased, but didnt work well with CMORE
self.goal = self.np_random.uniform(1.0, 2.5, 1) self.goal = self.np_random.uniform(1.0, 2.5, 1)
return super().reset() return super().reset(seed=seed, options=options)
# reset_model had to be implemented in every env to make it deterministic # reset_model had to be implemented in every env to make it deterministic
def reset_model(self): def reset_model(self):
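
A self-contained toy sketch of the conventions these envs now follow (reset returning (obs, info), step returning five values). CounterEnv is purely illustrative and not part of fancy_gym.

from typing import Any, Dict, Optional, Tuple

import numpy as np
import gymnasium as gym
from gymnasium import spaces
from gymnasium.core import ObsType


class CounterEnv(gym.Env):
    observation_space = spaces.Box(0.0, np.inf, shape=(1,), dtype=np.float64)
    action_space = spaces.Discrete(2)

    def __init__(self, max_steps: int = 10):
        self.max_steps = max_steps
        self._t = 0

    def reset(self, *, seed: Optional[int] = None,
              options: Optional[Dict[str, Any]] = None) -> Tuple[ObsType, Dict[str, Any]]:
        super().reset(seed=seed)          # seeds self.np_random
        self._t = 0
        return np.array([0.0]), {}

    def step(self, action):
        self._t += 1
        terminated = False                # this toy env has no natural end state
        truncated = self._t >= self.max_steps
        return np.array([float(self._t)]), 0.0, terminated, truncated, {}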

View File

@ -1,9 +1,10 @@
import os import os
from typing import Optional from typing import Optional, Any, Dict, Tuple
import numpy as np import numpy as np
from gym import utils from gymnasium import utils
from gym.envs.mujoco import MujocoEnv from gymnasium.core import ObsType
from gymnasium.envs.mujoco import MujocoEnv
MAX_EPISODE_STEPS_BEERPONG = 300 MAX_EPISODE_STEPS_BEERPONG = 300
FIXED_RELEASE_STEP = 62 # empirically evaluated for frame_skip=2! FIXED_RELEASE_STEP = 62 # empirically evaluated for frame_skip=2!
@ -30,7 +31,7 @@ CUP_COLLISION_OBJ = ["cup_geom_table3", "cup_geom_table4", "cup_geom_table5", "c
class BeerPongEnv(MujocoEnv, utils.EzPickle): class BeerPongEnv(MujocoEnv, utils.EzPickle):
def __init__(self): def __init__(self, **kwargs):
self._steps = 0 self._steps = 0
# Small Context -> Easier. Todo: Should we do different versions? # Small Context -> Easier. Todo: Should we do different versions?
# self.xml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets", "beerpong_wo_cup.xml") # self.xml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets", "beerpong_wo_cup.xml")
@ -65,7 +66,13 @@ class BeerPongEnv(MujocoEnv, utils.EzPickle):
self.ball_in_cup = False self.ball_in_cup = False
self.dist_ground_cup = -1 # distance floor to cup if first floor contact self.dist_ground_cup = -1 # distance floor to cup if first floor contact
MujocoEnv.__init__(self, model_path=self.xml_path, frame_skip=1, mujoco_bindings="mujoco") MujocoEnv.__init__(
self,
self.xml_path,
frame_skip=1,
observation_space=self.observation_space,
**kwargs
)
utils.EzPickle.__init__(self) utils.EzPickle.__init__(self)
@property @property
@ -76,7 +83,8 @@ class BeerPongEnv(MujocoEnv, utils.EzPickle):
def start_vel(self): def start_vel(self):
return self._start_vel return self._start_vel
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None): def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
-> Tuple[ObsType, Dict[str, Any]]:
self.dists = [] self.dists = []
self.dists_final = [] self.dists_final = []
self.action_costs = [] self.action_costs = []
@ -86,7 +94,7 @@ class BeerPongEnv(MujocoEnv, utils.EzPickle):
self.ball_cup_contact = False self.ball_cup_contact = False
self.ball_in_cup = False self.ball_in_cup = False
self.dist_ground_cup = -1 # distance floor to cup if first floor contact self.dist_ground_cup = -1 # distance floor to cup if first floor contact
return super().reset() return super().reset(seed=seed, options=options)
def reset_model(self): def reset_model(self):
init_pos_all = self.init_qpos.copy() init_pos_all = self.init_qpos.copy()
@ -128,11 +136,11 @@ class BeerPongEnv(MujocoEnv, utils.EzPickle):
if not crash: if not crash:
reward, reward_infos = self._get_reward(applied_action) reward, reward_infos = self._get_reward(applied_action)
is_collided = reward_infos['is_collided'] # TODO: Remove if self collision does not make a difference is_collided = reward_infos['is_collided'] # TODO: Remove if self collision does not make a difference
done = is_collided terminated = is_collided
self._steps += 1 self._steps += 1
else: else:
reward = -30 reward = -30
done = True terminated = True
reward_infos = {"success": False, "ball_pos": np.zeros(3), "ball_vel": np.zeros(3), "is_collided": False} reward_infos = {"success": False, "ball_pos": np.zeros(3), "ball_vel": np.zeros(3), "is_collided": False}
infos = dict( infos = dict(
@ -142,7 +150,10 @@ class BeerPongEnv(MujocoEnv, utils.EzPickle):
q_vel=self.data.qvel[0:7].ravel().copy(), sim_crash=crash, q_vel=self.data.qvel[0:7].ravel().copy(), sim_crash=crash,
) )
infos.update(reward_infos) infos.update(reward_infos)
return ob, reward, done, infos
truncated = False
return ob, reward, terminated, truncated, infos
def _get_obs(self): def _get_obs(self):
theta = self.data.qpos.flat[:7].copy() theta = self.data.qpos.flat[:7].copy()
@ -258,9 +269,9 @@ class BeerPongEnvStepBasedEpisodicReward(BeerPongEnv):
return super(BeerPongEnvStepBasedEpisodicReward, self).step(a) return super(BeerPongEnvStepBasedEpisodicReward, self).step(a)
else: else:
reward = 0 reward = 0
done = True terminated, truncated = True, False
while self._steps < MAX_EPISODE_STEPS_BEERPONG: while self._steps < MAX_EPISODE_STEPS_BEERPONG:
obs, sub_reward, done, infos = super(BeerPongEnvStepBasedEpisodicReward, self).step( obs, sub_reward, terminated, truncated, infos = super(BeerPongEnvStepBasedEpisodicReward, self).step(
np.zeros(a.shape)) np.zeros(a.shape))
reward += sub_reward reward += sub_reward
return obs, reward, done, infos return obs, reward, terminated, truncated, infos
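
The gymnasium MujocoEnv constructor called above now expects an explicit observation_space alongside the model path and frame_skip. A hedged sketch of building such a space; the dimensions are placeholders, not the BeerPong values.

import numpy as np
from gymnasium import spaces

n_qpos, n_qvel, n_extra = 7, 7, 8  # placeholder sizes, not taken from this env
obs_dim = n_qpos + n_qvel + n_extra
observation_space = spaces.Box(
    low=-np.inf, high=np.inf, shape=(obs_dim,), dtype=np.float64
)
# then, inside __init__ (mirroring the call above):
# MujocoEnv.__init__(self, self.xml_path, frame_skip=1,
#                    observation_space=observation_space, **kwargs)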

View File

@ -2,8 +2,8 @@ import os
import mujoco_py.builder import mujoco_py.builder
import numpy as np import numpy as np
from gym import utils from gymnasium import utils
from gym.envs.mujoco import MujocoEnv from gymnasium.envs.mujoco import MujocoEnv
from fancy_gym.envs.mujoco.beerpong.deprecated.beerpong_reward_staged import BeerPongReward from fancy_gym.envs.mujoco.beerpong.deprecated.beerpong_reward_staged import BeerPongReward
@ -90,11 +90,11 @@ class BeerPongEnv(MujocoEnv, utils.EzPickle):
if not crash: if not crash:
reward, reward_infos = self.reward_function.compute_reward(self, applied_action) reward, reward_infos = self.reward_function.compute_reward(self, applied_action)
is_collided = reward_infos['is_collided'] is_collided = reward_infos['is_collided']
done = is_collided or self._steps == self.ep_length - 1 terminated = is_collided or self._steps == self.ep_length - 1
self._steps += 1 self._steps += 1
else: else:
reward = -30 reward = -30
done = True terminated = True
reward_infos = {"success": False, "ball_pos": np.zeros(3), "ball_vel": np.zeros(3), "is_collided": False} reward_infos = {"success": False, "ball_pos": np.zeros(3), "ball_vel": np.zeros(3), "is_collided": False}
infos = dict( infos = dict(
@ -104,7 +104,7 @@ class BeerPongEnv(MujocoEnv, utils.EzPickle):
q_vel=self.sim.data.qvel[0:7].ravel().copy(), sim_crash=crash, q_vel=self.sim.data.qvel[0:7].ravel().copy(), sim_crash=crash,
) )
infos.update(reward_infos) infos.update(reward_infos)
return ob, reward, done, infos return ob, reward, terminated, infos
def _get_obs(self): def _get_obs(self):
theta = self.sim.data.qpos.flat[:7] theta = self.sim.data.qpos.flat[:7]
@ -143,16 +143,16 @@ class BeerPongEnvStepBasedEpisodicReward(BeerPongEnv):
return super(BeerPongEnvStepBasedEpisodicReward, self).step(a) return super(BeerPongEnvStepBasedEpisodicReward, self).step(a)
else: else:
reward = 0 reward = 0
done = False terminated, truncated = False, False
while not done: while not (terminated or truncated):
sub_ob, sub_reward, done, sub_infos = super(BeerPongEnvStepBasedEpisodicReward, self).step( sub_ob, sub_reward, terminated, truncated, sub_infos = super(BeerPongEnvStepBasedEpisodicReward,
np.zeros(a.shape)) self).step(np.zeros(a.shape))
reward += sub_reward reward += sub_reward
infos = sub_infos infos = sub_infos
ob = sub_ob ob = sub_ob
ob[-1] = self.release_step + 1 # Since we simulate until the end of the episode, PPO does not see the ob[-1] = self.release_step + 1 # Since we simulate until the end of the episode, PPO does not see the
# internal steps and thus, the observation also needs to be set correctly # internal steps and thus, the observation also needs to be set correctly
return ob, reward, done, infos return ob, reward, terminated, truncated, infos
# class BeerBongEnvStepBased(BeerBongEnv): # class BeerBongEnvStepBased(BeerBongEnv):
@ -186,27 +186,3 @@ class BeerPongEnvStepBasedEpisodicReward(BeerPongEnv):
# ob[-1] = self.release_step + 1 # Since we simulate until the end of the episode, PPO does not see the # ob[-1] = self.release_step + 1 # Since we simulate until the end of the episode, PPO does not see the
# # internal steps and thus, the observation also needs to be set correctly # # internal steps and thus, the observation also needs to be set correctly
# return ob, reward, done, infos # return ob, reward, done, infos
if __name__ == "__main__":
env = BeerPongEnv(frame_skip=2)
env.seed(0)
# env = BeerBongEnvStepBased(frame_skip=2)
# env = BeerBongEnvStepBasedEpisodicReward(frame_skip=2)
# env = BeerBongEnvFixedReleaseStep(frame_skip=2)
import time
env.reset()
env.render("human")
for i in range(600):
# ac = 10 * env.action_space.sample()
ac = 0.05 * np.ones(7)
obs, rew, d, info = env.step(ac)
env.render("human")
if d:
print('reward:', rew)
print('RESETTING')
env.reset()
time.sleep(1)
env.close()

View File

@ -1,9 +1,9 @@
import os import os
from typing import Tuple, Union, Optional from typing import Tuple, Union, Optional, Any, Dict
import numpy as np import numpy as np
from gym.core import ObsType from gymnasium.core import ObsType
from gym.envs.mujoco.half_cheetah_v4 import HalfCheetahEnv from gymnasium.envs.mujoco.half_cheetah_v4 import HalfCheetahEnv
MAX_EPISODE_STEPS_HALFCHEETAHJUMP = 100 MAX_EPISODE_STEPS_HALFCHEETAHJUMP = 100
@ -44,7 +44,8 @@ class HalfCheetahJumpEnv(HalfCheetahEnv):
## Didnt use fell_over, because base env also has no done condition - Paul and Marc ## Didnt use fell_over, because base env also has no done condition - Paul and Marc
# fell_over = abs(self.sim.data.qpos[2]) > 2.5 # how to figure out if the cheetah fell over? -> 2.5 oke? # fell_over = abs(self.sim.data.qpos[2]) > 2.5 # how to figure out if the cheetah fell over? -> 2.5 oke?
# TODO: Should a fall over be checked here? # TODO: Should a fall over be checked here?
done = False terminated = False
truncated = False
ctrl_cost = self.control_cost(action) ctrl_cost = self.control_cost(action)
costs = ctrl_cost costs = ctrl_cost
@ -63,17 +64,17 @@ class HalfCheetahJumpEnv(HalfCheetahEnv):
'max_height': self.max_height 'max_height': self.max_height
} }
return observation, reward, done, info return observation, reward, terminated, truncated, info
def _get_obs(self): def _get_obs(self):
return np.append(super()._get_obs(), self.goal) return np.append(super()._get_obs(), self.goal)
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
options: Optional[dict] = None, ) -> Union[ObsType, Tuple[ObsType, dict]]: -> Tuple[ObsType, Dict[str, Any]]:
self.max_height = 0 self.max_height = 0
self.current_step = 0 self.current_step = 0
self.goal = self.np_random.uniform(1.1, 1.6, 1) # 1.1 1.6 self.goal = self.np_random.uniform(1.1, 1.6, 1) # 1.1 1.6
return super().reset() return super().reset(seed=seed, options=options)
# overwrite reset_model to make it deterministic # overwrite reset_model to make it deterministic
def reset_model(self): def reset_model(self):

View File

@ -1,7 +1,7 @@
import os import os
import numpy as np import numpy as np
from gym.envs.mujoco.hopper_v4 import HopperEnv from gymnasium.envs.mujoco.hopper_v4 import HopperEnv
MAX_EPISODE_STEPS_HOPPERJUMP = 250 MAX_EPISODE_STEPS_HOPPERJUMP = 250
@ -88,7 +88,8 @@ class HopperJumpEnv(HopperEnv):
ctrl_cost = self.control_cost(action) ctrl_cost = self.control_cost(action)
costs = ctrl_cost costs = ctrl_cost
done = False terminated = False
truncated = False
goal_dist = np.linalg.norm(site_pos_after - self.goal) goal_dist = np.linalg.norm(site_pos_after - self.goal)
if self.contact_dist is None and self.contact_with_floor: if self.contact_dist is None and self.contact_with_floor:
@ -115,7 +116,7 @@ class HopperJumpEnv(HopperEnv):
healthy=self.is_healthy, healthy=self.is_healthy,
contact_dist=self.contact_dist or 0 contact_dist=self.contact_dist or 0
) )
return observation, reward, done, info return observation, reward, terminated, truncated, info
def _get_obs(self): def _get_obs(self):
# goal_dist = self.data.get_site_xpos('foot_site') - self.goal # goal_dist = self.data.get_site_xpos('foot_site') - self.goal

View File

@ -1,7 +1,9 @@
import os import os
from typing import Optional, Dict, Any, Tuple
import numpy as np import numpy as np
from gym.envs.mujoco.hopper_v4 import HopperEnv from gymnasium.core import ObsType
from gymnasium.envs.mujoco.hopper_v4 import HopperEnv
MAX_EPISODE_STEPS_HOPPERJUMPONBOX = 250 MAX_EPISODE_STEPS_HOPPERJUMPONBOX = 250
@ -74,10 +76,10 @@ class HopperJumpOnBoxEnv(HopperEnv):
costs = ctrl_cost costs = ctrl_cost
done = fell_over or self.hopper_on_box terminated = fell_over or self.hopper_on_box
if self.current_step >= self.max_episode_steps or done: if self.current_step >= self.max_episode_steps or terminated:
done = False done = False # TODO why are we doing this???
max_height = self.max_height.copy() max_height = self.max_height.copy()
min_distance = self.min_distance.copy() min_distance = self.min_distance.copy()
@ -122,12 +124,13 @@ class HopperJumpOnBoxEnv(HopperEnv):
'goal': self.box_x, 'goal': self.box_x,
} }
return observation, reward, done, info return observation, reward, terminated, info
def _get_obs(self): def _get_obs(self):
return np.append(super()._get_obs(), self.box_x) return np.append(super()._get_obs(), self.box_x)
def reset(self): def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
-> Tuple[ObsType, Dict[str, Any]]:
self.max_height = 0 self.max_height = 0
self.min_distance = 5000 self.min_distance = 5000
@ -136,7 +139,7 @@ class HopperJumpOnBoxEnv(HopperEnv):
if self.context: if self.context:
self.box_x = self.np_random.uniform(1, 3, 1) self.box_x = self.np_random.uniform(1, 3, 1)
self.model.body("box").pos = [self.box_x[0], 0, 0] self.model.body("box").pos = [self.box_x[0], 0, 0]
return super().reset() return super().reset(seed=seed, options=options)
# overwrite reset_model to make it deterministic # overwrite reset_model to make it deterministic
def reset_model(self): def reset_model(self):
@ -151,20 +154,5 @@ class HopperJumpOnBoxEnv(HopperEnv):
observation = self._get_obs() observation = self._get_obs()
return observation return observation
if __name__ == '__main__':
render_mode = "human" # "human" or "partial" or "final"
env = HopperJumpOnBoxEnv()
obs = env.reset()
for i in range(2000):
# objective.load_result("/tmp/cma")
# test with random actions
ac = env.action_space.sample()
obs, rew, d, info = env.step(ac)
if i % 10 == 0:
env.render(mode=render_mode)
if d:
print('After ', i, ' steps, done: ', d)
env.reset()
env.close()
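
Goal and box positions in these envs are drawn from self.np_random, which gymnasium now seeds through reset(seed=...). A quick reproducibility sketch using the same generator utility; the numbers only mirror the uniform(1, 3, 1) call above.

import numpy as np
from gymnasium.utils import seeding

np_random, _ = seeding.np_random(42)
box_x = np_random.uniform(1, 3, 1)            # same call pattern as in reset() above
np_random, _ = seeding.np_random(42)          # reseeding reproduces the sample
assert np.allclose(box_x, np_random.uniform(1, 3, 1))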

View File

@ -1,8 +1,9 @@
import os import os
from typing import Optional from typing import Optional, Any, Dict, Tuple
import numpy as np import numpy as np
from gym.envs.mujoco.hopper_v4 import HopperEnv from gymnasium.core import ObsType
from gymnasium.envs.mujoco.hopper_v4 import HopperEnv
MAX_EPISODE_STEPS_HOPPERTHROW = 250 MAX_EPISODE_STEPS_HOPPERTHROW = 250
@ -56,14 +57,14 @@ class HopperThrowEnv(HopperEnv):
# done = self.done TODO We should use this, not sure why there is no other termination; ball_landed should be enough, because we only look at the throw itself? - Paul and Marc # done = self.done TODO We should use this, not sure why there is no other termination; ball_landed should be enough, because we only look at the throw itself? - Paul and Marc
ball_landed = bool(self.get_body_com("ball")[2] <= 0.05) ball_landed = bool(self.get_body_com("ball")[2] <= 0.05)
done = ball_landed terminated = ball_landed
ctrl_cost = self.control_cost(action) ctrl_cost = self.control_cost(action)
costs = ctrl_cost costs = ctrl_cost
rewards = 0 rewards = 0
if self.current_step >= self.max_episode_steps or done: if self.current_step >= self.max_episode_steps or terminated:
distance_reward = -np.linalg.norm(ball_pos_after - self.goal) if self.context else \ distance_reward = -np.linalg.norm(ball_pos_after - self.goal) if self.context else \
self._forward_reward_weight * ball_pos_after self._forward_reward_weight * ball_pos_after
healthy_reward = 0 if self.context else self.healthy_reward * self.current_step healthy_reward = 0 if self.context else self.healthy_reward * self.current_step
@ -78,16 +79,18 @@ class HopperThrowEnv(HopperEnv):
'_steps': self.current_step, '_steps': self.current_step,
'goal': self.goal, 'goal': self.goal,
} }
truncated = False
return observation, reward, done, info return observation, reward, terminated, truncated, info
def _get_obs(self): def _get_obs(self):
return np.append(super()._get_obs(), self.goal) return np.append(super()._get_obs(), self.goal)
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None): def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
-> Tuple[ObsType, Dict[str, Any]]:
self.current_step = 0 self.current_step = 0
self.goal = self.goal = self.np_random.uniform(2.0, 6.0, 1) # 0.5 8.0 self.goal = self.goal = self.np_random.uniform(2.0, 6.0, 1) # 0.5 8.0
return super().reset() return super().reset(seed=seed, options=options)
# overwrite reset_model to make it deterministic # overwrite reset_model to make it deterministic
def reset_model(self): def reset_model(self):
@ -103,20 +106,3 @@ class HopperThrowEnv(HopperEnv):
return observation return observation
if __name__ == '__main__':
render_mode = "human" # "human" or "partial" or "final"
env = HopperThrowEnv()
obs = env.reset()
for i in range(2000):
# objective.load_result("/tmp/cma")
# test with random actions
ac = env.action_space.sample()
obs, rew, d, info = env.step(ac)
if i % 10 == 0:
env.render(mode=render_mode)
if d:
print('After ', i, ' steps, done: ', d)
env.reset()
env.close()

View File

@ -1,8 +1,9 @@
import os import os
from typing import Optional from typing import Optional, Any, Dict, Tuple
import numpy as np import numpy as np
from gym.envs.mujoco.hopper_v4 import HopperEnv from gymnasium.envs.mujoco.hopper_v4 import HopperEnv
from gymnasium.core import ObsType
MAX_EPISODE_STEPS_HOPPERTHROWINBASKET = 250 MAX_EPISODE_STEPS_HOPPERTHROWINBASKET = 250
@ -72,7 +73,7 @@ class HopperThrowInBasketEnv(HopperEnv):
self.ball_in_basket = True self.ball_in_basket = True
ball_landed = self.get_body_com("ball")[2] <= 0.05 ball_landed = self.get_body_com("ball")[2] <= 0.05
done = bool(ball_landed or is_in_basket) terminated = bool(ball_landed or is_in_basket)
rewards = 0 rewards = 0
@ -80,7 +81,7 @@ class HopperThrowInBasketEnv(HopperEnv):
costs = ctrl_cost costs = ctrl_cost
if self.current_step >= self.max_episode_steps or done: if self.current_step >= self.max_episode_steps or terminated:
if is_in_basket: if is_in_basket:
if not self.context: if not self.context:
@ -101,13 +102,16 @@ class HopperThrowInBasketEnv(HopperEnv):
info = { info = {
'ball_pos': ball_pos[0], 'ball_pos': ball_pos[0],
} }
truncated = False
return observation, reward, done, info return observation, reward, terminated, truncated, info
def _get_obs(self): def _get_obs(self):
return np.append(super()._get_obs(), self.basket_x) return np.append(super()._get_obs(), self.basket_x)
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None): def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
-> Tuple[ObsType, Dict[str, Any]]:
if self.max_episode_steps == 10: if self.max_episode_steps == 10:
# We have to initialize this here, because the spec is only added after creating the env. # We have to initialize this here, because the spec is only added after creating the env.
self.max_episode_steps = self.spec.max_episode_steps self.max_episode_steps = self.spec.max_episode_steps
@ -117,7 +121,7 @@ class HopperThrowInBasketEnv(HopperEnv):
if self.context: if self.context:
self.basket_x = self.np_random.uniform(low=3, high=7, size=1) self.basket_x = self.np_random.uniform(low=3, high=7, size=1)
self.model.body("basket_ground").pos[:] = [self.basket_x[0], 0, 0] self.model.body("basket_ground").pos[:] = [self.basket_x[0], 0, 0]
return super().reset() return super().reset(seed=seed, options=options)
# overwrite reset_model to make it deterministic # overwrite reset_model to make it deterministic
def reset_model(self): def reset_model(self):
@ -134,20 +138,4 @@ class HopperThrowInBasketEnv(HopperEnv):
return observation return observation
if __name__ == '__main__':
render_mode = "human" # "human" or "partial" or "final"
env = HopperThrowInBasketEnv()
obs = env.reset()
for i in range(2000):
# objective.load_result("/tmp/cma")
# test with random actions
ac = env.action_space.sample()
obs, rew, d, info = env.step(ac)
if i % 10 == 0:
env.render(mode=render_mode)
if d:
print('After ', i, ' steps, done: ', d)
env.reset()
env.close()

View File

@ -1,9 +1,9 @@
import os import os
import numpy as np import numpy as np
from gym import utils from gymnasium import utils
from gym.envs.mujoco import MujocoEnv from gymnasium.envs.mujoco import MujocoEnv
from gym.spaces import Box from gymnasium.spaces import Box
MAX_EPISODE_STEPS_REACHER = 200 MAX_EPISODE_STEPS_REACHER = 200

View File

@ -1,8 +1,9 @@
import os import os
from typing import Optional from typing import Optional, Any, Dict, Tuple
import numpy as np import numpy as np
from gym.envs.mujoco.walker2d_v4 import Walker2dEnv from gymnasium.envs.mujoco.walker2d_v4 import Walker2dEnv
from gymnasium.core import ObsType
MAX_EPISODE_STEPS_WALKERJUMP = 300 MAX_EPISODE_STEPS_WALKERJUMP = 300
@ -54,13 +55,13 @@ class Walker2dJumpEnv(Walker2dEnv):
self.max_height = max(height, self.max_height) self.max_height = max(height, self.max_height)
done = bool(height < 0.2) terminated = bool(height < 0.2)
ctrl_cost = self.control_cost(action) ctrl_cost = self.control_cost(action)
costs = ctrl_cost costs = ctrl_cost
rewards = 0 rewards = 0
if self.current_step >= self.max_episode_steps or done: if self.current_step >= self.max_episode_steps or terminated:
done = True terminated = True
height_goal_distance = -10 * (np.linalg.norm(self.max_height - self.goal)) height_goal_distance = -10 * (np.linalg.norm(self.max_height - self.goal))
healthy_reward = self.healthy_reward * self.current_step healthy_reward = self.healthy_reward * self.current_step
@ -73,17 +74,19 @@ class Walker2dJumpEnv(Walker2dEnv):
'max_height': self.max_height, 'max_height': self.max_height,
'goal': self.goal, 'goal': self.goal,
} }
truncated = False
return observation, reward, done, info return observation, reward, terminated, truncated, info
def _get_obs(self): def _get_obs(self):
return np.append(super()._get_obs(), self.goal) return np.append(super()._get_obs(), self.goal)
def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None): def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) \
-> Tuple[ObsType, Dict[str, Any]]:
self.current_step = 0 self.current_step = 0
self.max_height = 0 self.max_height = 0
self.goal = self.np_random.uniform(1.5, 2.5, 1) # 1.5 3.0 self.goal = self.np_random.uniform(1.5, 2.5, 1) # 1.5 3.0
return super().reset() return super().reset(seed=seed, options=options)
# overwrite reset_model to make it deterministic # overwrite reset_model to make it deterministic
def reset_model(self): def reset_model(self):
@ -98,20 +101,3 @@ class Walker2dJumpEnv(Walker2dEnv):
observation = self._get_obs() observation = self._get_obs()
return observation return observation
if __name__ == '__main__':
render_mode = "human" # "human" or "partial" or "final"
env = Walker2dJumpEnv()
obs = env.reset()
for i in range(6000):
# test with random actions
ac = env.action_space.sample()
obs, rew, d, info = env.step(ac)
if i % 10 == 0:
env.render(mode=render_mode)
if d:
print('After ', i, ' steps, done: ', d)
env.reset()
env.close()

View File

@ -1,6 +1,6 @@
from copy import deepcopy from copy import deepcopy
from gym import register from gymnasium import register
from . import goal_object_change_mp_wrapper, goal_change_mp_wrapper, goal_endeffector_change_mp_wrapper, \ from . import goal_object_change_mp_wrapper, goal_change_mp_wrapper, goal_endeffector_change_mp_wrapper, \
object_change_mp_wrapper object_change_mp_wrapper

View File

@ -1,6 +1,6 @@
from copy import deepcopy from copy import deepcopy
from gym import register from gymnasium import register
from . import mujoco from . import mujoco
from .deprecated_needs_gym_robotics import robotics from .deprecated_needs_gym_robotics import robotics

View File

@ -1,18 +1,17 @@
import logging import logging
import re
import uuid import uuid
from collections.abc import MutableMapping from collections.abc import MutableMapping
from copy import deepcopy from copy import deepcopy
from math import ceil from math import ceil
from typing import Iterable, Type, Union from typing import Iterable, Type, Union
import gym import gymnasium as gym
import numpy as np import numpy as np
from gym.envs.registration import register, registry from gymnasium.envs.registration import register, registry
from gym.utils import seeding
try: try:
from dm_control import suite, manipulation from dm_control import suite, manipulation
from shimmy.dm_control_compatibility import EnvType
except ImportError: except ImportError:
pass pass
@ -83,15 +82,20 @@ def make(env_id: str, seed: int, **kwargs):
if framework == 'metaworld': if framework == 'metaworld':
# MetaWorld environment # MetaWorld environment
env = make_metaworld(env_id, seed, **kwargs) env = make_metaworld(env_id, seed, **kwargs)
elif framework == 'dmc': # elif framework == 'dmc':
# DeepMind Control environment # Deprecated: With shimmy gym now has native support for deepmind envs
env = make_dmc(env_id, seed, **kwargs) # # DeepMind Control environment
# env = make_dmc(env_id, seed, **kwargs)
else: else:
env = make_gym(env_id, seed, **kwargs) env = make_gym(env_id, seed, **kwargs)
np_random, _ = seeding.np_random(seed) # try:
env.np_random = np_random env.reset(seed=seed)
# env.seed(seed) # except TypeError:
# # Support for older gym envs that do not have seeding
# # env.seed(seed)
# np_random, _ = seeding.np_random(seed)
# env.np_random = np_random
env.action_space.seed(seed) env.action_space.seed(seed)
env.observation_space.seed(seed) env.observation_space.seed(seed)
@ -161,7 +165,7 @@ def make_bb(
traj_gen_kwargs['action_dim'] = traj_gen_kwargs.get('action_dim', np.prod(env.action_space.shape).item()) traj_gen_kwargs['action_dim'] = traj_gen_kwargs.get('action_dim', np.prod(env.action_space.shape).item())
if black_box_kwargs.get('duration') is None: if black_box_kwargs.get('duration') is None:
black_box_kwargs['duration'] = env.spec.max_episode_steps * env.dt black_box_kwargs['duration'] = get_env_duration(env)
if phase_kwargs.get('tau') is None: if phase_kwargs.get('tau') is None:
phase_kwargs['tau'] = black_box_kwargs['duration'] phase_kwargs['tau'] = black_box_kwargs['duration']
@ -180,6 +184,24 @@ def make_bb(
return bb_env return bb_env
def get_env_duration(env: gym.Env):
try:
# TODO Remove if this is in the compatibility class
duration = env.spec.max_episode_steps * env.dt
except (AttributeError, TypeError) as e:
logging.error(f'Attributes env.spec.max_episode_steps and env.dt are not available. '
f'Assuming you are using dm_control. Please make sure you have ran '
f'"pip install shimmy[dm_control]" for that.')
if env.env_type is EnvType.COMPOSER:
max_episode_steps = ceil(env.unwrapped._time_limit / env.dt)
elif env.env_type is EnvType.RL_CONTROL:
max_episode_steps = int(env.unwrapped._step_limit)
else:
raise e
duration = max_episode_steps * env.control_timestep()
return duration
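
A usage sketch of the quantity get_env_duration computes for standard gym-style envs; Pendulum-v1 is just a stand-in that exposes both spec.max_episode_steps and dt.

import gymnasium as gym

env = gym.make("Pendulum-v1")
duration = env.spec.max_episode_steps * env.unwrapped.dt
print(duration)  # 200 steps * 0.05 s -> 10.0 seconds of simulated time per episode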
def make_bb_env_helper(**kwargs): def make_bb_env_helper(**kwargs):
""" """
Helper function for registering a black box gym environment. Helper function for registering a black box gym environment.
@ -229,52 +251,53 @@ def make_bb_env_helper(**kwargs):
basis_kwargs=basis_kwargs, **kwargs, seed=seed) basis_kwargs=basis_kwargs, **kwargs, seed=seed)
def make_dmc( # Deprecated: With shimmy gym now has native support for deepmind envs
env_id: str, # def make_dmc(
seed: int = None, # env_id: str,
visualize_reward: bool = True, # seed: int = None,
time_limit: Union[None, float] = None, # visualize_reward: bool = True,
**kwargs # time_limit: Union[None, float] = None,
): # **kwargs
if not re.match(r"\w+-\w+", env_id): # ):
raise ValueError("env_id does not have the following structure: 'domain_name-task_name'") # if not re.match(r"\w+-\w+", env_id):
domain_name, task_name = env_id.split("-") # raise ValueError("env_id does not have the following structure: 'domain_name-task_name'")
# domain_name, task_name = env_id.split("-")
if task_name.endswith("_vision"): #
# TODO # if task_name.endswith("_vision"):
raise ValueError("The vision interface for manipulation tasks is currently not supported.") # # TODO
# raise ValueError("The vision interface for manipulation tasks is currently not supported.")
if (domain_name, task_name) not in suite.ALL_TASKS and task_name not in manipulation.ALL: #
raise ValueError(f'Specified domain "{domain_name}" and task "{task_name}" combination does not exist.') # if (domain_name, task_name) not in suite.ALL_TASKS and task_name not in manipulation.ALL:
# raise ValueError(f'Specified domain "{domain_name}" and task "{task_name}" combination does not exist.')
# env_id = f'dmc_{domain_name}_{task_name}_{seed}-v1' #
gym_id = uuid.uuid4().hex + '-v1' # # env_id = f'dmc_{domain_name}_{task_name}_{seed}-v1'
# gym_id = uuid.uuid4().hex + '-v1'
task_kwargs = {'random': seed} #
if time_limit is not None: # task_kwargs = {'random': seed}
task_kwargs['time_limit'] = time_limit # if time_limit is not None:
# task_kwargs['time_limit'] = time_limit
# create task #
# Accessing private attribute because DMC does not expose time_limit or step_limit. # # create task
# Only the current time_step/time as well as the control_timestep can be accessed. # # Accessing private attribute because DMC does not expose time_limit or step_limit.
if domain_name == "manipulation": # # Only the current time_step/time as well as the control_timestep can be accessed.
env = manipulation.load(environment_name=task_name, seed=seed) # if domain_name == "manipulation":
max_episode_steps = ceil(env._time_limit / env.control_timestep()) # env = manipulation.load(environment_name=task_name, seed=seed)
else: # max_episode_steps = ceil(env._time_limit / env.control_timestep())
env = suite.load(domain_name=domain_name, task_name=task_name, task_kwargs=task_kwargs, # else:
visualize_reward=visualize_reward, environment_kwargs=kwargs) # env = suite.load(domain_name=domain_name, task_name=task_name, task_kwargs=task_kwargs,
max_episode_steps = int(env._step_limit) # visualize_reward=visualize_reward, environment_kwargs=kwargs)
# max_episode_steps = int(env._step_limit)
register( #
id=gym_id, # register(
entry_point='fancy_gym.dmc.dmc_wrapper:DMCWrapper', # id=gym_id,
kwargs={'env': lambda: env}, # entry_point='fancy_gym.dmc.dmc_wrapper:DMCWrapper',
max_episode_steps=max_episode_steps, # kwargs={'env': lambda: env},
) # max_episode_steps=max_episode_steps,
# )
env = gym.make(gym_id) #
env.seed(seed) # env = gym.make(gym_id)
return env # env.seed(seed)
# return env
def make_metaworld(env_id: str, seed: int, **kwargs): def make_metaworld(env_id: str, seed: int, **kwargs):
@ -288,12 +311,17 @@ def make_metaworld(env_id: str, seed: int, **kwargs):
# New argument to use global seeding # New argument to use global seeding
_env.seeded_rand_vec = True _env.seeded_rand_vec = True
max_episode_steps = _env.max_path_length
# TODO remove this as soon as there is support for the new API
_env = gym.wrappers.EnvCompatibility(_env)
gym_id = uuid.uuid4().hex + '-v1' gym_id = uuid.uuid4().hex + '-v1'
register( register(
id=gym_id, id=gym_id,
entry_point=lambda: _env, entry_point=lambda: _env,
max_episode_steps=_env.max_path_length, max_episode_steps=max_episode_steps,
) )
# TODO enable checker when the incorrect dtype of obs and observation space are fixed by metaworld # TODO enable checker when the incorrect dtype of obs and observation space are fixed by metaworld
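
A hedged sketch of what gym.wrappers.EnvCompatibility (available in the gymnasium versions this commit targets) does with an old-style environment: it converts the legacy reset/step API into the new (obs, info) and five-tuple forms. OldStyleEnv is a stand-in, not a Metaworld task.

import numpy as np
import gymnasium as gym
from gymnasium import spaces

class OldStyleEnv:  # stand-in for a legacy-API env
    observation_space = spaces.Box(-1.0, 1.0, shape=(3,), dtype=np.float64)
    action_space = spaces.Box(-1.0, 1.0, shape=(1,), dtype=np.float64)

    def seed(self, seed=None):           # legacy seeding hook
        return [seed]

    def reset(self):                     # legacy reset: obs only, no info dict
        return np.zeros(3)

    def step(self, action):              # legacy 4-tuple with a single done flag
        return np.zeros(3), 0.0, False, {}

env = gym.wrappers.EnvCompatibility(OldStyleEnv())
obs, info = env.reset(seed=0)
obs, reward, terminated, truncated, info = env.step(env.action_space.sample())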

View File

@ -1,45 +1,11 @@
""" import gymnasium as gym
Adapted from: https://github.com/openai/gym/blob/907b1b20dd9ac0cba5803225059b9c6673702467/gym/wrappers/time_aware_observation.py
License: MIT
Copyright (c) 2016 OpenAI (https://openai.com)
Wrapper for adding time aware observations to environment observation.
"""
import gym
import numpy as np import numpy as np
from gym.spaces import Box
class TimeAwareObservation(gym.ObservationWrapper): class TimeAwareObservation(gym.wrappers.TimeAwareObservation):
"""Augment the observation with the current time step in the episode.
The observation space of the wrapped environment is assumed to be a flat :class:`Box`.
In particular, pixel observations are not supported. This wrapper will append the current timestep
within the current episode to the observation.
Example:
>>> import gym
>>> env = gym.make('CartPole-v1')
>>> env = TimeAwareObservation(env)
>>> env.reset()
array([ 0.03810719, 0.03522411, 0.02231044, -0.01088205, 0. ])
>>> env.step(env.action_space.sample())[0]
array([ 0.03881167, -0.16021058, 0.0220928 , 0.28875574, 1. ])
"""
def __init__(self, env: gym.Env): def __init__(self, env: gym.Env):
"""Initialize :class:`TimeAwareObservation` that requires an environment with a flat :class:`Box`
observation space.
Args:
env: The environment to apply the wrapper
"""
super().__init__(env) super().__init__(env)
assert isinstance(env.observation_space, Box)
low = np.append(self.observation_space.low, 0.0)
high = np.append(self.observation_space.high, 1.0)
self.observation_space = Box(low, high, dtype=self.observation_space.dtype)
self.t = 0
self._max_episode_steps = env.spec.max_episode_steps self._max_episode_steps = env.spec.max_episode_steps
def observation(self, observation): def observation(self, observation):
@ -52,27 +18,3 @@ class TimeAwareObservation(gym.ObservationWrapper):
The observation with the time step appended to The observation with the time step appended to
""" """
return np.append(observation, self.t / self._max_episode_steps) return np.append(observation, self.t / self._max_episode_steps)
def step(self, action):
"""Steps through the environment, incrementing the time step.
Args:
action: The action to take
Returns:
The environment's step using the action.
"""
self.t += 1
return super().step(action)
def reset(self, **kwargs):
"""Reset the environment setting the time to zero.
Args:
**kwargs: Kwargs to apply to env.reset()
Returns:
The reset environment
"""
self.t = 0
return super().reset(**kwargs)
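
A quick usage sketch of the built-in gymnasium wrapper that the slimmed-down class above now subclasses; CartPole-v1 is only a stand-in environment.

import gymnasium as gym

env = gym.wrappers.TimeAwareObservation(gym.make("CartPole-v1"))
obs, info = env.reset(seed=0)
print(obs.shape)  # CartPole's 4 features plus one appended time feature -> (5,)
obs, reward, terminated, truncated, info = env.step(env.action_space.sample())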