diff --git a/alr_envs/mp/black_box_wrapper.py b/alr_envs/mp/black_box_wrapper.py
index d87c332..9e6a9e5 100644
--- a/alr_envs/mp/black_box_wrapper.py
+++ b/alr_envs/mp/black_box_wrapper.py
@@ -8,6 +8,7 @@ from mp_pytorch.mp.mp_interfaces import MPInterface
 
 from alr_envs.mp.controllers.base_controller import BaseController
 from alr_envs.mp.raw_interface_wrapper import RawInterfaceWrapper
+from alr_envs.utils.utils import get_numpy
 
 
 class BlackBoxWrapper(gym.ObservationWrapper, ABC):
@@ -15,7 +16,7 @@ class BlackBoxWrapper(gym.ObservationWrapper, ABC):
     def __init__(self,
                  env: RawInterfaceWrapper,
                  trajectory_generator: MPInterface, tracking_controller: BaseController,
-                 duration: float, verbose: int = 1, sequencing=True, reward_aggregation: callable = np.sum):
+                 duration: float, verbose: int = 1, sequencing: bool = True, reward_aggregation: callable = np.sum):
         """
         gym.Wrapper for leveraging a black box approach with a trajectory generator.
 
@@ -33,67 +34,50 @@ class BlackBoxWrapper(gym.ObservationWrapper, ABC):
         self.env = env
         self.duration = duration
         self.sequencing = sequencing
-        # self.traj_steps = int(duration / self.dt)
-        # self.post_traj_steps = self.env.spec.max_episode_steps - self.traj_steps
-        # duration = self.env.max_episode_steps * self.dt
+        self.current_traj_steps = 0
 
         # trajectory generation
         self.trajectory_generator = trajectory_generator
         self.tracking_controller = tracking_controller
-        # self.weight_scale = weight_scale
         # self.time_steps = np.linspace(0, self.duration, self.traj_steps)
         # self.trajectory_generator.set_mp_times(self.time_steps)
-        if not sequencing:
-            self.trajectory_generator.set_mp_duration(np.array([self.duration]), np.array([self.dt]))
-        else:
-            # sequencing stuff
-            pass
+        self.trajectory_generator.set_duration(np.array([self.duration]), np.array([self.dt]))
 
         # reward computation
         self.reward_aggregation = reward_aggregation
 
         # spaces
-        self.mp_action_space = self.get_mp_action_space()
+        self.return_context_observation = not (self.sequencing)  # TODO or we_do_replanning?
+        self.traj_gen_action_space = self.get_traj_gen_action_space()
         self.action_space = self.get_action_space()
         self.observation_space = spaces.Box(low=self.env.observation_space.low[self.env.context_mask],
                                             high=self.env.observation_space.high[self.env.context_mask],
                                             dtype=self.env.observation_space.dtype)
 
         # rendering
-        self.render_mode = None
         self.render_kwargs = {}
-
         self.verbose = verbose
 
-    @property
-    def dt(self):
-        return self.env.dt
-
     def observation(self, observation):
-        return observation[self.env.context_mask]
+        # return only the context observation when not sequencing, otherwise the full observation
+        return observation[self.context_mask] if self.return_context_observation else observation
 
     def get_trajectory(self, action: np.ndarray) -> Tuple:
-        clipped_params = np.clip(action, self.mp_action_space.low, self.mp_action_space.high)
+        clipped_params = np.clip(action, self.traj_gen_action_space.low, self.traj_gen_action_space.high)
         self.trajectory_generator.set_params(clipped_params)
         # if self.trajectory_generator.learn_tau:
         #     self.trajectory_generator.set_mp_duration(self.trajectory_generator.tau, np.array([self.dt]))
-        self.trajectory_generator.set_mp_duration(None if self.sequencing else self.duration, np.array([self.dt]))
-        self.trajectory_generator.set_boundary_conditions(bc_time=, bc_pos=self.current_pos,
+        # TODO: Bruce said DMP, ProMP, ProDMP can have 0 bc_time
+        self.trajectory_generator.set_boundary_conditions(bc_time=np.zeros((1,)), bc_pos=self.current_pos,
                                                           bc_vel=self.current_vel)
-        traj_dict = self.trajectory_generator.get_mp_trajs(get_pos=True, get_vel=True)
+        # TODO: is this correct for replanning? Do we need to adjust anything here?
+        self.trajectory_generator.set_duration(None if self.sequencing else self.duration, np.array([self.dt]))
+        traj_dict = self.trajectory_generator.get_trajs(get_pos=True, get_vel=True)
         trajectory_tensor, velocity_tensor = traj_dict['pos'], traj_dict['vel']
 
-        trajectory = trajectory_tensor.numpy()
-        velocity = velocity_tensor.numpy()
+        return get_numpy(trajectory_tensor), get_numpy(velocity_tensor)
 
-        # TODO: Do we need this or does mp_pytorch have this?
-        if self.post_traj_steps > 0:
-            trajectory = np.vstack([trajectory, np.tile(trajectory[-1, :], [self.post_traj_steps, 1])])
-            velocity = np.vstack([velocity, np.zeros(shape=(self.post_traj_steps, self.trajectory_generator.num_dof))])
-
-        return trajectory, velocity
-
-    def get_mp_action_space(self):
+    def get_traj_gen_action_space(self):
         """This function can be used to set up an individual space for the parameters of the trajectory_generator."""
         min_action_bounds, max_action_bounds = self.trajectory_generator.get_param_bounds()
         mp_action_space = gym.spaces.Box(low=min_action_bounds.numpy(), high=max_action_bounds.numpy(),
@@ -108,22 +92,17 @@ class BlackBoxWrapper(gym.ObservationWrapper, ABC):
         Only needs to be overwritten if the action space needs to be modified.
         """
         try:
-            return self.mp_action_space
+            return self.traj_gen_action_space
         except AttributeError:
-            return self.get_mp_action_space()
+            return self.get_traj_gen_action_space()
 
     def step(self, action: np.ndarray):
         """ This function generates a trajectory based on a MP and then does the usual loop over reset and step"""
-        # TODO: Think about sequencing
-        # TODO: Reward Function rather here?
+        # agent to learn when to release the ball
         mp_params, env_spec_params = self._episode_callback(action)
         trajectory, velocity = self.get_trajectory(mp_params)
 
-        # TODO
-        # self.time_steps = np.linspace(0, learned_duration, self.traj_steps)
-        # self.trajectory_generator.set_mp_times(self.time_steps)
-
         trajectory_length = len(trajectory)
         rewards = np.zeros(shape=(trajectory_length,))
         if self.verbose >= 2:
@@ -152,13 +131,15 @@ class BlackBoxWrapper(gym.ObservationWrapper, ABC):
                 elems[t] = v
                 infos[k] = elems
 
-            if self.render_mode is not None:
-                self.render(mode=self.render_mode, **self.render_kwargs)
+            if self.render_kwargs:
+                self.render(**self.render_kwargs)
 
-            if done or self.env.do_replanning(self.current_pos, self.current_vel, obs, c_action, t + past_steps):
+            if done or self.env.do_replanning(self.current_pos, self.current_vel, obs, c_action,
+                                              t + 1 + self.current_traj_steps):
                 break
 
         infos.update({k: v[:t + 1] for k, v in infos.items()})
+        self.current_traj_steps += t + 1
 
         if self.verbose >= 2:
             infos['trajectory'] = trajectory
@@ -168,24 +149,18 @@ class BlackBoxWrapper(gym.ObservationWrapper, ABC):
             infos['trajectory_length'] = t + 1
 
         trajectory_return = self.reward_aggregation(rewards[:t + 1])
-        return self.get_observation_from_step(obs), trajectory_return, done, infos
+        return obs, trajectory_return, done, infos
 
-    def reset(self):
-        return self.get_observation_from_step(self.env.reset())
-
-    def render(self, mode='human', **kwargs):
+    def render(self, **kwargs):
         """Only set render options here, such that they can be used during the rollout.
         This only needs to be called once"""
-        self.render_mode = mode
         self.render_kwargs = kwargs
         # self.env.render(mode=self.render_mode, **self.render_kwargs)
-        self.env.render(mode=self.render_mode)
+        self.env.render(**kwargs)
 
-    def get_observation_from_step(self, observation: np.ndarray) -> np.ndarray:
-        return observation[self.active_obs]
-
-    def seed(self, seed=None):
-        self.env.seed(seed)
+    def reset(self, **kwargs):
+        self.current_traj_steps = 0
+        return super().reset(**kwargs)  # also reset the wrapped env and return its (masked) observation
 
     def plot_trajs(self, des_trajs, des_vels):
         import matplotlib.pyplot as plt
diff --git a/alr_envs/utils/utils.py b/alr_envs/utils/utils.py
index 3354db3..b90cf60 100644
--- a/alr_envs/utils/utils.py
+++ b/alr_envs/utils/utils.py
@@ -1,4 +1,5 @@
 import numpy as np
+import torch as ch
 
 
 def angle_normalize(x, type="deg"):
@@ -19,3 +20,7 @@
 
     two_pi = 2 * np.pi
     return x - two_pi * np.floor((x + np.pi) / two_pi)
+
+
+def get_numpy(x: ch.Tensor):
+    return x.detach().cpu().numpy()
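
Usage sketch (not part of the patch): the rewritten get_trajectory drives the mp_pytorch generator through set_params -> set_boundary_conditions -> set_duration -> get_trajs and converts the returned torch tensors with the new get_numpy helper. A minimal standalone illustration of that call sequence, assuming only the method names already exercised in the diff; the rollout_params helper and its arguments are hypothetical:

    import numpy as np

    from alr_envs.utils.utils import get_numpy


    def rollout_params(traj_gen, params, current_pos, current_vel, duration, dt):
        """Turn trajectory-generator parameters into (positions, velocities) arrays,
        mirroring BlackBoxWrapper.get_trajectory."""
        traj_gen.set_params(params)
        # zero boundary-condition time, as in the patched wrapper
        traj_gen.set_boundary_conditions(bc_time=np.zeros((1,)), bc_pos=current_pos, bc_vel=current_vel)
        traj_gen.set_duration(np.array([duration]), np.array([dt]))
        traj_dict = traj_gen.get_trajs(get_pos=True, get_vel=True)
        # the tensors may require grad or live on GPU, hence detach().cpu().numpy() via get_numpy
        return get_numpy(traj_dict['pos']), get_numpy(traj_dict['vel'])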