Merge pull request #41 from ALRhub/26-sequencingreplanning-feature-for-episode-based-environments
Added sequencing and replanning feature for episode-based environments
commit eaedd58e73
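In short, the black-box (episode-based) interface can now split one episode into several planned segments: either learned sub-trajectories or fixed replanning intervals, each generated from the boundary conditions of the current state. A minimal sketch of that segmentation, purely for illustration; segment_episode and replan_every are made-up names, not part of the fancy_gym API:

    def segment_episode(episode_len: int, replan_every: int, dt: float):
        """Return (start_time, num_steps) for every planning segment of one episode."""
        segments, step = [], 0
        while step < episode_len:
            length = min(replan_every, episode_len - step)
            segments.append((step * dt, length))
            step += length
        return segments

    print(segment_episode(episode_len=200, replan_every=75, dt=0.02))
    # [(0.0, 75), (1.5, 75), (3.0, 50)]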
@@ -67,23 +67,29 @@ class BlackBoxWrapper(gym.ObservationWrapper):

def observation(self, observation):
# return context space if we are
obs = observation[self.env.context_mask] if self.return_context_observation else observation
if self.return_context_observation:
observation = observation[self.env.context_mask]
# cast dtype because metaworld returns incorrect that throws gym error
return obs.astype(self.observation_space.dtype)
return observation.astype(self.observation_space.dtype)

def get_trajectory(self, action: np.ndarray) -> Tuple:
clipped_params = np.clip(action, self.traj_gen_action_space.low, self.traj_gen_action_space.high)
self.traj_gen.set_params(clipped_params)
# TODO: is this correct for replanning? Do we need to adjust anything here?
self.traj_gen.set_boundary_conditions(
bc_time=np.array(0) if not self.do_replanning else np.array([self.current_traj_steps * self.dt]),
bc_pos=self.current_pos, bc_vel=self.current_vel)
# TODO remove the - self.dt after Bruces fix.
self.traj_gen.set_duration(None if self.learn_sub_trajectories else self.duration - self.dt, self.dt)
traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True)
trajectory_tensor, velocity_tensor = traj_dict['pos'], traj_dict['vel']
bc_time = np.array(0 if not self.do_replanning else self.current_traj_steps * self.dt)
# TODO we could think about initializing with the previous desired value in order to have a smooth transition
# at least from the planning point of view.
self.traj_gen.set_boundary_conditions(bc_time, self.current_pos, self.current_vel)
duration = None if self.learn_sub_trajectories else self.duration
self.traj_gen.set_duration(duration, self.dt)
# traj_dict = self.traj_gen.get_trajs(get_pos=True, get_vel=True)
trajectory = get_numpy(self.traj_gen.get_traj_pos())
velocity = get_numpy(self.traj_gen.get_traj_vel())

return get_numpy(trajectory_tensor), get_numpy(velocity_tensor)
# Remove first element of trajectory as this is the current position and velocity
# trajectory = trajectory[1:]
# velocity = velocity[1:]

return trajectory, velocity

def _get_traj_gen_action_space(self):
"""This function can be used to set up an individual space for the parameters of the traj_gen."""
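The hunk above conditions the trajectory generator on the time already spent in the episode when replanning, and drops the `- self.dt` workaround on the duration. A small self-contained sketch of the two values that are now handed to set_boundary_conditions and set_duration; planning_inputs is an illustrative helper, its parameters mirror the wrapper's attributes:

    import numpy as np

    def planning_inputs(current_traj_steps, dt, duration, do_replanning, learn_sub_trajectories):
        """bc_time and duration as the updated get_trajectory computes them (illustrative only)."""
        bc_time = np.array(0 if not do_replanning else current_traj_steps * dt)
        traj_duration = None if learn_sub_trajectories else duration
        return bc_time, traj_duration

    # after 75 executed steps at dt=0.02 s, the next sub-trajectory starts at t=1.5 s
    print(planning_inputs(75, 0.02, duration=2.0, do_replanning=True, learn_sub_trajectories=False))
    # (array(1.5), 2.0)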
@@ -105,13 +111,13 @@ class BlackBoxWrapper(gym.ObservationWrapper):
return self._get_traj_gen_action_space()

def _get_observation_space(self):
if self.return_context_observation:
mask = self.env.context_mask
if not self.return_context_observation:
# return full observation
mask = np.ones_like(mask, dtype=bool)
min_obs_bound = self.env.observation_space.low[mask]
max_obs_bound = self.env.observation_space.high[mask]
return spaces.Box(low=min_obs_bound, high=max_obs_bound, dtype=self.env.observation_space.dtype)
return self.env.observation_space

def step(self, action: np.ndarray):
""" This function generates a trajectory based on a MP and then does the usual loop over reset and step"""
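The reworked _get_observation_space always builds a Box from a boolean context mask (using a full mask when the complete observation should be returned) instead of branching between two return paths. A rough standalone equivalent, assuming the wrapped space is a gym.spaces.Box; masked_box is a hypothetical name:

    import numpy as np
    from gym import spaces

    def masked_box(space: spaces.Box, mask: np.ndarray) -> spaces.Box:
        """Restrict a Box space to the entries selected by a boolean mask."""
        return spaces.Box(low=space.low[mask], high=space.high[mask], dtype=space.dtype)

    full = spaces.Box(low=-np.ones(4, dtype=np.float32), high=np.ones(4, dtype=np.float32))
    print(masked_box(full, np.array([True, False, True, False])))  # Box with shape (2,)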
@@ -152,7 +158,7 @@ class BlackBoxWrapper(gym.ObservationWrapper):
t + 1 + self.current_traj_steps):
break

infos.update({k: v[:t + 1] for k, v in infos.items()})
infos.update({k: v[:t] for k, v in infos.items()})
self.current_traj_steps += t + 1

if self.verbose >= 2:
@@ -1,4 +1,3 @@
from abc import abstractmethod
from typing import Union, Tuple

import gym

@@ -23,7 +22,6 @@ class RawInterfaceWrapper(gym.Wrapper):
return np.ones(self.env.observation_space.shape[0], dtype=bool)

@property
@abstractmethod
def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
"""
Returns the current position of the action/control dimension.

@@ -32,10 +30,9 @@ class RawInterfaceWrapper(gym.Wrapper):
it should, however, be implemented regardless.
E.g. The joint positions that are directly or indirectly controlled by the action.
"""
raise NotImplementedError()
raise NotImplementedError

@property
@abstractmethod
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
"""
Returns the current velocity of the action/control dimension.

@@ -44,7 +41,7 @@ class RawInterfaceWrapper(gym.Wrapper):
it should, however, be implemented regardless.
E.g. The joint velocities that are directly or indirectly controlled by the action.
"""
raise NotImplementedError()
raise NotImplementedError

@property
def dt(self) -> float:
@@ -126,7 +126,7 @@ for _dims in [5, 7]:
register(
id=f'Reacher{_dims}dSparse-v0',
entry_point='fancy_gym.envs.mujoco:ReacherEnv',
max_episode_steps=200,
max_episode_steps=MAX_EPISODE_STEPS_REACHER,
kwargs={
"sparse": True,
'reward_weight': 200,
@@ -1,4 +1,3 @@
from abc import ABC, abstractmethod
from typing import Union, Tuple, Optional

import gym

@@ -10,7 +9,7 @@ from gym.utils import seeding
from fancy_gym.envs.classic_control.utils import intersect


class BaseReacherEnv(gym.Env, ABC):
class BaseReacherEnv(gym.Env):
"""
Base class for all reaching environments.
"""

@@ -87,13 +86,6 @@ class BaseReacherEnv(gym.Env, ABC):
return self._get_obs().copy()

@abstractmethod
def step(self, action: np.ndarray):
"""
A single step with action in angular velocity space
"""
raise NotImplementedError

def _update_joints(self):
"""
update joints to get new end-effector position. The other links are only required for rendering.

@@ -120,27 +112,24 @@ class BaseReacherEnv(gym.Env, ABC):
return True
return False

@abstractmethod
def _get_reward(self, action: np.ndarray) -> (float, dict):
pass
raise NotImplementedError

@abstractmethod
def _get_obs(self) -> np.ndarray:
pass
raise NotImplementedError

@abstractmethod
def _check_collisions(self) -> bool:
pass
raise NotImplementedError

@abstractmethod
def _terminate(self, info) -> bool:
return False
raise NotImplementedError

def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]

def close(self):
super(BaseReacherEnv, self).close()
del self.fig

@property
@@ -1,12 +1,10 @@
from abc import ABC

import numpy as np
from gym import spaces

from fancy_gym.envs.classic_control.base_reacher.base_reacher import BaseReacherEnv


class BaseReacherDirectEnv(BaseReacherEnv, ABC):
class BaseReacherDirectEnv(BaseReacherEnv):
"""
Base class for directly controlled reaching environments
"""
@@ -1,12 +1,10 @@
from abc import ABC

import numpy as np
from gym import spaces

from fancy_gym.envs.classic_control.base_reacher.base_reacher import BaseReacherEnv


class BaseReacherTorqueEnv(BaseReacherEnv, ABC):
class BaseReacherTorqueEnv(BaseReacherEnv):
"""
Base class for torque controlled reaching environments
"""
@@ -98,7 +98,7 @@ class HopperJumpEnv(HopperEnv):
if not self.sparse or (self.sparse and self._steps >= MAX_EPISODE_STEPS_HOPPERJUMP):
healthy_reward = self.healthy_reward
distance_reward = -goal_dist * self._dist_weight
height_reward = (self.max_height if self.sparse else self.get_body_com("torso")[2]) * self._height_weight
height_reward = (self.max_height if self.sparse else height_after) * self._height_weight
contact_reward = -(self.contact_dist or 5) * self._contact_weight
rewards = self._forward_reward_weight * (distance_reward + height_reward + contact_reward + healthy_reward)

@@ -123,7 +123,7 @@ class HopperJumpEnv(HopperEnv):
return np.concatenate((super(HopperJumpEnv, self)._get_obs(), goal_dist.copy(), self.goal[:1]))

def reset_model(self):
super(HopperJumpEnv, self).reset_model()
# super(HopperJumpEnv, self).reset_model()

# self.goal = self.np_random.uniform(0.3, 1.35, 1)[0]
self.goal = np.concatenate([self.np_random.uniform(0.3, 1.35, 1), np.zeros(2, )])

@@ -176,76 +176,76 @@ class HopperJumpEnv(HopperEnv):
return True
return False

# TODO is that needed? if so test it
class HopperJumpStepEnv(HopperJumpEnv):

def __init__(self,
xml_file='hopper_jump.xml',
forward_reward_weight=1.0,
ctrl_cost_weight=1e-3,
healthy_reward=1.0,
height_weight=3,
dist_weight=3,
terminate_when_unhealthy=False,
healthy_state_range=(-100.0, 100.0),
healthy_z_range=(0.5, float('inf')),
healthy_angle_range=(-float('inf'), float('inf')),
reset_noise_scale=5e-3,
exclude_current_positions_from_observation=False
):

self._height_weight = height_weight
self._dist_weight = dist_weight
super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, terminate_when_unhealthy,
healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale,
exclude_current_positions_from_observation)

def step(self, action):
self._steps += 1

self.do_simulation(action, self.frame_skip)

height_after = self.get_body_com("torso")[2]
site_pos_after = self.data.site('foot_site').xpos.copy()
self.max_height = max(height_after, self.max_height)

ctrl_cost = self.control_cost(action)
healthy_reward = self.healthy_reward
height_reward = self._height_weight * height_after
goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
goal_dist_reward = -self._dist_weight * goal_dist
dist_reward = self._forward_reward_weight * (goal_dist_reward + height_reward)

rewards = dist_reward + healthy_reward
costs = ctrl_cost
done = False

# This is only for logging the distance to goal when first having the contact
has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False

if not self.init_floor_contact:
self.init_floor_contact = has_floor_contact
if self.init_floor_contact and not self.has_left_floor:
self.has_left_floor = not has_floor_contact
if not self.contact_with_floor and self.has_left_floor:
self.contact_with_floor = has_floor_contact

if self.contact_dist is None and self.contact_with_floor:
self.contact_dist = goal_dist

##############################################################

observation = self._get_obs()
reward = rewards - costs
info = {
'height': height_after,
'x_pos': site_pos_after,
'max_height': copy.copy(self.max_height),
'goal': copy.copy(self.goal),
'goal_dist': goal_dist,
'height_rew': height_reward,
'healthy_reward': healthy_reward,
'healthy': copy.copy(self.is_healthy),
'contact_dist': copy.copy(self.contact_dist) or 0
}
return observation, reward, done, info

# # TODO is that needed? if so test it
# class HopperJumpStepEnv(HopperJumpEnv):
#
# def __init__(self,
# xml_file='hopper_jump.xml',
# forward_reward_weight=1.0,
# ctrl_cost_weight=1e-3,
# healthy_reward=1.0,
# height_weight=3,
# dist_weight=3,
# terminate_when_unhealthy=False,
# healthy_state_range=(-100.0, 100.0),
# healthy_z_range=(0.5, float('inf')),
# healthy_angle_range=(-float('inf'), float('inf')),
# reset_noise_scale=5e-3,
# exclude_current_positions_from_observation=False
# ):
#
# self._height_weight = height_weight
# self._dist_weight = dist_weight
# super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, terminate_when_unhealthy,
# healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale,
# exclude_current_positions_from_observation)
#
# def step(self, action):
# self._steps += 1
#
# self.do_simulation(action, self.frame_skip)
#
# height_after = self.get_body_com("torso")[2]
# site_pos_after = self.data.site('foot_site').xpos.copy()
# self.max_height = max(height_after, self.max_height)
#
# ctrl_cost = self.control_cost(action)
# healthy_reward = self.healthy_reward
# height_reward = self._height_weight * height_after
# goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
# goal_dist_reward = -self._dist_weight * goal_dist
# dist_reward = self._forward_reward_weight * (goal_dist_reward + height_reward)
#
# rewards = dist_reward + healthy_reward
# costs = ctrl_cost
# done = False
#
# # This is only for logging the distance to goal when first having the contact
# has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
#
# if not self.init_floor_contact:
# self.init_floor_contact = has_floor_contact
# if self.init_floor_contact and not self.has_left_floor:
# self.has_left_floor = not has_floor_contact
# if not self.contact_with_floor and self.has_left_floor:
# self.contact_with_floor = has_floor_contact
#
# if self.contact_dist is None and self.contact_with_floor:
# self.contact_dist = goal_dist
#
# ##############################################################
#
# observation = self._get_obs()
# reward = rewards - costs
# info = {
# 'height': height_after,
# 'x_pos': site_pos_after,
# 'max_height': copy.copy(self.max_height),
# 'goal': copy.copy(self.goal),
# 'goal_dist': goal_dist,
# 'height_rew': height_reward,
# 'healthy_reward': healthy_reward,
# 'healthy': copy.copy(self.is_healthy),
# 'contact_dist': copy.copy(self.contact_dist) or 0
# }
# return observation, reward, done, info
@@ -14,7 +14,7 @@ class MPWrapper(RawInterfaceWrapper):
[False] * (2 + int(not self.exclude_current_positions_from_observation)), # position
[True] * 3, # set to true if randomize initial pos
[False] * 6, # velocity
[True] * 3, # goal distance
[False] * 3, # goal distance
[True] # goal
])
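The mask edited above is a flat boolean vector assembled block by block; only the True entries remain in the context observation handed to the episode-based policy. A tiny illustration with a made-up observation layout:

    import numpy as np

    # hypothetical layout: 3 positions, 3 velocities, 1 goal entry
    context_mask = np.hstack([
        [False] * 3,  # positions: not part of the context
        [False] * 3,  # velocities: not part of the context
        [True] * 1,   # goal: exposed to the episode-based policy
    ])

    obs = np.arange(7.0)
    print(obs[context_mask])  # [6.]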
@@ -1,4 +1,3 @@
from abc import ABC
from typing import Tuple, Union

import numpy as np

@@ -6,7 +5,7 @@ import numpy as np
from fancy_gym.black_box.raw_interface_wrapper import RawInterfaceWrapper


class BaseMetaworldMPWrapper(RawInterfaceWrapper, ABC):
class BaseMetaworldMPWrapper(RawInterfaceWrapper):
@property
def current_pos(self) -> Union[float, int, np.ndarray]:
r_close = self.env.data.get_joint_qpos("r_close")
@@ -1,3 +1,4 @@
import logging
import re
import uuid
from collections.abc import MutableMapping

@@ -148,9 +149,9 @@ def make_bb(
raise ValueError('Cannot used sub-trajectory learning and replanning together.')

# add time_step observation when replanning
if (learn_sub_trajs or do_replanning) and not any(issubclass(w, TimeAwareObservation) for w in kwargs['wrappers']):
if (learn_sub_trajs or do_replanning) and not any(issubclass(w, TimeAwareObservation) for w in wrappers):
# Add as first wrapper in order to alter observation
kwargs['wrappers'].insert(0, TimeAwareObservation)
wrappers.insert(0, TimeAwareObservation)

env = _make_wrapped_env(env_id=env_id, wrappers=wrappers, seed=seed, **kwargs)
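The corrected check above inspects the wrappers list itself (rather than kwargs['wrappers']) before prepending a time-aware wrapper, so sub-trajectory learning and replanning always see the elapsed time in the observation. A self-contained sketch of that guard; the TimeAwareObservation class below is a stand-in, not fancy_gym's implementation:

    import gym

    class TimeAwareObservation(gym.ObservationWrapper):  # stand-in for fancy_gym's wrapper
        def observation(self, observation):
            return observation

    def ensure_time_aware(wrappers: list) -> list:
        """Prepend the time-aware wrapper unless one of the given classes already is one."""
        if not any(issubclass(w, TimeAwareObservation) for w in wrappers):
            wrappers.insert(0, TimeAwareObservation)
        return wrappers

    print([w.__name__ for w in ensure_time_aware([])])  # ['TimeAwareObservation']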
@@ -310,7 +311,11 @@ def make_gym(env_id, seed, **kwargs):
"""
# Getting the existing keywords to allow for nested dict updates for BB envs
# gym only allows for non nested updates.
try:
all_kwargs = deepcopy(registry.get(env_id).kwargs)
except AttributeError as e:
logging.error(f'The gym environment with id {env_id} could not been found.')
raise e
nested_update(all_kwargs, kwargs)
kwargs = all_kwargs
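nested_update is needed here because gym's registry only merges keyword arguments one level deep, while the black-box registrations carry nested dicts. A recursive merge along these lines would do the job (sketch only, not the library's exact implementation; the example keys are made up):

    from copy import deepcopy

    def nested_update(base: dict, update: dict) -> dict:
        """Recursively merge `update` into `base`, overriding leaves but keeping sibling keys."""
        for key, value in update.items():
            if isinstance(value, dict) and isinstance(base.get(key), dict):
                nested_update(base[key], value)
            else:
                base[key] = value
        return base

    defaults = {'traj_gen_kwargs': {'generator_type': 'promp'}, 'seed': 1}
    overrides = {'traj_gen_kwargs': {'weights_scale': 2.0}}
    print(nested_update(deepcopy(defaults), overrides))
    # {'traj_gen_kwargs': {'generator_type': 'promp', 'weights_scale': 2.0}, 'seed': 1}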
@@ -40,9 +40,10 @@ class TimeAwareObservation(gym.ObservationWrapper):
high = np.append(self.observation_space.high, 1.0)
self.observation_space = Box(low, high, dtype=self.observation_space.dtype)
self.t = 0
self._max_episode_steps = env.spec.max_episode_steps

def observation(self, observation):
"""Adds to the observation with the current time step.
"""Adds to the observation with the current time step normalized with max steps.

Args:
observation: The observation to add the time step to

@@ -50,7 +51,7 @@ class TimeAwareObservation(gym.ObservationWrapper):
Returns:
The observation with the time step appended to
"""
return np.append(observation, self.t/self.env.spec.max_episode_steps)
return np.append(observation, self.t / self._max_episode_steps)

def step(self, action):
"""Steps through the environment, incrementing the time step.
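Caching max_episode_steps at construction time and dividing by it keeps the appended time feature in [0, 1] and avoids a spec lookup on every observation. Minimal illustration as a plain function (not the wrapper itself):

    import numpy as np

    def with_time_feature(observation: np.ndarray, t: int, max_episode_steps: int) -> np.ndarray:
        """Append the elapsed steps, normalized by the episode length, as one extra entry."""
        return np.append(observation, t / max_episode_steps)

    print(with_time_feature(np.array([0.1, -0.3]), t=50, max_episode_steps=200))
    # appends 0.25 as the third entry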