From 9b9b09234941916adef63c465cd30f9a570710cc Mon Sep 17 00:00:00 2001
From: Maximilian Huettenrauch
Date: Tue, 22 Jun 2021 14:19:42 +0200
Subject: [PATCH] start refactor and biac dev merge

---
 .gitignore                                    |   2 +
 MUJOCO_LOG.TXT                                |  15 --
 .../mujoco/ball_in_a_cup/ball_in_a_cup.py     |  30 +++-
 .../ball_in_a_cup_reward_simple.py            |  21 +--
 alr_envs/utils/make_env_helpers.py            |   4 +-
 alr_envs/utils/mp_env_async_sampler.py        |  11 +-
 alr_envs/utils/mps/__init__.py                |   0
 alr_envs/utils/mps/alr_env.py                 |  40 ------
 alr_envs/utils/mps/detpmp_wrapper.py          |  54 -------
 alr_envs/utils/mps/dmp_wrapper.py             |  76 ----------
 alr_envs/utils/mps/mp_wrapper.py              | 134 ------------------
 alr_envs/utils/policies.py                    |  61 --------
 alr_envs/utils/positional_env.py              |  22 ---
 13 files changed, 50 insertions(+), 420 deletions(-)
 delete mode 100644 MUJOCO_LOG.TXT
 delete mode 100644 alr_envs/utils/mps/__init__.py
 delete mode 100644 alr_envs/utils/mps/alr_env.py
 delete mode 100644 alr_envs/utils/mps/detpmp_wrapper.py
 delete mode 100644 alr_envs/utils/mps/dmp_wrapper.py
 delete mode 100644 alr_envs/utils/mps/mp_wrapper.py
 delete mode 100644 alr_envs/utils/policies.py
 delete mode 100644 alr_envs/utils/positional_env.py

diff --git a/.gitignore b/.gitignore
index 79c652b..ec01816 100644
--- a/.gitignore
+++ b/.gitignore
@@ -109,3 +109,5 @@ venv.bak/
 
 #configs
 /configs/db.cfg
+legacy/
+MUJOCO_LOG.TXT
diff --git a/MUJOCO_LOG.TXT b/MUJOCO_LOG.TXT
deleted file mode 100644
index d398e34..0000000
--- a/MUJOCO_LOG.TXT
+++ /dev/null
@@ -1,15 +0,0 @@
-Fri Aug 28 14:41:56 2020
-ERROR: GLEW initalization error: Missing GL version
-
-Fri Aug 28 14:59:14 2020
-ERROR: GLEW initalization error: Missing GL version
-
-Fri Aug 28 15:03:43 2020
-ERROR: GLEW initalization error: Missing GL version
-
-Fri Aug 28 15:07:03 2020
-ERROR: GLEW initalization error: Missing GL version
-
-Fri Aug 28 15:15:01 2020
-ERROR: GLEW initalization error: Missing GL version
-
diff --git a/alr_envs/mujoco/ball_in_a_cup/ball_in_a_cup.py b/alr_envs/mujoco/ball_in_a_cup/ball_in_a_cup.py
index a7816c9..8ca948b 100644
--- a/alr_envs/mujoco/ball_in_a_cup/ball_in_a_cup.py
+++ b/alr_envs/mujoco/ball_in_a_cup/ball_in_a_cup.py
@@ -41,7 +41,7 @@ class ALRBallInACupEnv(alr_mujoco_env.AlrMujocoEnv, utils.EzPickle):
             from alr_envs.mujoco.ball_in_a_cup.ball_in_a_cup_reward import BallInACupReward
             reward_function = BallInACupReward
         else:
-            raise ValueError("Unknown reward type")
+            raise ValueError("Unknown reward type: {}".format(reward_type))
         self.reward_function = reward_function(self.sim_steps)
 
     @property
@@ -66,6 +66,10 @@ class ALRBallInACupEnv(alr_mujoco_env.AlrMujocoEnv, utils.EzPickle):
     def current_vel(self):
         return self.sim.data.qvel[0:7].copy()
 
+    def reset(self):
+        self.reward_function.reset(None)
+        return super().reset()
+
     def reset_model(self):
         init_pos_all = self.init_qpos.copy()
         init_pos_robot = self._start_pos
@@ -100,14 +104,18 @@ class ALRBallInACupEnv(alr_mujoco_env.AlrMujocoEnv, utils.EzPickle):
             done = success or self._steps == self.sim_steps - 1 or is_collided
             self._steps += 1
         else:
-            reward = -2
+            reward = -2000
             success = False
             is_collided = False
             done = True
         return ob, reward, done, dict(reward_dist=reward_dist,
                                       reward_ctrl=reward_ctrl, velocity=angular_vel,
-                                      traj=self._q_pos, is_success=success,
+                                      # traj=self._q_pos,
+                                      action=a,
+                                      q_pos=self.sim.data.qpos[0:7].ravel().copy(),
+                                      q_vel=self.sim.data.qvel[0:7].ravel().copy(),
+                                      is_success=success,
                                       is_collided=is_collided, sim_crash=crash)
 
     def check_traj_in_joint_limits(self):
@@ -148,6 +156,22 @@ class ALRBallInACupEnv(alr_mujoco_env.AlrMujocoEnv, utils.EzPickle):
         des_vel_full[5] = des_vel[2]
         return des_vel_full
 
+    def render(self, render_mode, **render_kwargs):
+        if render_mode == "plot_trajectory":
+            if self._steps == 1:
+                import matplotlib.pyplot as plt
+                # plt.ion()
+                self.fig, self.axs = plt.subplots(3, 1)
+
+            if self._steps <= 1750:
+                for ax, cp in zip(self.axs, self.current_pos[1::2]):
+                    ax.scatter(self._steps, cp, s=2, marker=".")
+
+            # self.fig.show()
+
+        else:
+            super().render(render_mode, **render_kwargs)
+
 
 if __name__ == "__main__":
     env = ALRBallInACupEnv()
diff --git a/alr_envs/mujoco/ball_in_a_cup/ball_in_a_cup_reward_simple.py b/alr_envs/mujoco/ball_in_a_cup/ball_in_a_cup_reward_simple.py
index 79987d6..daee289 100644
--- a/alr_envs/mujoco/ball_in_a_cup/ball_in_a_cup_reward_simple.py
+++ b/alr_envs/mujoco/ball_in_a_cup/ball_in_a_cup_reward_simple.py
@@ -22,7 +22,7 @@ class BallInACupReward(alr_reward_fct.AlrReward):
         self.goal_final_id = None
         self.collision_ids = None
         self._is_collided = False
-        self.collision_penalty = 1
+        self.collision_penalty = 1000
 
         self.ball_traj = None
         self.dists = None
@@ -37,6 +37,7 @@ class BallInACupReward(alr_reward_fct.AlrReward):
         self.dists_final = []
         self.costs = []
         self.action_costs = []
+        self.angle_costs = []
         self.cup_angles = []
 
     def compute_reward(self, action, env):
@@ -56,8 +57,11 @@ class BallInACupReward(alr_reward_fct.AlrReward):
         self.dists_final.append(np.linalg.norm(goal_final_pos - ball_pos))
         self.ball_traj[env._steps, :] = ball_pos
         cup_quat = np.copy(env.sim.data.body_xquat[env.sim.model._body_name2id["cup"]])
-        self.cup_angles.append(np.arctan2(2 * (cup_quat[0] * cup_quat[1] + cup_quat[2] * cup_quat[3]),
-                                          1 - 2 * (cup_quat[1]**2 + cup_quat[2]**2)))
+        cup_angle = np.arctan2(2 * (cup_quat[0] * cup_quat[1] + cup_quat[2] * cup_quat[3]),
+                               1 - 2 * (cup_quat[1]**2 + cup_quat[2]**2))
+        cost_angle = (cup_angle - np.pi / 2) ** 2
+        self.angle_costs.append(cost_angle)
+        self.cup_angles.append(cup_angle)
 
         action_cost = np.sum(np.square(action))
         self.action_costs.append(action_cost)
@@ -67,20 +71,21 @@ class BallInACupReward(alr_reward_fct.AlrReward):
         if env._steps == env.sim_steps - 1 or self._is_collided:
             t_min_dist = np.argmin(self.dists)
             angle_min_dist = self.cup_angles[t_min_dist]
-            cost_angle = (angle_min_dist - np.pi / 2)**2
+            # cost_angle = (angle_min_dist - np.pi / 2)**2
 
-            min_dist = self.dists[t_min_dist]
+
+            # min_dist = self.dists[t_min_dist]
             dist_final = self.dists_final[-1]
             min_dist_final = np.min(self.dists_final)
 
-            cost = 0.5 * dist_final + 0.05 * cost_angle  # TODO: Increase cost_angle weight # 0.5 * min_dist +
+            # cost = 0.5 * dist_final + 0.05 * cost_angle  # TODO: Increase cost_angle weight # 0.5 * min_dist +
             # reward = np.exp(-2 * cost) - 1e-2 * action_cost - self.collision_penalty * int(self._is_collided)
             # reward = - dist_final**2 - 1e-4 * cost_angle - 1e-5 * action_cost - self.collision_penalty * int(self._is_collided)
-            reward = - dist_final**2 - min_dist_final**2 - 1e-4 * cost_angle - 1e-5 * action_cost - self.collision_penalty * int(self._is_collided)
+            reward = - dist_final**2 - min_dist_final**2 - 1e-4 * cost_angle - 1e-3 * action_cost - self.collision_penalty * int(self._is_collided)
             success = dist_final < 0.05 and ball_in_cup and not self._is_collided
             crash = self._is_collided
         else:
-            reward = - 1e-5 * action_cost  # TODO: increase action_cost weight
+            reward = - 1e-3 * action_cost - 1e-4 * cost_angle  # TODO: increase action_cost weight
             success = False
             crash = False
 
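
Note: the per-step angle cost added to compute_reward above can be reproduced in isolation. A minimal NumPy sketch, assuming a unit quaternion in MuJoCo's (w, x, y, z) order as input (the hypothetical cup_angle_cost helper is not part of the repository):

import numpy as np

def cup_angle_cost(cup_quat):
    # cup_quat is a unit quaternion in MuJoCo's (w, x, y, z) convention,
    # e.g. env.sim.data.body_xquat[...] for the "cup" body.
    cup_angle = np.arctan2(2 * (cup_quat[0] * cup_quat[1] + cup_quat[2] * cup_quat[3]),
                           1 - 2 * (cup_quat[1] ** 2 + cup_quat[2] ** 2))
    # Squared deviation from the upright orientation (pi/2), as in compute_reward above.
    return (cup_angle - np.pi / 2) ** 2

# An upright cup (rotation of pi/2 about the x-axis) should give a cost close to 0.
upright = np.array([np.cos(np.pi / 4), np.sin(np.pi / 4), 0.0, 0.0])
print(cup_angle_cost(upright))  # ~0.0
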
diff --git a/alr_envs/utils/make_env_helpers.py b/alr_envs/utils/make_env_helpers.py
index d455496..e876ee8 100644
--- a/alr_envs/utils/make_env_helpers.py
+++ b/alr_envs/utils/make_env_helpers.py
@@ -1,5 +1,5 @@
-from alr_envs.utils.mps.dmp_wrapper import DmpWrapper
-from alr_envs.utils.mps.detpmp_wrapper import DetPMPWrapper
+from mp_env_api.mp_wrappers.dmp_wrapper import DmpWrapper
+from mp_env_api.mp_wrappers.detpmp_wrapper import DetPMPWrapper
 import gym
 from gym.vector.utils import write_to_shared_memory
 import sys
diff --git a/alr_envs/utils/mp_env_async_sampler.py b/alr_envs/utils/mp_env_async_sampler.py
index 5cda41f..30f23c5 100644
--- a/alr_envs/utils/mp_env_async_sampler.py
+++ b/alr_envs/utils/mp_env_async_sampler.py
@@ -91,20 +91,21 @@ class AlrContextualMpEnvSampler:
         repeat = int(np.ceil(n_samples / self.env.num_envs))
 
         vals = defaultdict(list)
+
+        obs = self.env.reset()
         for i in range(repeat):
-            new_contexts = self.env.reset()
-            vals['new_contexts'].append(new_contexts)
-            new_samples, new_contexts = dist.sample(new_contexts)
+            vals['obs'].append(obs)
+            new_samples, new_contexts = dist.sample(obs)
             vals['new_samples'].append(new_samples)
 
             obs, reward, done, info = self.env.step(new_samples)
-            vals['obs'].append(obs)
+
             vals['reward'].append(reward)
             vals['done'].append(done)
             vals['info'].append(info)
 
         # do not return values above threshold
-        return np.vstack(vals['new_samples'])[:n_samples], np.vstack(vals['new_contexts'])[:n_samples], \
+        return np.vstack(vals['new_samples'])[:n_samples], \
                np.vstack(vals['obs'])[:n_samples], np.hstack(vals['reward'])[:n_samples], \
                _flatten_list(vals['done'])[:n_samples], _flatten_list(vals['info'])[:n_samples]
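
Note: after this change the sampler resets once and records the observation that each parameter vector was conditioned on, instead of resetting inside the loop. A minimal sketch of the resulting pattern, with env (a vectorized env exposing num_envs) and dist (the search distribution) as stand-ins:

from collections import defaultdict

import numpy as np

def sample_rollouts(env, dist, n_samples):
    # Reset once, then condition the search distribution on the current
    # observation, so vals['obs'][i] is the context vals['new_samples'][i]
    # was drawn from. (The real sampler additionally collects done and info.)
    repeat = int(np.ceil(n_samples / env.num_envs))
    vals = defaultdict(list)

    obs = env.reset()
    for _ in range(repeat):
        vals['obs'].append(obs)
        new_samples, _ = dist.sample(obs)
        vals['new_samples'].append(new_samples)

        obs, reward, done, info = env.step(new_samples)
        vals['reward'].append(reward)

    # Stack and trim to exactly n_samples, mirroring the return statement above.
    return (np.vstack(vals['new_samples'])[:n_samples],
            np.vstack(vals['obs'])[:n_samples],
            np.hstack(vals['reward'])[:n_samples])
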
- """ - return self.start_pos - - @property - @abstractmethod - def dt(self) -> Union[float, int]: - """ - Returns the time between two simulated steps of the environment - """ - raise NotImplementedError() \ No newline at end of file diff --git a/alr_envs/utils/mps/detpmp_wrapper.py b/alr_envs/utils/mps/detpmp_wrapper.py deleted file mode 100644 index 724a50c..0000000 --- a/alr_envs/utils/mps/detpmp_wrapper.py +++ /dev/null @@ -1,54 +0,0 @@ -import gym -import numpy as np -from mp_lib import det_promp - -from alr_envs.utils.mps.alr_env import AlrEnv -from alr_envs.utils.mps.mp_wrapper import MPWrapper - - -class DetPMPWrapper(MPWrapper): - def __init__(self, env: AlrEnv, num_dof, num_basis, width, start_pos=None, duration=1, post_traj_time=0., - policy_type=None, weights_scale=1, zero_start=False, zero_goal=False, learn_mp_length: bool =True, - **mp_kwargs): - self.duration = duration # seconds - - super().__init__(env=env, num_dof=num_dof, duration=duration, post_traj_time=post_traj_time, - policy_type=policy_type, weights_scale=weights_scale, num_basis=num_basis, - width=width, zero_start=zero_start, zero_goal=zero_goal, - **mp_kwargs) - - self.learn_mp_length = learn_mp_length - if self.learn_mp_length: - parameter_space_shape = (1+num_basis*num_dof,) - else: - parameter_space_shape = (num_basis * num_dof,) - self.min_param = -np.inf - self.max_param = np.inf - self.parameterization_space = gym.spaces.Box(low=self.min_param, high=self.max_param, - shape=parameter_space_shape, dtype=np.float32) - - self.start_pos = start_pos - - def initialize_mp(self, num_dof: int, duration: int, num_basis: int = 5, width: float = None, - zero_start: bool = False, zero_goal: bool = False, **kwargs): - pmp = det_promp.DeterministicProMP(n_basis=num_basis, n_dof=num_dof, width=width, off=0.01, - zero_start=zero_start, zero_goal=zero_goal) - - weights = np.zeros(shape=(num_basis, num_dof)) - pmp.set_weights(duration, weights) - - return pmp - - def mp_rollout(self, action): - if self.learn_mp_length: - duration = max(1, self.duration*abs(action[0])) - params = np.reshape(action[1:], (self.mp.n_basis, -1)) * self.weights_scale # TODO: Fix Bug when zero_start is true - else: - duration = self.duration - params = np.reshape(action, (self.mp.n_basis, -1)) * self.weights_scale # TODO: Fix Bug when zero_start is true - self.mp.set_weights(1., params) - _, des_pos, des_vel, _ = self.mp.compute_trajectory(frequency=max(1, duration)) - if self.mp.zero_start: - des_pos += self.start_pos - - return des_pos, des_vel diff --git a/alr_envs/utils/mps/dmp_wrapper.py b/alr_envs/utils/mps/dmp_wrapper.py deleted file mode 100644 index 8dbd26d..0000000 --- a/alr_envs/utils/mps/dmp_wrapper.py +++ /dev/null @@ -1,76 +0,0 @@ -import gym -import numpy as np -from mp_lib import dmps -from mp_lib.basis import DMPBasisGenerator -from mp_lib.phase import ExpDecayPhaseGenerator - -from alr_envs.utils.mps.alr_env import AlrEnv -from alr_envs.utils.mps.mp_wrapper import MPWrapper - - -class DmpWrapper(MPWrapper): - - def __init__(self, env: AlrEnv, num_dof: int, num_basis: int, - duration: int = 1, alpha_phase: float = 2., dt: float = None, - learn_goal: bool = False, post_traj_time: float = 0., - weights_scale: float = 1., goal_scale: float = 1., bandwidth_factor: float = 3., - policy_type: str = None, render_mode: str = None): - - """ - This Wrapper generates a trajectory based on a DMP and will only return episodic performances. 
diff --git a/alr_envs/utils/mps/dmp_wrapper.py b/alr_envs/utils/mps/dmp_wrapper.py
deleted file mode 100644
index 8dbd26d..0000000
--- a/alr_envs/utils/mps/dmp_wrapper.py
+++ /dev/null
@@ -1,76 +0,0 @@
-import gym
-import numpy as np
-from mp_lib import dmps
-from mp_lib.basis import DMPBasisGenerator
-from mp_lib.phase import ExpDecayPhaseGenerator
-
-from alr_envs.utils.mps.alr_env import AlrEnv
-from alr_envs.utils.mps.mp_wrapper import MPWrapper
-
-
-class DmpWrapper(MPWrapper):
-
-    def __init__(self, env: AlrEnv, num_dof: int, num_basis: int,
-                 duration: int = 1, alpha_phase: float = 2., dt: float = None,
-                 learn_goal: bool = False, post_traj_time: float = 0.,
-                 weights_scale: float = 1., goal_scale: float = 1., bandwidth_factor: float = 3.,
-                 policy_type: str = None, render_mode: str = None):
-
-        """
-        This Wrapper generates a trajectory based on a DMP and will only return episodic performances.
-        Args:
-            env:
-            num_dof:
-            num_basis:
-            duration:
-            alpha_phase:
-            dt:
-            learn_goal:
-            post_traj_time:
-            policy_type:
-            weights_scale:
-            goal_scale:
-        """
-        self.learn_goal = learn_goal
-
-        self.t = np.linspace(0, duration, int(duration / dt))
-        self.goal_scale = goal_scale
-
-        super().__init__(env=env, num_dof=num_dof, duration=duration, post_traj_time=post_traj_time,
-                         policy_type=policy_type, weights_scale=weights_scale, render_mode=render_mode,
-                         num_basis=num_basis, alpha_phase=alpha_phase, bandwidth_factor=bandwidth_factor)
-
-        action_bounds = np.inf * np.ones((np.prod(self.mp.weights.shape) + (num_dof if learn_goal else 0)))
-        self.action_space = gym.spaces.Box(low=-action_bounds, high=action_bounds, dtype=np.float32)
-
-    def initialize_mp(self, num_dof: int, duration: int, num_basis: int, alpha_phase: float = 2.,
-                      bandwidth_factor: int = 3, **kwargs):
-
-        phase_generator = ExpDecayPhaseGenerator(alpha_phase=alpha_phase, duration=duration)
-        basis_generator = DMPBasisGenerator(phase_generator, duration=duration, num_basis=num_basis,
-                                            basis_bandwidth_factor=bandwidth_factor)
-
-        dmp = dmps.DMP(num_dof=num_dof, basis_generator=basis_generator, phase_generator=phase_generator,
-                       dt=self.dt)
-
-        return dmp
-
-    def goal_and_weights(self, params):
-        assert params.shape[-1] == self.action_space.shape[0]
-        params = np.atleast_2d(params)
-
-        if self.learn_goal:
-            goal_pos = params[0, -self.mp.num_dimensions:]  # [num_dof]
-            params = params[:, :-self.mp.num_dimensions]  # [1,num_dof]
-        else:
-            goal_pos = self.env.goal_pos
-            assert goal_pos is not None
-
-        weight_matrix = np.reshape(params, self.mp.weights.shape)  # [num_basis, num_dof]
-        return goal_pos * self.goal_scale, weight_matrix * self.weights_scale
-
-    def mp_rollout(self, action):
-        self.mp.dmp_start_pos = self.env.start_pos
-        goal_pos, weight_matrix = self.goal_and_weights(action)
-        self.mp.set_weights(weight_matrix, goal_pos)
-        return self.mp.reference_trajectory(self.t)
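
Note: the deleted DmpWrapper split its flat parameter vector into a goal and a weight matrix. A minimal sketch of that logic, with hypothetical dimensions and a default_goal stand-in for env.goal_pos:

import numpy as np

def split_dmp_params(params, num_basis, num_dof, learn_goal=True,
                     goal_scale=1.0, weights_scale=1.0, default_goal=None):
    # Mirrors the deleted DmpWrapper.goal_and_weights: when the goal is learned,
    # it occupies the last num_dof entries of the flat parameter vector.
    params = np.atleast_2d(params)
    if learn_goal:
        goal_pos = params[0, -num_dof:]
        params = params[:, :-num_dof]
    else:
        goal_pos = default_goal  # e.g. env.goal_pos in the original wrapper
        assert goal_pos is not None
    weight_matrix = np.reshape(params, (num_basis, num_dof))
    return goal_pos * goal_scale, weight_matrix * weights_scale

# Hypothetical example: 5 basis functions, 7 joints, learned goal -> 5 * 7 + 7 = 42 parameters.
goal, weights = split_dmp_params(np.random.randn(42), num_basis=5, num_dof=7)
print(goal.shape, weights.shape)  # (7,), (5, 7)
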
diff --git a/alr_envs/utils/mps/mp_wrapper.py b/alr_envs/utils/mps/mp_wrapper.py
deleted file mode 100644
index 84bffd1..0000000
--- a/alr_envs/utils/mps/mp_wrapper.py
+++ /dev/null
@@ -1,134 +0,0 @@
-from abc import ABC, abstractmethod
-from typing import Union
-
-import gym
-import numpy as np
-
-from alr_envs.utils.mps.alr_env import AlrEnv
-from alr_envs.utils.policies import get_policy_class, BaseController
-
-
-class MPWrapper(gym.Wrapper, ABC):
-    """
-    Base class for movement primitive based gym.Wrapper implementations.
-
-    :param env: The (wrapped) environment this wrapper is applied on
-    :param num_dof: Dimension of the action space of the wrapped env
-    :param duration: Number of timesteps in the trajectory of the movement primitive
-    :param post_traj_time: Time for which the last position of the trajectory is fed to the environment to continue
-    simulation
-    :param policy_type: Type or object defining the policy that is used to generate action based on the trajectory
-    :param weight_scale: Scaling parameter for the actions given to this wrapper
-    :param render_mode: Equivalent to gym render mode
-    """
-    def __init__(self,
-                 env: AlrEnv,
-                 num_dof: int,
-                 duration: int = 1,
-                 post_traj_time: float = 0.,
-                 policy_type: Union[str, BaseController] = None,
-                 weights_scale: float = 1.,
-                 render_mode: str = None,
-                 **mp_kwargs
-                 ):
-        super().__init__(env)
-
-        # adjust observation space to reduce version
-        obs_sp = self.env.observation_space
-        self.observation_space = gym.spaces.Box(low=obs_sp.low[self.env.active_obs],
-                                                high=obs_sp.high[self.env.active_obs],
-                                                dtype=obs_sp.dtype)
-
-        self.post_traj_steps = int(post_traj_time / env.dt)
-
-        self.mp = self.initialize_mp(num_dof=num_dof, duration=duration, **mp_kwargs)
-        self.weights_scale = weights_scale
-
-        if type(policy_type) is str:
-            self.policy = get_policy_class(policy_type, env, mp_kwargs)
-        else:
-            self.policy = policy_type
-
-        # rendering
-        self.render_mode = render_mode
-        self.render_kwargs = {}
-
-    # TODO: @Max I think this should not be in this class, this functionality should be part of your sampler.
-    def __call__(self, params, contexts=None):
-        """
-        Can be used to provide a batch of parameter sets
-        """
-        params = np.atleast_2d(params)
-        obs = []
-        rewards = []
-        dones = []
-        infos = []
-        # for p, c in zip(params, contexts):
-        for p in params:
-            # self.configure(c)
-            ob, reward, done, info = self.step(p)
-            obs.append(ob)
-            rewards.append(reward)
-            dones.append(done)
-            infos.append(info)
-
-        return obs, np.array(rewards), dones, infos
-
-    def reset(self):
-        return self.env.reset()[self.env.active_obs]
-
-    def step(self, action: np.ndarray):
-        """ This function generates a trajectory based on a DMP and then does the usual loop over reset and step"""
-        trajectory, velocity = self.mp_rollout(action)
-
-        if self.post_traj_steps > 0:
-            trajectory = np.vstack([trajectory, np.tile(trajectory[-1, :], [self.post_traj_steps, 1])])
-            velocity = np.vstack([velocity, np.zeros(shape=(self.post_traj_steps, self.mp.num_dimensions))])
-
-        trajectory_length = len(trajectory)
-        actions = np.zeros(shape=(trajectory_length, self.mp.num_dimensions))
-        observations= np.zeros(shape=(trajectory_length,) + self.env.observation_space.shape)
-        rewards = np.zeros(shape=(trajectory_length,))
-        trajectory_return = 0
-        infos = dict(step_infos =[])
-        for t, pos_vel in enumerate(zip(trajectory, velocity)):
-            actions[t,:] = self.policy.get_action(pos_vel[0], pos_vel[1])
-            observations[t,:], rewards[t], done, info = self.env.step(actions[t,:])
-            trajectory_return += rewards[t]
-            infos['step_infos'].append(info)
-            if self.render_mode:
-                self.env.render(mode=self.render_mode, **self.render_kwargs)
-            if done:
-                break
-
-        infos['step_actions'] = actions[:t+1]
-        infos['step_observations'] = observations[:t+1]
-        infos['step_rewards'] = rewards[:t+1]
-        infos['trajectory_length'] = t+1
-        done = True
-        return observations[t][self.env.active_obs], trajectory_return, done, infos
-
-    def render(self, mode='human', **kwargs):
-        """Only set render options here, such that they can be used during the rollout.
-        This only needs to be called once"""
-        self.render_mode = mode
-        self.render_kwargs = kwargs
-
-    @abstractmethod
-    def mp_rollout(self, action):
-        """
-        Generate trajectory and velocity based on the MP
-        Returns:
-            trajectory/positions, velocity
-        """
-        raise NotImplementedError()
-
-    @abstractmethod
-    def initialize_mp(self, num_dof: int, duration: float, **kwargs):
-        """
-        Create respective instance of MP
-        Returns:
-            MP instance
-        """
-
-        raise NotImplementedError
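
Note: the deleted MPWrapper.step padded the generated trajectory so the robot holds the final position for post_traj_steps extra steps. A minimal sketch of just that padding, with hypothetical shapes:

import numpy as np

def pad_trajectory(trajectory, velocity, post_traj_steps):
    # Mirrors the padding in the deleted MPWrapper.step: hold the final position
    # for post_traj_steps extra steps and command zero velocity while doing so.
    if post_traj_steps > 0:
        trajectory = np.vstack([trajectory, np.tile(trajectory[-1, :], [post_traj_steps, 1])])
        velocity = np.vstack([velocity, np.zeros((post_traj_steps, trajectory.shape[1]))])
    return trajectory, velocity

# Hypothetical example: a 100-step, 7-DoF reference held for 50 extra steps.
traj, vel = pad_trajectory(np.random.randn(100, 7), np.random.randn(100, 7), post_traj_steps=50)
print(traj.shape, vel.shape)  # (150, 7), (150, 7)
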
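
Note: for reference, the control law of the PD controller deleted with policies.py, as a minimal standalone sketch. The deleted asserts compared shapes with "!=", which would fire on matching shapes; equality is presumably what was intended and is used here. The class name and gains below are hypothetical.

import numpy as np

class SimplePDController:
    # Sketch of the PD tracking controller that used to live in alr_envs/utils/policies.py.
    def __init__(self, p_gains, d_gains):
        self.p_gains = np.asarray(p_gains)
        self.d_gains = np.asarray(d_gains)

    def get_action(self, des_pos, des_vel, cur_pos, cur_vel):
        assert des_pos.shape == cur_pos.shape, \
            "Mismatch between desired position {} and current position {}".format(des_pos.shape, cur_pos.shape)
        assert des_vel.shape == cur_vel.shape, \
            "Mismatch between desired velocity {} and current velocity {}".format(des_vel.shape, cur_vel.shape)
        # Torque response from position and velocity tracking errors.
        return self.p_gains * (des_pos - cur_pos) + self.d_gains * (des_vel - cur_vel)

# Hypothetical gains for a 7-DoF arm; in the wrapper stack these come from mp_kwargs.
pd = SimplePDController(p_gains=np.full(7, 1.0), d_gains=np.full(7, 0.1))
trq = pd.get_action(np.zeros(7), np.zeros(7), np.random.randn(7) * 0.01, np.zeros(7))
print(trq.shape)  # (7,)
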