sequencing and replanning

Fabian 2022-06-29 16:30:36 +02:00
parent 9b48fc9d48
commit b200cf4b69
2 changed files with 34 additions and 55 deletions

View File

@@ -8,6 +8,7 @@ from mp_pytorch.mp.mp_interfaces import MPInterface
 from alr_envs.mp.controllers.base_controller import BaseController
 from alr_envs.mp.raw_interface_wrapper import RawInterfaceWrapper
+from alr_envs.utils.utils import get_numpy


 class BlackBoxWrapper(gym.ObservationWrapper, ABC):
@@ -15,7 +16,7 @@ class BlackBoxWrapper(gym.ObservationWrapper, ABC):
     def __init__(self,
                  env: RawInterfaceWrapper,
                  trajectory_generator: MPInterface, tracking_controller: BaseController,
-                 duration: float, verbose: int = 1, sequencing=True, reward_aggregation: callable = np.sum):
+                 duration: float, verbose: int = 1, sequencing: bool = True, reward_aggregation: callable = np.sum):
         """
         gym.Wrapper for leveraging a black box approach with a trajectory generator.
@@ -33,67 +34,50 @@ class BlackBoxWrapper(gym.ObservationWrapper, ABC):
         self.env = env
         self.duration = duration
         self.sequencing = sequencing
-        # self.traj_steps = int(duration / self.dt)
-        # self.post_traj_steps = self.env.spec.max_episode_steps - self.traj_steps
-        # duration = self.env.max_episode_steps * self.dt
+        self.current_traj_steps = 0

         # trajectory generation
         self.trajectory_generator = trajectory_generator
         self.tracking_controller = tracking_controller
-        # self.weight_scale = weight_scale
         # self.time_steps = np.linspace(0, self.duration, self.traj_steps)
         # self.trajectory_generator.set_mp_times(self.time_steps)
-        if not sequencing:
-            self.trajectory_generator.set_mp_duration(np.array([self.duration]), np.array([self.dt]))
-        else:
-            # sequencing stuff
-            pass
+        self.trajectory_generator.set_duration(np.array([self.duration]), np.array([self.dt]))

         # reward computation
         self.reward_aggregation = reward_aggregation

         # spaces
-        self.mp_action_space = self.get_mp_action_space()
+        self.return_context_observation = not (self.sequencing)  # TODO or we_do_replanning?)
+        self.traj_gen_action_space = self.get_traj_gen_action_space()
         self.action_space = self.get_action_space()
         self.observation_space = spaces.Box(low=self.env.observation_space.low[self.env.context_mask],
                                             high=self.env.observation_space.high[self.env.context_mask],
                                             dtype=self.env.observation_space.dtype)

         # rendering
-        self.render_mode = None
         self.render_kwargs = {}
         self.verbose = verbose

-    @property
-    def dt(self):
-        return self.env.dt
-
     def observation(self, observation):
-        return observation[self.env.context_mask]
+        # return context space if we are
+        return observation[self.context_mask] if self.return_context_observation else observation

     def get_trajectory(self, action: np.ndarray) -> Tuple:
-        clipped_params = np.clip(action, self.mp_action_space.low, self.mp_action_space.high)
+        clipped_params = np.clip(action, self.traj_gen_action_space.low, self.traj_gen_action_space.high)
         self.trajectory_generator.set_params(clipped_params)
         # if self.trajectory_generator.learn_tau:
         #     self.trajectory_generator.set_mp_duration(self.trajectory_generator.tau, np.array([self.dt]))
-        self.trajectory_generator.set_mp_duration(None if self.sequencing else self.duration, np.array([self.dt]))
-        self.trajectory_generator.set_boundary_conditions(bc_time=, bc_pos=self.current_pos,
+        # TODO: Bruce said DMP, ProMP, ProDMP can have 0 bc_time
+        self.trajectory_generator.set_boundary_conditions(bc_time=np.zeros((1,)), bc_pos=self.current_pos,
                                                           bc_vel=self.current_vel)
-        traj_dict = self.trajectory_generator.get_mp_trajs(get_pos=True, get_vel=True)
+        # TODO: is this correct for replanning? Do we need to adjust anything here?
+        self.trajectory_generator.set_duration(None if self.sequencing else self.duration, np.array([self.dt]))
+        traj_dict = self.trajectory_generator.get_trajs(get_pos=True, get_vel=True)
         trajectory_tensor, velocity_tensor = traj_dict['pos'], traj_dict['vel']

-        trajectory = trajectory_tensor.numpy()
-        velocity = velocity_tensor.numpy()
-
-        # TODO: Do we need this or does mp_pytorch have this?
-        if self.post_traj_steps > 0:
-            trajectory = np.vstack([trajectory, np.tile(trajectory[-1, :], [self.post_traj_steps, 1])])
-            velocity = np.vstack([velocity, np.zeros(shape=(self.post_traj_steps, self.trajectory_generator.num_dof))])
-
-        return trajectory, velocity
+        return get_numpy(trajectory_tensor), get_numpy(velocity_tensor)

-    def get_mp_action_space(self):
+    def get_traj_gen_action_space(self):
         """This function can be used to set up an individual space for the parameters of the trajectory_generator."""
         min_action_bounds, max_action_bounds = self.trajectory_generator.get_param_bounds()
         mp_action_space = gym.spaces.Box(low=min_action_bounds.numpy(), high=max_action_bounds.numpy(),
@@ -108,22 +92,17 @@ class BlackBoxWrapper(gym.ObservationWrapper, ABC):
         Only needs to be overwritten if the action space needs to be modified.
         """
         try:
-            return self.mp_action_space
+            return self.traj_gen_action_space
         except AttributeError:
-            return self.get_mp_action_space()
+            return self.get_traj_gen_action_space()

     def step(self, action: np.ndarray):
         """ This function generates a trajectory based on a MP and then does the usual loop over reset and step"""
-        # TODO: Think about sequencing
-        # TODO: Reward Function rather here?
         # agent to learn when to release the ball
         mp_params, env_spec_params = self._episode_callback(action)
         trajectory, velocity = self.get_trajectory(mp_params)
-        # TODO
-        # self.time_steps = np.linspace(0, learned_duration, self.traj_steps)
-        # self.trajectory_generator.set_mp_times(self.time_steps)

         trajectory_length = len(trajectory)
         rewards = np.zeros(shape=(trajectory_length,))
         if self.verbose >= 2:
@@ -152,13 +131,15 @@ class BlackBoxWrapper(gym.ObservationWrapper, ABC):
                     elems[t] = v
                     infos[k] = elems

-            if self.render_mode is not None:
-                self.render(mode=self.render_mode, **self.render_kwargs)
+            if self.render_kwargs:
+                self.render(**self.render_kwargs)

-            if done or self.env.do_replanning(self.current_pos, self.current_vel, obs, c_action, t + past_steps):
+            if done or self.env.do_replanning(self.current_pos, self.current_vel, obs, c_action,
+                                              t + 1 + self.current_traj_steps):
                 break

         infos.update({k: v[:t + 1] for k, v in infos.items()})
+        self.current_traj_steps += t + 1

         if self.verbose >= 2:
             infos['trajectory'] = trajectory
@@ -168,24 +149,17 @@ class BlackBoxWrapper(gym.ObservationWrapper, ABC):
             infos['trajectory_length'] = t + 1
         trajectory_return = self.reward_aggregation(rewards[:t + 1])
-        return self.get_observation_from_step(obs), trajectory_return, done, infos
+        return obs, trajectory_return, done, infos

-    def reset(self):
-        return self.get_observation_from_step(self.env.reset())
-
-    def render(self, mode='human', **kwargs):
+    def render(self, **kwargs):
         """Only set render options here, such that they can be used during the rollout.
         This only needs to be called once"""
-        self.render_mode = mode
         self.render_kwargs = kwargs
         # self.env.render(mode=self.render_mode, **self.render_kwargs)
-        self.env.render(mode=self.render_mode)
+        self.env.render(**kwargs)

-    def get_observation_from_step(self, observation: np.ndarray) -> np.ndarray:
-        return observation[self.active_obs]
-
-    def seed(self, seed=None):
-        self.env.seed(seed)
+    def reset(self, **kwargs):
+        self.current_traj_steps = 0

     def plot_trajs(self, des_trajs, des_vels):
         import matplotlib.pyplot as plt
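
Note: the replanning bookkeeping is spread over several hunks above, so here is a standalone sketch (not the repository code) of the pattern it implements: reset() zeroes a per-episode step counter, the inner rollout breaks when the episode is done or a replanning predicate fires on the global step t + 1 + current_traj_steps, and the counter is then advanced by the number of executed steps so the next segment sees the correct global time step. The fixed replan_every schedule, the placeholder trajectory, and the helper names are assumptions for illustration only; the wrapper itself delegates the decision to self.env.do_replanning(...).

# Standalone illustration of the replanning step bookkeeping (assumptions noted above).
import numpy as np


def do_replanning(global_step: int, replan_every: int = 25) -> bool:
    # Hypothetical predicate: replan on a fixed schedule.
    return global_step % replan_every == 0


def rollout_segment(trajectory: np.ndarray, current_traj_steps: int) -> int:
    """Execute one generated segment and return the number of steps actually taken."""
    for t, _pos in enumerate(trajectory):
        done = t == len(trajectory) - 1  # stand-in for the env's done flag
        if done or do_replanning(t + 1 + current_traj_steps):
            break
    return t + 1


current_traj_steps = 0  # what reset(**kwargs) resets to 0
for _segment in range(4):
    desired_pos = np.zeros((100, 7))  # placeholder for get_trajectory() output
    executed = rollout_segment(desired_pos, current_traj_steps)
    current_traj_steps += executed  # mirrors `self.current_traj_steps += t + 1`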

View File

@@ -1,4 +1,5 @@
 import numpy as np
+import torch as ch


 def angle_normalize(x, type="deg"):
@@ -19,3 +20,7 @@ def angle_normalize(x, type="deg"):
         two_pi = 2 * np.pi
         return x - two_pi * np.floor((x + np.pi) / two_pi)
+
+
+def get_numpy(x: ch.Tensor):
+    return x.detach().cpu().numpy()
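
For reference, a minimal usage sketch of the new helper, assuming only that torch is installed: get_numpy detaches a tensor from autograd, moves it to the CPU, and converts it to NumPy, which is what get_trajectory now relies on when returning positions and velocities. The body is repeated here so the snippet runs without the package installed.

# Minimal check of the helper added above (assumes torch is available).
import numpy as np
import torch as ch


def get_numpy(x: ch.Tensor):
    # same body as alr_envs.utils.utils.get_numpy: detach -> cpu -> numpy
    return x.detach().cpu().numpy()


pos = ch.linspace(0.0, 1.0, steps=5, requires_grad=True)
assert isinstance(get_numpy(pos), np.ndarray)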