start refactor and biac dev merge

parent 362d2cf24f
commit 9b9b092349

.gitignore (vendored)
@@ -109,3 +109,5 @@ venv.bak/
 #configs
 /configs/db.cfg
+legacy/
+MUJOCO_LOG.TXT

MUJOCO_LOG.TXT
@@ -1,15 +0,0 @@
-Fri Aug 28 14:41:56 2020
-ERROR: GLEW initalization error: Missing GL version
-
-Fri Aug 28 14:59:14 2020
-ERROR: GLEW initalization error: Missing GL version
-
-Fri Aug 28 15:03:43 2020
-ERROR: GLEW initalization error: Missing GL version
-
-Fri Aug 28 15:07:03 2020
-ERROR: GLEW initalization error: Missing GL version
-
-Fri Aug 28 15:15:01 2020
-ERROR: GLEW initalization error: Missing GL version
-

@@ -41,7 +41,7 @@ class ALRBallInACupEnv(alr_mujoco_env.AlrMujocoEnv, utils.EzPickle):
             from alr_envs.mujoco.ball_in_a_cup.ball_in_a_cup_reward import BallInACupReward
             reward_function = BallInACupReward
         else:
-            raise ValueError("Unknown reward type")
+            raise ValueError("Unknown reward type: {}".format(reward_type))
         self.reward_function = reward_function(self.sim_steps)

     @property
@@ -66,6 +66,10 @@ class ALRBallInACupEnv(alr_mujoco_env.AlrMujocoEnv, utils.EzPickle):
     def current_vel(self):
         return self.sim.data.qvel[0:7].copy()

+    def reset(self):
+        self.reward_function.reset(None)
+        return super().reset()
+
     def reset_model(self):
         init_pos_all = self.init_qpos.copy()
         init_pos_robot = self._start_pos
@@ -100,14 +104,18 @@ class ALRBallInACupEnv(alr_mujoco_env.AlrMujocoEnv, utils.EzPickle):
             done = success or self._steps == self.sim_steps - 1 or is_collided
             self._steps += 1
         else:
-            reward = -2
+            reward = -2000
             success = False
             is_collided = False
             done = True
         return ob, reward, done, dict(reward_dist=reward_dist,
                                       reward_ctrl=reward_ctrl,
                                       velocity=angular_vel,
-                                      traj=self._q_pos, is_success=success,
+                                      # traj=self._q_pos,
+                                      action=a,
+                                      q_pos=self.sim.data.qpos[0:7].ravel().copy(),
+                                      q_vel=self.sim.data.qvel[0:7].ravel().copy(),
+                                      is_success=success,
                                       is_collided=is_collided, sim_crash=crash)

     def check_traj_in_joint_limits(self):
@@ -148,6 +156,22 @@ class ALRBallInACupEnv(alr_mujoco_env.AlrMujocoEnv, utils.EzPickle):
         des_vel_full[5] = des_vel[2]
         return des_vel_full

+    def render(self, render_mode, **render_kwargs):
+        if render_mode == "plot_trajectory":
+            if self._steps == 1:
+                import matplotlib.pyplot as plt
+                # plt.ion()
+                self.fig, self.axs = plt.subplots(3, 1)
+
+            if self._steps <= 1750:
+                for ax, cp in zip(self.axs, self.current_pos[1::2]):
+                    ax.scatter(self._steps, cp, s=2, marker=".")
+
+            # self.fig.show()
+
+        else:
+            super().render(render_mode, **render_kwargs)
+

 if __name__ == "__main__":
     env = ALRBallInACupEnv()
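
The last hunk above adds a "plot_trajectory" render mode: a 3x1 matplotlib figure is created lazily on the first step, and one point per plotted joint is scattered on every following step. A minimal, self-contained sketch of that pattern (class and variable names below are illustrative, not part of the repository):

    import matplotlib
    matplotlib.use("Agg")  # headless backend so the sketch runs anywhere
    import matplotlib.pyplot as plt
    import numpy as np

    class TrajectoryPlotter:
        """Incremental trajectory plot: one subplot per joint, one point per step."""

        def __init__(self, n_joints: int = 3):
            self.fig, self.axs = plt.subplots(n_joints, 1)
            self._step = 0

        def update(self, joint_positions: np.ndarray):
            self._step += 1
            for ax, q in zip(self.axs, joint_positions):
                ax.scatter(self._step, q, s=2, marker=".")

    plotter = TrajectoryPlotter()
    for t in range(50):
        plotter.update(np.sin(0.1 * t) * np.ones(3))  # stand-in joint trajectory
    plotter.fig.savefig("trajectory.png")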

@@ -22,7 +22,7 @@ class BallInACupReward(alr_reward_fct.AlrReward):
         self.goal_final_id = None
         self.collision_ids = None
         self._is_collided = False
-        self.collision_penalty = 1
+        self.collision_penalty = 1000

         self.ball_traj = None
         self.dists = None
@@ -37,6 +37,7 @@ class BallInACupReward(alr_reward_fct.AlrReward):
         self.dists_final = []
         self.costs = []
         self.action_costs = []
+        self.angle_costs = []
         self.cup_angles = []

     def compute_reward(self, action, env):
@@ -56,8 +57,11 @@ class BallInACupReward(alr_reward_fct.AlrReward):
         self.dists_final.append(np.linalg.norm(goal_final_pos - ball_pos))
         self.ball_traj[env._steps, :] = ball_pos
         cup_quat = np.copy(env.sim.data.body_xquat[env.sim.model._body_name2id["cup"]])
-        self.cup_angles.append(np.arctan2(2 * (cup_quat[0] * cup_quat[1] + cup_quat[2] * cup_quat[3]),
-                                          1 - 2 * (cup_quat[1]**2 + cup_quat[2]**2)))
+        cup_angle = np.arctan2(2 * (cup_quat[0] * cup_quat[1] + cup_quat[2] * cup_quat[3]),
+                               1 - 2 * (cup_quat[1]**2 + cup_quat[2]**2))
+        cost_angle = (cup_angle - np.pi / 2) ** 2
+        self.angle_costs.append(cost_angle)
+        self.cup_angles.append(cup_angle)

         action_cost = np.sum(np.square(action))
         self.action_costs.append(action_cost)
@@ -67,20 +71,21 @@ class BallInACupReward(alr_reward_fct.AlrReward):
         if env._steps == env.sim_steps - 1 or self._is_collided:
             t_min_dist = np.argmin(self.dists)
             angle_min_dist = self.cup_angles[t_min_dist]
-            cost_angle = (angle_min_dist - np.pi / 2)**2
+            # cost_angle = (angle_min_dist - np.pi / 2)**2

-            min_dist = self.dists[t_min_dist]
+            # min_dist = self.dists[t_min_dist]
             dist_final = self.dists_final[-1]
             min_dist_final = np.min(self.dists_final)

-            cost = 0.5 * dist_final + 0.05 * cost_angle  # TODO: Increase cost_angle weight  # 0.5 * min_dist +
+            # cost = 0.5 * dist_final + 0.05 * cost_angle  # TODO: Increase cost_angle weight  # 0.5 * min_dist +
             # reward = np.exp(-2 * cost) - 1e-2 * action_cost - self.collision_penalty * int(self._is_collided)
             # reward = - dist_final**2 - 1e-4 * cost_angle - 1e-5 * action_cost - self.collision_penalty * int(self._is_collided)
-            reward = - dist_final**2 - min_dist_final**2 - 1e-4 * cost_angle - 1e-5 * action_cost - self.collision_penalty * int(self._is_collided)
+            reward = - dist_final**2 - min_dist_final**2 - 1e-4 * cost_angle - 1e-3 * action_cost - self.collision_penalty * int(self._is_collided)
             success = dist_final < 0.05 and ball_in_cup and not self._is_collided
             crash = self._is_collided
         else:
-            reward = - 1e-5 * action_cost  # TODO: increase action_cost weight
+            reward = - 1e-3 * action_cost - 1e-4 * cost_angle  # TODO: increase action_cost weight
             success = False
             crash = False
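
The reward hunks above turn the cup-angle penalty into a per-step cost (appended to self.angle_costs) and increase the action-cost weight from 1e-5 to 1e-3, alongside the larger collision penalty. For reference, a self-contained sketch of the angle-cost term, assuming a unit quaternion in (w, x, y, z) order as MuJoCo's body_xquat provides:

    import numpy as np

    def cup_angle_cost(cup_quat: np.ndarray) -> float:
        """Tilt cost matching the arctan2 expression in compute_reward:
        zero when the extracted angle equals pi / 2 (the target orientation in the reward)."""
        w, x, y, z = cup_quat
        cup_angle = np.arctan2(2 * (w * x + y * z), 1 - 2 * (x ** 2 + y ** 2))
        return (cup_angle - np.pi / 2) ** 2

    # Example: a quaternion for a 90-degree rotation about the x-axis gives ~zero cost.
    quat_upright = np.array([np.cos(np.pi / 4), np.sin(np.pi / 4), 0.0, 0.0])
    print(cup_angle_cost(quat_upright))  # ~0.0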

@@ -1,5 +1,5 @@
-from alr_envs.utils.mps.dmp_wrapper import DmpWrapper
-from alr_envs.utils.mps.detpmp_wrapper import DetPMPWrapper
+from mp_env_api.mp_wrappers.dmp_wrapper import DmpWrapper
+from mp_env_api.mp_wrappers.detpmp_wrapper import DetPMPWrapper
 import gym
 from gym.vector.utils import write_to_shared_memory
 import sys

@@ -91,20 +91,21 @@ class AlrContextualMpEnvSampler:

         repeat = int(np.ceil(n_samples / self.env.num_envs))
         vals = defaultdict(list)

+        obs = self.env.reset()
         for i in range(repeat):
-            new_contexts = self.env.reset()
-            vals['new_contexts'].append(new_contexts)
-            new_samples, new_contexts = dist.sample(new_contexts)
+            vals['obs'].append(obs)
+            new_samples, new_contexts = dist.sample(obs)
             vals['new_samples'].append(new_samples)

             obs, reward, done, info = self.env.step(new_samples)
             vals['obs'].append(obs)

             vals['reward'].append(reward)
             vals['done'].append(done)
             vals['info'].append(info)

         # do not return values above threshold
-        return np.vstack(vals['new_samples'])[:n_samples], np.vstack(vals['new_contexts'])[:n_samples], \
+        return np.vstack(vals['new_samples'])[:n_samples], \
             np.vstack(vals['obs'])[:n_samples], np.hstack(vals['reward'])[:n_samples], \
             _flatten_list(vals['done'])[:n_samples], _flatten_list(vals['info'])[:n_samples]
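
The sampler hunk above changes what the search distribution is conditioned on: instead of resetting the vectorized environment inside the loop and sampling from the fresh contexts, the loop records the observations it conditions on, calls dist.sample(obs) instead of dist.sample(new_contexts), and drops the separately collected contexts from the return value. A minimal, self-contained sketch of that loop with stand-in env and dist objects (all names below are illustrative, not part of the repository):

    from collections import defaultdict

    import numpy as np

    class DummyVecEnv:
        """Stand-in for the vectorized env API the sampler assumes."""
        num_envs = 4
        obs_dim = 3

        def reset(self):
            return np.zeros((self.num_envs, self.obs_dim))

        def step(self, actions):
            obs = np.random.default_rng(0).normal(size=(self.num_envs, self.obs_dim))
            reward = -np.linalg.norm(actions, axis=-1)
            done = np.ones(self.num_envs, dtype=bool)
            info = [{} for _ in range(self.num_envs)]
            return obs, reward, done, info

    class DummyDist:
        """Stand-in search distribution conditioned on observations."""
        def sample(self, obs):
            samples = np.random.default_rng(1).normal(size=(obs.shape[0], 2))
            return samples, obs

    def sample(env, dist, n_samples):
        repeat = int(np.ceil(n_samples / env.num_envs))
        vals = defaultdict(list)
        obs = env.reset()
        for _ in range(repeat):
            vals['obs'].append(obs)              # the obs the samples are conditioned on
            new_samples, _ = dist.sample(obs)
            vals['new_samples'].append(new_samples)
            obs, reward, done, info = env.step(new_samples)
            vals['reward'].append(reward)
        return (np.vstack(vals['new_samples'])[:n_samples],
                np.vstack(vals['obs'])[:n_samples],
                np.hstack(vals['reward'])[:n_samples])

    samples, obs, rewards = sample(DummyVecEnv(), DummyDist(), n_samples=6)
    print(samples.shape, obs.shape, rewards.shape)  # (6, 2) (6, 3) (6,)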

@@ -1,40 +0,0 @@
-from abc import abstractmethod, ABC
-from typing import Union
-
-import gym
-import numpy as np
-
-
-class AlrEnv(gym.Env, ABC):
-
-    @property
-    def active_obs(self):
-        """Returns boolean mask for each observation entry
-        whether the observation is returned for the contextual case or not.
-        This effectively allows to filter unwanted or unnecessary observations from the full step-based case.
-        """
-        return np.ones(self.observation_space.shape, dtype=bool)
-
-    @property
-    @abstractmethod
-    def start_pos(self) -> Union[float, int, np.ndarray]:
-        """
-        Returns the starting position of the joints
-        """
-        raise NotImplementedError()
-
-    @property
-    def goal_pos(self) -> Union[float, int, np.ndarray]:
-        """
-        Returns the current final position of the joints for the MP.
-        By default this returns the starting position.
-        """
-        return self.start_pos
-
-    @property
-    @abstractmethod
-    def dt(self) -> Union[float, int]:
-        """
-        Returns the time between two simulated steps of the environment
-        """
-        raise NotImplementedError()

@@ -1,54 +0,0 @@
-import gym
-import numpy as np
-from mp_lib import det_promp
-
-from alr_envs.utils.mps.alr_env import AlrEnv
-from alr_envs.utils.mps.mp_wrapper import MPWrapper
-
-
-class DetPMPWrapper(MPWrapper):
-    def __init__(self, env: AlrEnv, num_dof, num_basis, width, start_pos=None, duration=1, post_traj_time=0.,
-                 policy_type=None, weights_scale=1, zero_start=False, zero_goal=False, learn_mp_length: bool = True,
-                 **mp_kwargs):
-        self.duration = duration  # seconds
-
-        super().__init__(env=env, num_dof=num_dof, duration=duration, post_traj_time=post_traj_time,
-                         policy_type=policy_type, weights_scale=weights_scale, num_basis=num_basis,
-                         width=width, zero_start=zero_start, zero_goal=zero_goal,
-                         **mp_kwargs)
-
-        self.learn_mp_length = learn_mp_length
-        if self.learn_mp_length:
-            parameter_space_shape = (1 + num_basis * num_dof,)
-        else:
-            parameter_space_shape = (num_basis * num_dof,)
-        self.min_param = -np.inf
-        self.max_param = np.inf
-        self.parameterization_space = gym.spaces.Box(low=self.min_param, high=self.max_param,
-                                                     shape=parameter_space_shape, dtype=np.float32)
-
-        self.start_pos = start_pos
-
-    def initialize_mp(self, num_dof: int, duration: int, num_basis: int = 5, width: float = None,
-                      zero_start: bool = False, zero_goal: bool = False, **kwargs):
-        pmp = det_promp.DeterministicProMP(n_basis=num_basis, n_dof=num_dof, width=width, off=0.01,
-                                           zero_start=zero_start, zero_goal=zero_goal)
-
-        weights = np.zeros(shape=(num_basis, num_dof))
-        pmp.set_weights(duration, weights)
-
-        return pmp
-
-    def mp_rollout(self, action):
-        if self.learn_mp_length:
-            duration = max(1, self.duration * abs(action[0]))
-            params = np.reshape(action[1:], (self.mp.n_basis, -1)) * self.weights_scale  # TODO: Fix Bug when zero_start is true
-        else:
-            duration = self.duration
-            params = np.reshape(action, (self.mp.n_basis, -1)) * self.weights_scale  # TODO: Fix Bug when zero_start is true
-        self.mp.set_weights(1., params)
-        _, des_pos, des_vel, _ = self.mp.compute_trajectory(frequency=max(1, duration))
-        if self.mp.zero_start:
-            des_pos += self.start_pos
-
-        return des_pos, des_vel
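
In the removed DetPMPWrapper above, enabling learn_mp_length makes the first entry of the parameter vector scale the trajectory duration while the remaining entries are reshaped into the basis-weight matrix. A small self-contained sketch of that split (function and argument names below are illustrative, not part of the repository):

    import numpy as np

    def split_detpmp_params(action, n_basis, n_dof, base_duration, weights_scale=1.0):
        """Mirror of the removed mp_rollout parameter handling:
        action[0] scales the duration, action[1:] become the (n_basis, n_dof) weights."""
        duration = max(1.0, base_duration * abs(action[0]))
        weights = np.reshape(action[1:], (n_basis, n_dof)) * weights_scale
        return duration, weights

    # Example: 5 basis functions and 7 DoF -> 1 + 5 * 7 = 36 parameters.
    params = np.random.default_rng(0).normal(size=1 + 5 * 7)
    duration, weights = split_detpmp_params(params, n_basis=5, n_dof=7, base_duration=3.5)
    print(duration, weights.shape)  # duration >= 1.0 and (5, 7)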

@@ -1,76 +0,0 @@
-import gym
-import numpy as np
-from mp_lib import dmps
-from mp_lib.basis import DMPBasisGenerator
-from mp_lib.phase import ExpDecayPhaseGenerator
-
-from alr_envs.utils.mps.alr_env import AlrEnv
-from alr_envs.utils.mps.mp_wrapper import MPWrapper
-
-
-class DmpWrapper(MPWrapper):
-
-    def __init__(self, env: AlrEnv, num_dof: int, num_basis: int,
-                 duration: int = 1, alpha_phase: float = 2., dt: float = None,
-                 learn_goal: bool = False, post_traj_time: float = 0.,
-                 weights_scale: float = 1., goal_scale: float = 1., bandwidth_factor: float = 3.,
-                 policy_type: str = None, render_mode: str = None):
-
-        """
-        This Wrapper generates a trajectory based on a DMP and will only return episodic performances.
-        Args:
-            env:
-            num_dof:
-            num_basis:
-            duration:
-            alpha_phase:
-            dt:
-            learn_goal:
-            post_traj_time:
-            policy_type:
-            weights_scale:
-            goal_scale:
-        """
-        self.learn_goal = learn_goal
-
-        self.t = np.linspace(0, duration, int(duration / dt))
-        self.goal_scale = goal_scale
-
-        super().__init__(env=env, num_dof=num_dof, duration=duration, post_traj_time=post_traj_time,
-                         policy_type=policy_type, weights_scale=weights_scale, render_mode=render_mode,
-                         num_basis=num_basis, alpha_phase=alpha_phase, bandwidth_factor=bandwidth_factor)
-
-        action_bounds = np.inf * np.ones((np.prod(self.mp.weights.shape) + (num_dof if learn_goal else 0)))
-        self.action_space = gym.spaces.Box(low=-action_bounds, high=action_bounds, dtype=np.float32)
-
-    def initialize_mp(self, num_dof: int, duration: int, num_basis: int, alpha_phase: float = 2.,
-                      bandwidth_factor: int = 3, **kwargs):
-
-        phase_generator = ExpDecayPhaseGenerator(alpha_phase=alpha_phase, duration=duration)
-        basis_generator = DMPBasisGenerator(phase_generator, duration=duration, num_basis=num_basis,
-                                            basis_bandwidth_factor=bandwidth_factor)
-
-        dmp = dmps.DMP(num_dof=num_dof, basis_generator=basis_generator, phase_generator=phase_generator,
-                       dt=self.dt)
-
-        return dmp
-
-    def goal_and_weights(self, params):
-        assert params.shape[-1] == self.action_space.shape[0]
-        params = np.atleast_2d(params)
-
-        if self.learn_goal:
-            goal_pos = params[0, -self.mp.num_dimensions:]  # [num_dof]
-            params = params[:, :-self.mp.num_dimensions]  # [1,num_dof]
-        else:
-            goal_pos = self.env.goal_pos
-            assert goal_pos is not None
-
-        weight_matrix = np.reshape(params, self.mp.weights.shape)  # [num_basis, num_dof]
-        return goal_pos * self.goal_scale, weight_matrix * self.weights_scale
-
-    def mp_rollout(self, action):
-        self.mp.dmp_start_pos = self.env.start_pos
-        goal_pos, weight_matrix = self.goal_and_weights(action)
-        self.mp.set_weights(weight_matrix, goal_pos)
-        return self.mp.reference_trajectory(self.t)

@@ -1,134 +0,0 @@
-from abc import ABC, abstractmethod
-from typing import Union
-
-import gym
-import numpy as np
-
-from alr_envs.utils.mps.alr_env import AlrEnv
-from alr_envs.utils.policies import get_policy_class, BaseController
-
-
-class MPWrapper(gym.Wrapper, ABC):
-    """
-    Base class for movement primitive based gym.Wrapper implementations.
-
-    :param env: The (wrapped) environment this wrapper is applied on
-    :param num_dof: Dimension of the action space of the wrapped env
-    :param duration: Number of timesteps in the trajectory of the movement primitive
-    :param post_traj_time: Time for which the last position of the trajectory is fed to the environment to continue
-                           simulation
-    :param policy_type: Type or object defining the policy that is used to generate action based on the trajectory
-    :param weight_scale: Scaling parameter for the actions given to this wrapper
-    :param render_mode: Equivalent to gym render mode
-    """
-    def __init__(self,
-                 env: AlrEnv,
-                 num_dof: int,
-                 duration: int = 1,
-                 post_traj_time: float = 0.,
-                 policy_type: Union[str, BaseController] = None,
-                 weights_scale: float = 1.,
-                 render_mode: str = None,
-                 **mp_kwargs
-                 ):
-        super().__init__(env)
-
-        # adjust observation space to reduce version
-        obs_sp = self.env.observation_space
-        self.observation_space = gym.spaces.Box(low=obs_sp.low[self.env.active_obs],
-                                                high=obs_sp.high[self.env.active_obs],
-                                                dtype=obs_sp.dtype)
-
-        self.post_traj_steps = int(post_traj_time / env.dt)
-
-        self.mp = self.initialize_mp(num_dof=num_dof, duration=duration, **mp_kwargs)
-        self.weights_scale = weights_scale
-
-        if type(policy_type) is str:
-            self.policy = get_policy_class(policy_type, env, mp_kwargs)
-        else:
-            self.policy = policy_type
-
-        # rendering
-        self.render_mode = render_mode
-        self.render_kwargs = {}
-
-    # TODO: @Max I think this should not be in this class, this functionality should be part of your sampler.
-    def __call__(self, params, contexts=None):
-        """
-        Can be used to provide a batch of parameter sets
-        """
-        params = np.atleast_2d(params)
-        obs = []
-        rewards = []
-        dones = []
-        infos = []
-        # for p, c in zip(params, contexts):
-        for p in params:
-            # self.configure(c)
-            ob, reward, done, info = self.step(p)
-            obs.append(ob)
-            rewards.append(reward)
-            dones.append(done)
-            infos.append(info)
-
-        return obs, np.array(rewards), dones, infos
-
-    def reset(self):
-        return self.env.reset()[self.env.active_obs]
-
-    def step(self, action: np.ndarray):
-        """ This function generates a trajectory based on a DMP and then does the usual loop over reset and step"""
-        trajectory, velocity = self.mp_rollout(action)
-
-        if self.post_traj_steps > 0:
-            trajectory = np.vstack([trajectory, np.tile(trajectory[-1, :], [self.post_traj_steps, 1])])
-            velocity = np.vstack([velocity, np.zeros(shape=(self.post_traj_steps, self.mp.num_dimensions))])
-
-        trajectory_length = len(trajectory)
-        actions = np.zeros(shape=(trajectory_length, self.mp.num_dimensions))
-        observations = np.zeros(shape=(trajectory_length,) + self.env.observation_space.shape)
-        rewards = np.zeros(shape=(trajectory_length,))
-        trajectory_return = 0
-        infos = dict(step_infos=[])
-        for t, pos_vel in enumerate(zip(trajectory, velocity)):
-            actions[t, :] = self.policy.get_action(pos_vel[0], pos_vel[1])
-            observations[t, :], rewards[t], done, info = self.env.step(actions[t, :])
-            trajectory_return += rewards[t]
-            infos['step_infos'].append(info)
-            if self.render_mode:
-                self.env.render(mode=self.render_mode, **self.render_kwargs)
-            if done:
-                break
-
-        infos['step_actions'] = actions[:t + 1]
-        infos['step_observations'] = observations[:t + 1]
-        infos['step_rewards'] = rewards[:t + 1]
-        infos['trajectory_length'] = t + 1
-        done = True
-        return observations[t][self.env.active_obs], trajectory_return, done, infos
-
-    def render(self, mode='human', **kwargs):
-        """Only set render options here, such that they can be used during the rollout.
-        This only needs to be called once"""
-        self.render_mode = mode
-        self.render_kwargs = kwargs
-
-    @abstractmethod
-    def mp_rollout(self, action):
-        """
-        Generate trajectory and velocity based on the MP
-        Returns:
-            trajectory/positions, velocity
-        """
-        raise NotImplementedError()
-
-    @abstractmethod
-    def initialize_mp(self, num_dof: int, duration: float, **kwargs):
-        """
-        Create respective instance of MP
-        Returns:
-            MP instance
-        """
-        raise NotImplementedError

@@ -1,61 +0,0 @@
-from typing import Tuple, Union
-
-from gym import Env
-
-from alr_envs.utils.positional_env import PositionalEnv
-
-
-class BaseController:
-    def __init__(self, env: Env, **kwargs):
-        self.env = env
-
-    def get_action(self, des_pos, des_vel):
-        raise NotImplementedError
-
-
-class PosController(BaseController):
-    def get_action(self, des_pos, des_vel):
-        return des_pos
-
-
-class VelController(BaseController):
-    def get_action(self, des_pos, des_vel):
-        return des_vel
-
-
-class PDController(BaseController):
-    """
-    A PD-Controller. Using position and velocity information from a provided positional environment,
-    the controller calculates a response based on the desired position and velocity
-
-    :param env: A position environment
-    :param p_gains: Factors for the proportional gains
-    :param d_gains: Factors for the differential gains
-    """
-    def __init__(self,
-                 env: PositionalEnv,
-                 p_gains: Union[float, Tuple],
-                 d_gains: Union[float, Tuple]):
-        self.p_gains = p_gains
-        self.d_gains = d_gains
-        super(PDController, self).__init__(env)
-
-    def get_action(self, des_pos, des_vel):
-        cur_pos = self.env.current_pos
-        cur_vel = self.env.current_vel
-        assert des_pos.shape != cur_pos.shape, \
-            "Mismatch in dimension between desired position {} and current position {}".format(des_pos.shape, cur_pos.shape)
-        assert des_vel.shape != cur_vel.shape, \
-            "Mismatch in dimension between desired velocity {} and current velocity {}".format(des_vel.shape,
-                                                                                               cur_vel.shape)
-        trq = self.p_gains * (des_pos - cur_pos) + self.d_gains * (des_vel - cur_vel)
-        return trq
-
-
-def get_policy_class(policy_type, env, mp_kwargs, **kwargs):
-    if policy_type == "motor":
-        return PDController(env, p_gains=mp_kwargs['p_gains'], d_gains=mp_kwargs['d_gains'])
-    elif policy_type == "velocity":
-        return VelController(env)
-    elif policy_type == "position":
-        return PosController(env)
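
For quick reference, the PD control law used by the removed PDController, as a standalone sketch (the asserts below check that shapes match, which is presumably what the removed code intended with its "!=" comparisons; gains and setpoints are made-up example values):

    import numpy as np

    def pd_action(des_pos, des_vel, cur_pos, cur_vel, p_gains, d_gains):
        """torque = p_gains * (des_pos - cur_pos) + d_gains * (des_vel - cur_vel)"""
        des_pos, des_vel = np.asarray(des_pos), np.asarray(des_vel)
        cur_pos, cur_vel = np.asarray(cur_pos), np.asarray(cur_vel)
        assert des_pos.shape == cur_pos.shape, "desired/current position shapes differ"
        assert des_vel.shape == cur_vel.shape, "desired/current velocity shapes differ"
        return p_gains * (des_pos - cur_pos) + d_gains * (des_vel - cur_vel)

    # Example: a 7-DoF arm at rest being pulled towards a unit setpoint.
    torque = pd_action(des_pos=np.ones(7), des_vel=np.zeros(7),
                       cur_pos=np.zeros(7), cur_vel=np.zeros(7),
                       p_gains=50.0, d_gains=1.0)
    print(torque)  # 50.0 for every joint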

@@ -1,22 +0,0 @@
-from abc import abstractmethod
-from typing import Union, Tuple
-
-import numpy as np
-from gym import Env
-
-
-class PositionalEnv(Env):
-    """A position and velocity based environment. It functions just as any regular OpenAI Gym
-    environment but it provides position, velocity and acceleration information. This usually means that the
-    corresponding information from the agent is forwarded via the properties.
-    PD-Controller based policies require this environment to calculate the state dependent actions for example.
-    """
-
-    @property
-    @abstractmethod
-    def current_pos(self) -> Union[float, int, np.ndarray, Tuple]:
-        raise NotImplementedError
-
-    @property
-    @abstractmethod
-    def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
-        raise NotImplementedError