use mp api

Maximilian Huettenrauch 2021-06-23 18:23:37 +02:00
parent af8e868309
commit c4a698b1bc
8 changed files with 288 additions and 27 deletions

View File

@@ -88,6 +88,16 @@ register(
     id='ALRBallInACupSimple-v0',
     entry_point='alr_envs.mujoco:ALRBallInACupEnv',
     max_episode_steps=4000,
+    kwargs={
+        "simplified": True,
+        "reward_type": "no_context",
+    }
+)
+register(
+    id='ALRBallInACupPDSimple-v0',
+    entry_point='alr_envs.mujoco:ALRBallInACupPDEnv',
+    max_episode_steps=4000,
     kwargs={
         "simplified": True,
         "reward_type": "no_context"
@@ -368,6 +378,26 @@ register(
     }
 )
+register(
+    id='ALRBallInACupPDSimpleDetPMP-v0',
+    entry_point='alr_envs.mujoco.ball_in_a_cup.biac_pd:make_detpmp_env',
+    kwargs={
+        "name": "alr_envs:ALRBallInACupPDSimple-v0",
+        "num_dof": 3,
+        "num_basis": 5,
+        "duration": 3.5,
+        "post_traj_time": 4.5,
+        "width": 0.0035,
+        # "off": -0.05,
+        "policy_type": "motor",
+        "weights_scale": 0.2,
+        "zero_start": True,
+        "zero_goal": True,
+        "p_gains": np.array([4./3., 2.4, 2.5, 5./3., 2., 2., 1.25]),
+        "d_gains": np.array([0.0466, 0.12, 0.125, 0.04166, 0.06, 0.06, 0.025])
+    }
+)
 register(
     id='ALRBallInACupDetPMP-v0',
     entry_point='alr_envs.utils.make_env_helpers:make_detpmp_env',
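The new ALRBallInACupPDSimpleDetPMP-v0 entry point routes through the env-specific make_detpmp_env factory in biac_pd (shown further down) rather than the generic helper, so the p_gains/d_gains arrays are consumed by a PD controller instead of being forwarded to the wrapper. A minimal usage sketch, assuming the package is installed and registered under the alr_envs namespace:

    import gym

    # The factory pops "name", builds the raw PD env, and returns it wrapped
    # in a DetPMPWrapper driven by a PDControllerExtend with the gains above.
    env = gym.make("alr_envs:ALRBallInACupPDSimpleDetPMP-v0")
    env.reset()
    # DetPMP-style wrappers typically consume one full set of basis weights
    # per step (presumably num_dof * num_basis = 3 * 5 = 15 values here) and
    # roll out the whole trajectory internally.
    obs, reward, done, info = env.step(env.action_space.sample())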

View File

@@ -7,10 +7,10 @@ from gym.utils import seeding
 from matplotlib import patches

 from alr_envs.classic_control.utils import check_self_collision
-from alr_envs.utils.mps.alr_env import AlrEnv
+from mp_env_api.envs.mp_env import MpEnv


-class HoleReacherEnv(AlrEnv):
+class HoleReacherEnv(MpEnv):

     def __init__(self, n_links: int, hole_x: Union[None, float] = None, hole_depth: Union[None, float] = None,
                  hole_width: float = 1., random_start: bool = False, allow_self_collision: bool = False,
@@ -44,7 +44,7 @@ class HoleReacherEnv(AlrEnv):
         self._start_pos = np.hstack([[np.pi / 2], np.zeros(self.n_links - 1)])
         self._start_vel = np.zeros(self.n_links)

-        self.dt = 0.01
+        self._dt = 0.01

         action_bound = np.pi * np.ones((self.n_links,))
         state_bound = np.hstack([
@@ -66,6 +66,10 @@ class HoleReacherEnv(AlrEnv):
         self._steps = 0
         self.seed()

+    @property
+    def dt(self) -> Union[float, int]:
+        return self._dt
+
     def step(self, action: np.ndarray):
         """
         A single step with an action in joint velocity space
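Storing the timestep as the private attribute _dt and exposing it through a read-only property is the shape the mp_env_api interface appears to expect: downstream wrappers can query env.dt without being able to overwrite it. A minimal sketch of the pattern (the base class here is a hypothetical stand-in for MpEnv to keep it self-contained):

    from typing import Union

    class FixedDtEnv:
        # hypothetical stand-in for MpEnv
        def __init__(self):
            self._dt = 0.01  # 100 Hz control, as in HoleReacherEnv

        @property
        def dt(self) -> Union[float, int]:
            return self._dt  # read-only: no setter is defined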

View File

@@ -5,10 +5,10 @@ import numpy as np
 from gym import spaces
 from gym.utils import seeding

-from alr_envs.utils.mps.alr_env import AlrEnv
+from mp_env_api.envs.mp_env import MpEnv


-class SimpleReacherEnv(AlrEnv):
+class SimpleReacherEnv(MpEnv):
     """
     Simple Reaching Task without any physics simulation.
     Returns no reward until 150 time steps. This allows the agent to explore the space, but requires precise actions

View File

@@ -6,10 +6,10 @@ import numpy as np
 from gym.utils import seeding

 from alr_envs.classic_control.utils import check_self_collision
-from alr_envs.utils.mps.alr_env import AlrEnv
+from mp_env_api.envs.mp_env import MpEnv


-class ViaPointReacher(AlrEnv):
+class ViaPointReacher(MpEnv):

     def __init__(self, n_links, random_start: bool = True, via_target: Union[None, Iterable] = None,
                  target: Union[None, Iterable] = None, allow_self_collision=False, collision_penalty=1000):

View File

@@ -1,3 +1,4 @@
 from alr_envs.mujoco.reacher.alr_reacher import ALRReacherEnv
 from alr_envs.mujoco.balancing import BalancingEnv
-from alr_envs.mujoco.ball_in_a_cup.ball_in_a_cup import ALRBallInACupEnv
+# from alr_envs.mujoco.ball_in_a_cup.ball_in_a_cup import ALRBallInACupEnv
+from alr_envs.mujoco.ball_in_a_cup.biac_pd import ALRBallInACupPDEnv

View File

@@ -3,9 +3,8 @@ from alr_envs.mujoco import alr_reward_fct


 class BallInACupReward(alr_reward_fct.AlrReward):
-    def __init__(self, sim_time):
-        self.sim_time = sim_time
+    def __init__(self, env):
+        self.env = env
         self.collision_objects = ["cup_geom1", "cup_geom2", "cup_base_contact_below",
                                   "wrist_palm_link_convex_geom",
                                   "wrist_pitch_link_convex_decomposition_p1_geom",
@@ -32,7 +31,8 @@ class BallInACupReward(alr_reward_fct.AlrReward):
         self.reset(None)

     def reset(self, context):
-        self.ball_traj = np.zeros(shape=(self.sim_time, 3))
+        # self.sim_time = self.env.sim.dtsim_time
+        self.ball_traj = []  # np.zeros(shape=(self.sim_time, 3))
         self.dists = []
         self.dists_final = []
         self.costs = []
@@ -40,23 +40,24 @@ class BallInACupReward(alr_reward_fct.AlrReward):
         self.angle_costs = []
         self.cup_angles = []

-    def compute_reward(self, action, env):
-        self.ball_id = env.sim.model._body_name2id["ball"]
-        self.ball_collision_id = env.sim.model._geom_name2id["ball_geom"]
-        self.goal_id = env.sim.model._site_name2id["cup_goal"]
-        self.goal_final_id = env.sim.model._site_name2id["cup_goal_final"]
-        self.collision_ids = [env.sim.model._geom_name2id[name] for name in self.collision_objects]
-        ball_in_cup = self.check_ball_in_cup(env.sim, self.ball_collision_id)
+    def compute_reward(self, action):
+        self.ball_id = self.env.sim.model._body_name2id["ball"]
+        self.ball_collision_id = self.env.sim.model._geom_name2id["ball_geom"]
+        self.goal_id = self.env.sim.model._site_name2id["cup_goal"]
+        self.goal_final_id = self.env.sim.model._site_name2id["cup_goal_final"]
+        self.collision_ids = [self.env.sim.model._geom_name2id[name] for name in self.collision_objects]
+        ball_in_cup = self.check_ball_in_cup(self.env.sim, self.ball_collision_id)

         # Compute the current distance from the ball to the inner part of the cup
-        goal_pos = env.sim.data.site_xpos[self.goal_id]
-        ball_pos = env.sim.data.body_xpos[self.ball_id]
-        goal_final_pos = env.sim.data.site_xpos[self.goal_final_id]
+        goal_pos = self.env.sim.data.site_xpos[self.goal_id]
+        ball_pos = self.env.sim.data.body_xpos[self.ball_id]
+        goal_final_pos = self.env.sim.data.site_xpos[self.goal_final_id]
         self.dists.append(np.linalg.norm(goal_pos - ball_pos))
         self.dists_final.append(np.linalg.norm(goal_final_pos - ball_pos))
-        self.ball_traj[env._steps, :] = ball_pos
-        cup_quat = np.copy(env.sim.data.body_xquat[env.sim.model._body_name2id["cup"]])
+        # self.ball_traj[self.env._steps, :] = ball_pos
+        self.ball_traj.append(ball_pos)
+        cup_quat = np.copy(self.env.sim.data.body_xquat[self.env.sim.model._body_name2id["cup"]])
         cup_angle = np.arctan2(2 * (cup_quat[0] * cup_quat[1] + cup_quat[2] * cup_quat[3]),
                                1 - 2 * (cup_quat[1]**2 + cup_quat[2]**2))
         cost_angle = (cup_angle - np.pi / 2) ** 2
@@ -66,9 +67,9 @@ class BallInACupReward(alr_reward_fct.AlrReward):
         action_cost = np.sum(np.square(action))
         self.action_costs.append(action_cost)

-        self._is_collided = self.check_collision(env.sim) or env.check_traj_in_joint_limits()
-        if env._steps == env.sim_steps - 1 or self._is_collided:
+        self._is_collided = self.check_collision(self.env.sim) or self.env.check_traj_in_joint_limits()
+        if self.env._steps == self.env.ep_length - 1 or self._is_collided:
             t_min_dist = np.argmin(self.dists)
             angle_min_dist = self.cup_angles[t_min_dist]
             # cost_angle = (angle_min_dist - np.pi / 2)**2
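This refactor inverts the dependency: the reward object is now constructed once with a back-reference to the env (BallInACupReward(self)) and reads simulation state through self.env, so compute_reward(action) no longer receives the env on every call, and ball_traj can grow as a list instead of a preallocated (sim_time, 3) array. A stripped-down sketch of the same pattern; the Sim-less Env and Reward classes here are toy stand-ins, not the real MuJoCo objects:

    import numpy as np

    class Reward:
        def __init__(self, env):
            self.env = env          # back-reference set once at construction
            self.ball_traj = []     # list: no episode length needed up front

        def compute_reward(self, action):
            self.ball_traj.append(self.env.ball_pos.copy())
            return -float(np.sum(np.square(action)))  # action cost only

    class Env:
        def __init__(self):
            self.ball_pos = np.zeros(3)
            self.reward_function = Reward(self)   # was Reward(sim_time)

        def step(self, action):
            self.ball_pos += 0.01 * action        # toy dynamics
            return self.reward_function.compute_reward(action)

    env = Env()
    print(env.step(np.ones(3)))  # -3.0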

View File

@@ -0,0 +1,225 @@
import gym.envs.mujoco
import mujoco_py.builder
from gym import utils, spaces
import os
import numpy as np
import gym.envs.mujoco as mujoco_env

from mp_env_api.envs.mp_env import MpEnv
from mp_env_api.envs.positional_env import PositionalEnv
from mp_env_api.mp_wrappers.detpmp_wrapper import DetPMPWrapper
from mp_env_api.utils.policies import PDControllerExtend


def make_detpmp_env(**kwargs):
    name = kwargs.pop("name")
    _env = gym.make(name)
    policy = PDControllerExtend(_env, p_gains=kwargs.pop('p_gains'), d_gains=kwargs.pop('d_gains'))
    kwargs['policy_type'] = policy
    return DetPMPWrapper(_env, **kwargs)


class ALRBallInACupPDEnv(mujoco_env.MujocoEnv, PositionalEnv, MpEnv, utils.EzPickle):
    def __init__(self, frame_skip=4, apply_gravity_comp=True, simplified: bool = False,
                 reward_type: str = None, context: np.ndarray = None):
        self._steps = 0

        self.xml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets",
                                     "biac_base" + ".xml")

        self.max_ctrl = np.array([150., 125., 40., 60., 5., 5., 2.])

        self.j_min = np.array([-2.6, -1.985, -2.8, -0.9, -4.55, -1.5707, -2.7])
        self.j_max = np.array([2.6, 1.985, 2.8, 3.14159, 1.25, 1.5707, 2.7])

        self.context = context
        self.apply_gravity_comp = apply_gravity_comp
        self.simplified = simplified

        self._start_pos = np.array([0.0, 0.58760536, 0.0, 1.36004913, 0.0, -0.32072943, -1.57])
        self._start_vel = np.zeros(7)
        self.sim_time = 8  # seconds
        self.ep_length = 4000  # int(self.sim_time / self.dt); 8 s at dt = 0.002

        if reward_type == "no_context":
            from alr_envs.mujoco.ball_in_a_cup.ball_in_a_cup_reward_simple import BallInACupReward
            reward_function = BallInACupReward
        elif reward_type == "contextual_goal":
            from alr_envs.mujoco.ball_in_a_cup.ball_in_a_cup_reward import BallInACupReward
            reward_function = BallInACupReward
        else:
            raise ValueError("Unknown reward type: {}".format(reward_type))
        self.reward_function = reward_function(self)

        mujoco_env.MujocoEnv.__init__(self, self.xml_path, frame_skip)
        utils.EzPickle.__init__(self)

    @property
    def start_pos(self):
        if self.simplified:
            return self._start_pos[1::2]
        else:
            return self._start_pos

    @property
    def start_vel(self):
        if self.simplified:
            return self._start_vel[1::2]
        else:
            return self._start_vel

    @property
    def current_pos(self):
        return self.sim.data.qpos[0:7].copy()

    @property
    def current_vel(self):
        return self.sim.data.qvel[0:7].copy()

    # def _set_action_space(self):
    #     if self.simplified:
    #         bounds = self.model.actuator_ctrlrange.copy().astype(np.float32)[1::2]
    #     else:
    #         bounds = self.model.actuator_ctrlrange.copy().astype(np.float32)
    #     low, high = bounds.T
    #     self.action_space = spaces.Box(low=low, high=high, dtype=np.float32)
    #     return self.action_space

    def reset(self):
        self.reward_function.reset(None)
        return super().reset()

    def reset_model(self):
        init_pos_all = self.init_qpos.copy()
        init_pos_robot = self._start_pos
        init_vel = np.zeros_like(init_pos_all)

        self._steps = 0
        self._q_pos = []
        self._q_vel = []

        start_pos = init_pos_all
        start_pos[0:7] = init_pos_robot

        self.set_state(start_pos, init_vel)

        return self._get_obs()

    def step(self, a):
        reward_dist = 0.0
        angular_vel = 0.0
        reward_ctrl = - np.square(a).sum()

        # if self.simplified:
        #     tmp = np.zeros(7)
        #     tmp[1::2] = a
        #     a = tmp

        if self.apply_gravity_comp:
            a = a + self.sim.data.qfrc_bias[:len(a)].copy() / self.model.actuator_gear[:, 0]
        try:
            self.do_simulation(a, self.frame_skip)
            crash = False
        except mujoco_py.builder.MujocoException:
            crash = True
        # joint_cons_viol = self.check_traj_in_joint_limits()

        ob = self._get_obs()

        if not crash:
            reward, success, is_collided = self.reward_function.compute_reward(a)
            done = success or is_collided  # self._steps == self.sim_steps - 1
            self._steps += 1
        else:
            reward = -2000
            success = False
            is_collided = False
            done = True
        return ob, reward, done, dict(reward_dist=reward_dist,
                                      reward_ctrl=reward_ctrl,
                                      velocity=angular_vel,
                                      # traj=self._q_pos,
                                      action=a,
                                      q_pos=self.sim.data.qpos[0:7].ravel().copy(),
                                      q_vel=self.sim.data.qvel[0:7].ravel().copy(),
                                      is_success=success,
                                      is_collided=is_collided, sim_crash=crash)

    def check_traj_in_joint_limits(self):
        return any(self.current_pos > self.j_max) or any(self.current_pos < self.j_min)

    # TODO: extend observation space
    def _get_obs(self):
        theta = self.sim.data.qpos.flat[:7]
        return np.concatenate([
            np.cos(theta),
            np.sin(theta),
            # self.get_body_com("target"),  # only return target to make problem harder
            [self._steps],
        ])

    # TODO
    @property
    def active_obs(self):
        return np.hstack([
            [False] * 7,  # cos
            [False] * 7,  # sin
            # [True] * 2,  # x-y coordinates of target distance
            [False]  # env steps
        ])

    # These functions are for the task with 3 joint actuations
    def extend_des_pos(self, des_pos):
        des_pos_full = self._start_pos.copy()
        des_pos_full[1] = des_pos[0]
        des_pos_full[3] = des_pos[1]
        des_pos_full[5] = des_pos[2]
        return des_pos_full

    def extend_des_vel(self, des_vel):
        des_vel_full = self._start_vel.copy()
        des_vel_full[1] = des_vel[0]
        des_vel_full[3] = des_vel[1]
        des_vel_full[5] = des_vel[2]
        return des_vel_full

    def render(self, render_mode, **render_kwargs):
        if render_mode == "plot_trajectory":
            if self._steps == 1:
                import matplotlib.pyplot as plt
                # plt.ion()
                self.fig, self.axs = plt.subplots(3, 1)

            if self._steps <= 1750:
                for ax, cp in zip(self.axs, self.current_pos[1::2]):
                    ax.scatter(self._steps, cp, s=2, marker=".")
            # self.fig.show()
        else:
            super().render(render_mode, **render_kwargs)


if __name__ == "__main__":
    env = ALRBallInACupPDEnv(reward_type="no_context", simplified=True)
    # env = gym.make("alr_envs:ALRBallInACupPDSimpleDetPMP-v0")
    # ctxt = np.array([-0.20869846, -0.66376693, 1.18088501])
    # env.configure(ctxt)
    env.reset()
    env.render("human")
    for i in range(16000):
        # test with random actions
        ac = 0.02 * env.action_space.sample()[0:7]
        # ac = env.start_pos
        # ac[0] += np.pi/2
        obs, rew, d, info = env.step(ac)
        env.render("human")
        print(rew)
        if d:
            break

    env.close()
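Two details of this new env are easy to miss. With apply_gravity_comp enabled, the commanded torque is offset by the simulator's bias forces divided by the actuator gear, so the policy only needs to produce residual torques. And in the simplified 3-DoF task, extend_des_pos/extend_des_vel scatter the three desired joint values into slots 1, 3 and 5 of the full 7-DoF configuration while the remaining joints hold their start values. A self-contained numpy illustration of that scatter, with the start configuration copied from the file above:

    import numpy as np

    start_pos = np.array([0.0, 0.58760536, 0.0, 1.36004913, 0.0, -0.32072943, -1.57])

    def extend_des_pos(des_pos, full=start_pos):
        # actuated joints sit at indices 1, 3, 5; all others keep start_pos
        des_pos_full = full.copy()
        des_pos_full[[1, 3, 5]] = des_pos
        return des_pos_full

    print(extend_des_pos(np.array([0.6, 1.3, -0.3])))  # 7-dim PD target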

View File

@@ -110,6 +110,6 @@ if __name__ == '__main__':
     # example_async("alr_envs:LongSimpleReacherDMP-v0", 4)
     # example_async_contextual_sampler()
     # env = gym.make("alr_envs:HoleReacherDetPMP-v1")
-    env_name = "alr_envs:ALRBallInACupSimpleDetPMP-v0"
+    env_name = "alr_envs:ALRBallInACupPDSimpleDetPMP-v0"
     example_async_sampler(env_name)
     # example_mp(env_name)