This commit is contained in:
Onur 2022-05-29 11:58:01 +02:00
parent 647f086a8d
commit f1a96c055b
12 changed files with 333 additions and 168 deletions

View File

@ -252,7 +252,8 @@ register(
max_episode_steps=MAX_EPISODE_STEPS_HOPPERJUMP,
kwargs={
"max_episode_steps": MAX_EPISODE_STEPS_HOPPERJUMP,
"context": False
"context": False,
"healthy_rewasrd": 1.0
}
)
register(
@ -273,6 +274,17 @@ register(
"max_episode_steps": MAX_EPISODE_STEPS_HOPPERJUMP
}
)
register(
id='ALRHopperJump-v3',
entry_point='alr_envs.alr.mujoco:ALRHopperXYJumpEnv',
max_episode_steps=MAX_EPISODE_STEPS_HOPPERJUMP,
kwargs={
"max_episode_steps": MAX_EPISODE_STEPS_HOPPERJUMP,
"context": True,
"healthy_reward": 1.0
}
)
# CtxtFree are v0, Contextual are v1
register(
id='ALRHopperJumpOnBox-v0',
@ -723,6 +735,46 @@ for _v in _versions:
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id)
## AntJump
_versions = ["v0", "v1"]
for _v in _versions:
_env_id = f'ALRAntJumpProMP-{_v}'
register(
id= _env_id,
entry_point='alr_envs.utils.make_env_helpers:make_mp_env_helper',
kwargs={
"name": f"alr_envs:ALRAntJump-{_v}",
"wrappers": [mujoco.ant_jump.NewMPWrapper],
"ep_wrapper_kwargs": {
"weight_scale": 1
},
"movement_primitives_kwargs": {
'movement_primitives_type': 'promp',
'action_dim': 8
},
"phase_generator_kwargs": {
'phase_generator_type': 'linear',
'delay': 0,
'tau': 10, # initial value
'learn_tau': False,
'learn_delay': False
},
"controller_kwargs": {
'controller_type': 'motor',
"p_gains": np.ones(8),
"d_gains": 0.1*np.ones(8),
},
"basis_generator_kwargs": {
'basis_generator_type': 'zero_rbf',
'num_basis': 5,
'num_basis_zero_start': 2
}
}
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append(_env_id)
## HalfCheetahJump
_versions = ["v0", "v1"]
for _v in _versions:
@ -877,6 +929,42 @@ register(
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append("ALRHopperJumpProMP-v2")
## HopperJump
register(
id= "ALRHopperJumpProMP-v3",
entry_point='alr_envs.utils.make_env_helpers:make_mp_env_helper',
kwargs={
"name": f"alr_envs:ALRHopperJump-v3",
"wrappers": [mujoco.hopper_jump.NewMPWrapper],
"ep_wrapper_kwargs": {
"weight_scale": 1
},
"movement_primitives_kwargs": {
'movement_primitives_type': 'promp',
'action_dim': 3
},
"phase_generator_kwargs": {
'phase_generator_type': 'linear',
'delay': 0,
'tau': 2, # initial value
'learn_tau': False,
'learn_delay': False
},
"controller_kwargs": {
'controller_type': 'motor',
"p_gains": np.ones(3),
"d_gains": 0.1*np.ones(3),
},
"basis_generator_kwargs": {
'basis_generator_type': 'zero_rbf',
'num_basis': 5,
'num_basis_zero_start': 2
}
}
)
ALL_ALR_MOTION_PRIMITIVE_ENVIRONMENTS["ProMP"].append("ALRHopperJumpProMP-v3")
## HopperJumpOnBox
_versions = ["v0", "v1"]
for _v in _versions:

View File

@ -5,7 +5,7 @@ from .table_tennis.tt_gym import TTEnvGym
from .beerpong.beerpong import ALRBeerBongEnv
from .ant_jump.ant_jump import ALRAntJumpEnv
from .half_cheetah_jump.half_cheetah_jump import ALRHalfCheetahJumpEnv
from .hopper_jump.hopper_jump import ALRHopperJumpEnv, ALRHopperJumpRndmPosEnv
from .hopper_jump.hopper_jump import ALRHopperJumpEnv, ALRHopperJumpRndmPosEnv, ALRHopperXYJumpEnv
from .hopper_jump.hopper_jump_on_box import ALRHopperJumpOnBoxEnv
from .hopper_throw.hopper_throw import ALRHopperThrowEnv
from .hopper_throw.hopper_throw_in_basket import ALRHopperThrowInBasketEnv

View File

@ -1 +1,2 @@
from .mp_wrapper import MPWrapper
from .new_mp_wrapper import NewMPWrapper

View File

@ -0,0 +1,21 @@
from alr_envs.mp.episodic_wrapper import EpisodicWrapper
from typing import Union, Tuple
import numpy as np
class NewMPWrapper(EpisodicWrapper):
def set_active_obs(self):
return np.hstack([
[False] * 111, # ant has 111 dimensional observation space !!
[True] # goal height
])
@property
def current_pos(self) -> Union[float, int, np.ndarray]:
return self.env.sim.data.qpos[7:15].copy()
@property
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[6:14].copy()

View File

@ -144,7 +144,7 @@
solref="-10000 -100"/>
<!-- <body name="cup_table" pos="0.32 -1.55 0.84" quat="0.7071068 0.7071068 0 0">-->
<body name="cup_table" pos="1.42 -1.25 0.84" quat="0.7071068 0.7071068 0 0">
<body name="cup_table" pos="-1.42 -4.05 0.84" quat="0.7071068 0.7071068 0 0">
<inertial pos="-3.75236e-10 8.27811e-05 0.0947015" quat="0.999945 -0.0104888 0 0" mass="10.132" diaginertia="0.000285643 0.000270485 9.65696e-05" />
<geom priority= "1" name="cup_geom_table3" pos="0 0.1 0.001" euler="-1.57 0 0" solref="-10000 -100" type="mesh" mesh="cup3_table" mass="10"/>
<geom priority= "1" name="cup_geom_table4" pos="0 0.1 0.001" euler="-1.57 0 0" solref="-10000 -100" type="mesh" mesh="cup4_table" mass="10"/>

View File

@ -7,8 +7,12 @@ from gym.envs.mujoco import MujocoEnv
from alr_envs.alr.mujoco.beerpong.beerpong_reward_staged import BeerPongReward
CUP_POS_MIN = np.array([-0.32, -2.2])
CUP_POS_MAX = np.array([0.32, -1.2])
CUP_POS_MIN = np.array([-1.42, -4.05])
CUP_POS_MAX = np.array([1.42, -1.25])
# CUP_POS_MIN = np.array([-0.32, -2.2])
# CUP_POS_MAX = np.array([0.32, -1.2])
# smaller context space -> Easier task
# CUP_POS_MIN = np.array([-0.16, -2.2])
@ -24,8 +28,10 @@ class ALRBeerBongEnv(MujocoEnv, utils.EzPickle):
self.cup_goal_pos = np.array(cup_goal_pos)
self._steps = 0
# self.xml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets",
# "beerpong_wo_cup" + ".xml")
self.xml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets",
"beerpong_wo_cup" + ".xml")
"beerpong_wo_cup_big_table" + ".xml")
self.j_min = np.array([-2.6, -1.985, -2.8, -0.9, -4.55, -1.5707, -2.7])
self.j_max = np.array([2.6, 1.985, 2.8, 3.14159, 1.25, 1.5707, 2.7])
@ -100,10 +106,6 @@ class ALRBeerBongEnv(MujocoEnv, utils.EzPickle):
return self._get_obs()
def step(self, a):
# if a.shape[0] == 8: # we learn also when to release the ball
# self._release_step = a[-1]
# self._release_step = np.clip(self._release_step, 50, 250)
# self.release_step = 0.5/self.dt
reward_dist = 0.0
angular_vel = 0.0
applied_action = a
@ -170,16 +172,11 @@ class ALRBeerBongEnv(MujocoEnv, utils.EzPickle):
[self._steps],
])
# TODO
@property
def active_obs(self):
return np.hstack([
[False] * 7, # cos
[False] * 7, # sin
[True] * 2, # xy position of cup
[False] # env steps
])
class ALRBeerPongStepEnv(ALRBeerBongEnv):
def __init__(self, frame_skip=1, apply_gravity_comp=True, noisy=False,
rndm_goal=False, cup_goal_pos=None):
super(ALRBeerPongStepEnv, self).__init__(frame_skip, apply_gravity_comp, noisy, rndm_goal, cup_goal_pos)
if __name__ == "__main__":
env = ALRBeerBongEnv(rndm_goal=True)

View File

@ -40,6 +40,7 @@ class BeerPongReward:
self.cup_z_axes = None
self.collision_penalty = 500
self.reset(None)
self.is_initialized = False
def reset(self, noisy):
self.ball_traj = []
@ -55,10 +56,14 @@ class BeerPongReward:
self.ball_wall_contact = False
self.ball_cup_contact = False
self.ball_in_cup = False
self.dist_ground_cup = -1 # distance floor to cup if first floor contact
self.noisy_bp = noisy
self._t_min_final_dist = -1
def compute_reward(self, env, action):
if not self.is_initialized:
self.is_initialized = True
self.ball_id = env.sim.model._body_name2id["ball"]
self.ball_collision_id = env.sim.model._geom_name2id["ball_geom"]
self.goal_id = env.sim.model._site_name2id["cup_goal_table"]
@ -76,92 +81,35 @@ class BeerPongReward:
ball_pos = env.sim.data.body_xpos[self.ball_id]
ball_vel = env.sim.data.body_xvelp[self.ball_id]
goal_final_pos = env.sim.data.site_xpos[self.goal_final_id]
self._check_contacts(env.sim)
self.dists.append(np.linalg.norm(goal_pos - ball_pos))
self.dists_final.append(np.linalg.norm(goal_final_pos - ball_pos))
self.dist_ground_cup = np.linalg.norm(ball_pos-goal_pos) \
if self.ball_ground_contact_first and self.dist_ground_cup == -1 else self.dist_ground_cup
action_cost = np.sum(np.square(action))
self.action_costs.append(action_cost)
# ##################### Reward function which forces to bounce once on the table (tanh) ########################
# if not self.ball_table_contact:
# self.ball_table_contact = self._check_collision_single_objects(env.sim, self.ball_collision_id,
# self.table_collision_id)
#
# self._is_collided = self._check_collision_with_itself(env.sim, self.robot_collision_ids)
# if env._steps == env.ep_length - 1 or self._is_collided:
# min_dist = np.min(self.dists)
# final_dist = self.dists_final[-1]
#
# ball_in_cup = self._check_collision_single_objects(env.sim, self.ball_collision_id,
# self.cup_table_collision_id)
#
# # encourage bounce before falling into cup
# if not ball_in_cup:
# if not self.ball_table_contact:
# reward = 0.2 * (1 - np.tanh(0.5*min_dist)) + 0.1 * (1 - np.tanh(0.5*final_dist))
# else:
# reward = (1 - np.tanh(0.5*min_dist)) + 0.5 * (1 - np.tanh(0.5*final_dist))
# else:
# if not self.ball_table_contact:
# reward = 2 * (1 - np.tanh(0.5*final_dist)) + 1 * (1 - np.tanh(0.5*min_dist)) + 1
# else:
# reward = 2 * (1 - np.tanh(0.5*final_dist)) + 1 * (1 - np.tanh(0.5*min_dist)) + 3
#
# # reward = - 1 * cost - self.collision_penalty * int(self._is_collided)
# success = ball_in_cup
# crash = self._is_collided
# else:
# reward = - 1e-2 * action_cost
# success = False
# crash = False
# ################################################################################################################
##################### Reward function which does not force to bounce once on the table (tanh) ################
# self._check_contacts(env.sim)
# self._is_collided = self._check_collision_with_itself(env.sim, self.robot_collision_ids)
# if env._steps == env.ep_length - 1 or self._is_collided:
# min_dist = np.min(self.dists)
# final_dist = self.dists_final[-1]
#
# # encourage bounce before falling into cup
# if not self.ball_in_cup:
# if not self.ball_table_contact and not self.ball_cup_contact and not self.ball_wall_contact:
# min_dist_coeff, final_dist_coeff, rew_offset = 0.2, 0.1, 0
# # reward = 0.2 * (1 - np.tanh(0.5*min_dist)) + 0.1 * (1 - np.tanh(0.5*final_dist))
# else:
# min_dist_coeff, final_dist_coeff, rew_offset = 1, 0.5, 0
# # reward = (1 - np.tanh(0.5*min_dist)) + 0.5 * (1 - np.tanh(0.5*final_dist))
# else:
# min_dist_coeff, final_dist_coeff, rew_offset = 1, 2, 3
# # reward = 2 * (1 - np.tanh(0.5*final_dist)) + 1 * (1 - np.tanh(0.5*min_dist)) + 3
#
# reward = final_dist_coeff * (1 - np.tanh(0.5 * final_dist)) + min_dist_coeff * (1 - np.tanh(0.5 * min_dist)) \
# + rew_offset
# success = self.ball_in_cup
# crash = self._is_collided
# else:
# reward = - 1e-2 * action_cost
# success = False
# crash = False
################################################################################################################
# # ##################### Reward function which does not force to bounce once on the table (quad dist) ############
self._check_contacts(env.sim)
self._is_collided = self._check_collision_with_itself(env.sim, self.robot_collision_ids)
if env._steps == env.ep_length - 1 or self._is_collided:
min_dist = np.min(self.dists)
final_dist = self.dists_final[-1]
# if self.ball_ground_contact_first:
# min_dist_coeff, final_dist_coeff, rew_offset = 1, 0.5, -6
# else:
if self.ball_ground_contact_first:
min_dist_coeff, final_dist_coeff, ground_contact_dist_coeff, rew_offset = 1, 0.5, 2, -4 # relative rew offset when first bounding on ground
# min_dist_coeff, final_dist_coeff, ground_contact_dist_coeff, rew_offset = 1, 0.5, 0, -6 # absolute rew offset when first bouncing on ground
else:
if not self.ball_in_cup:
if not self.ball_table_contact and not self.ball_cup_contact and not self.ball_wall_contact:
min_dist_coeff, final_dist_coeff, rew_offset = 1, 0.5, -4
min_dist_coeff, final_dist_coeff, ground_contact_dist_coeff, rew_offset = 1, 0.5, 0, -4
else:
min_dist_coeff, final_dist_coeff, rew_offset = 1, 0.5, -2
min_dist_coeff, final_dist_coeff, ground_contact_dist_coeff, rew_offset = 1, 0.5, 0, -2
else:
min_dist_coeff, final_dist_coeff, rew_offset = 0, 1, 0
min_dist_coeff, final_dist_coeff, ground_contact_dist_coeff, rew_offset = 0, 1, 0 ,0
# dist_ground_cup = 1 * self.dist_ground_cup
reward = rew_offset - min_dist_coeff * min_dist ** 2 - final_dist_coeff * final_dist ** 2 - \
1e-4 * np.mean(action_cost)
1e-4 * np.mean(action_cost) - ground_contact_dist_coeff*self.dist_ground_cup ** 2
# 1e-7*np.mean(action_cost)
# release step punishment
min_time_bound = 0.1
@ -172,43 +120,13 @@ class BeerPongReward:
reward += release_time_rew
success = self.ball_in_cup
# print('release time :', release_time)
# print('dist_ground_cup :', dist_ground_cup)
else:
reward = - 1e-2 * action_cost
# reward = - 1e-4 * action_cost
# reward = 0
success = False
# ################################################################################################################
# # # ##################### Reward function which does not force to bounce once on the table (quad dist) ############
# self._check_contacts(env.sim)
# self._is_collided = self._check_collision_with_itself(env.sim, self.robot_collision_ids)
# if env._steps == env.ep_length - 1 or self._is_collided:
# min_dist = np.min(self.dists)
# final_dist = self.dists_final[-1]
#
# if not self.ball_in_cup:
# if not self.ball_table_contact and not self.ball_cup_contact and not self.ball_wall_contact:
# min_dist_coeff, final_dist_coeff, rew_offset = 1, 0.5, -6
# else:
# if self.ball_ground_contact_first:
# min_dist_coeff, final_dist_coeff, rew_offset = 1, 0.5, -4
# else:
# min_dist_coeff, final_dist_coeff, rew_offset = 1, 0.5, -2
# else:
# if self.ball_ground_contact_first:
# min_dist_coeff, final_dist_coeff, rew_offset = 0, 1, -1
# else:
# min_dist_coeff, final_dist_coeff, rew_offset = 0, 1, 0
# reward = rew_offset - min_dist_coeff * min_dist ** 2 - final_dist_coeff * final_dist ** 2 - \
# 1e-7 * np.mean(action_cost)
# # 1e-4*np.mean(action_cost)
# success = self.ball_in_cup
# else:
# # reward = - 1e-2 * action_cost
# # reward = - 1e-4 * action_cost
# reward = 0
# success = False
# ################################################################################################################
infos = {}
infos["success"] = success
infos["is_collided"] = self._is_collided

View File

@ -44,6 +44,12 @@ class NewMPWrapper(EpisodicWrapper):
else:
return action, None
def set_context(self, context):
xyz = np.zeros(3)
xyz[:2] = context
xyz[-1] = 0.840
self.env.env.model.body_pos[self.env.env.cup_table_id] = xyz
return self.get_observation_from_step(self.env.env._get_obs())
# def set_action_space(self):
# if self.mp.learn_tau:
# min_action_bounds, max_action_bounds = self.mp.get_param_bounds()

View File

@ -25,12 +25,16 @@
<joint axis="0 -1 0" name="leg_joint" pos="0 0 0.6" range="-150 0" type="hinge"/>
<geom friction="0.9" fromto="0 0 0.6 0 0 0.1" name="leg_geom" size="0.04" type="capsule"/>
<body name="foot" pos="0.13/2 0 0.1">
<site name="foot_site" pos="0 0 0.04" size="0.02 0.02 0.02" rgba="1 0 0 1" type="sphere"/>
<joint axis="0 -1 0" name="foot_joint" pos="0 0 0.1" range="-45 45" type="hinge"/>
<geom friction="2.0" fromto="-0.13 0 0.1 0.26 0 0.1" name="foot_geom" size="0.06" type="capsule"/>
</body>
</body>
</body>
</body>
<body name="goal_site_body" pos = "0 0 0">
<site name="goal_site" pos="0 0 0.0" size="0.02 0.02 0.02" rgba="0 1 0 1" type="sphere"/>
</body>
</worldbody>
<actuator>
<motor ctrllimited="true" ctrlrange="-1.0 1.0" gear="200.0" joint="thigh_joint"/>

View File

@ -1,5 +1,6 @@
from gym.envs.mujoco.hopper_v3 import HopperEnv
import numpy as np
import os
MAX_EPISODE_STEPS_HOPPERJUMP = 250
@ -14,13 +15,13 @@ class ALRHopperJumpEnv(HopperEnv):
"""
def __init__(self,
xml_file='hopper.xml',
xml_file='hopper_jump.xml',
forward_reward_weight=1.0,
ctrl_cost_weight=1e-3,
healthy_reward=0.0,
penalty=0.0,
context=True,
terminate_when_unhealthy=True,
terminate_when_unhealthy=False,
healthy_state_range=(-100.0, 100.0),
healthy_z_range=(0.5, float('inf')),
healthy_angle_range=(-float('inf'), float('inf')),
@ -34,6 +35,13 @@ class ALRHopperJumpEnv(HopperEnv):
self.goal = 0
self.context = context
self.exclude_current_positions_from_observation = exclude_current_positions_from_observation
self._floor_geom_id = None
self._foot_geom_id = None
self.contact_with_floor = False
self.init_floor_contact = False
self.has_left_floor = False
self.contact_dist = None
xml_file = os.path.join(os.path.dirname(__file__), "assets", xml_file)
super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, terminate_when_unhealthy,
healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale,
exclude_current_positions_from_observation)
@ -43,28 +51,36 @@ class ALRHopperJumpEnv(HopperEnv):
self.current_step += 1
self.do_simulation(action, self.frame_skip)
height_after = self.get_body_com("torso")[2]
site_pos_after = self.sim.data.site_xpos[self.model.site_name2id('foot_site')].copy()
self.max_height = max(height_after, self.max_height)
ctrl_cost = self.control_cost(action)
costs = ctrl_cost
done = False
rewards = 0
if self.current_step >= self.max_episode_steps:
hight_goal_distance = -10*np.linalg.norm(self.max_height - self.goal) if self.context else self.max_height
healthy_reward = 0 if self.context else self.healthy_reward * self.current_step
healthy_reward = 0 if self.context else self.healthy_reward * 2 # self.current_step
height_reward = self._forward_reward_weight * hight_goal_distance # maybe move reward calculation into if structure and define two different _forward_reward_weight variables for context and episodic seperatley
rewards = height_reward + healthy_reward
else:
# penalty for wrong start direction of first two joints; not needed, could be removed
rewards = ((action[:2] > 0) * self.penalty).sum() if self.current_step < 10 else 0
# else:
# # penalty for wrong start direction of first two joints; not needed, could be removed
# rewards = ((action[:2] > 0) * self.penalty).sum() if self.current_step < 10 else 0
observation = self._get_obs()
reward = rewards - costs
# info = {
# 'height' : height_after,
# 'max_height': self.max_height,
# 'goal' : self.goal
# }
info = {
'height': height_after,
'x_pos': site_pos_after,
'max_height': self.max_height,
'goal' : self.goal
'height_rew': self.max_height,
'healthy_reward': self.healthy_reward * 2
}
return observation, reward, done, info
@ -73,7 +89,7 @@ class ALRHopperJumpEnv(HopperEnv):
return np.append(super()._get_obs(), self.goal)
def reset(self):
self.goal = np.random.uniform(1.4, 2.3, 1) # 1.3 2.3
self.goal = np.random.uniform(1.4, 2.16, 1) # 1.3 2.3
self.max_height = 0
self.current_step = 0
return super().reset()
@ -87,14 +103,124 @@ class ALRHopperJumpEnv(HopperEnv):
self.set_state(qpos, qvel)
observation = self._get_obs()
self.has_left_floor = False
self.contact_with_floor = False
self.init_floor_contact = False
self.contact_dist = None
return observation
def _contact_checker(self, id_1, id_2):
for coni in range(0, self.sim.data.ncon):
con = self.sim.data.contact[coni]
collision = con.geom1 == id_1 and con.geom2 == id_2
collision_trans = con.geom1 == id_2 and con.geom2 == id_1
if collision or collision_trans:
return True
return False
class ALRHopperXYJumpEnv(ALRHopperJumpEnv):
# The goal here is the desired x-Position of the Torso
def step(self, action):
self._floor_geom_id = self.model.geom_name2id('floor')
self._foot_geom_id = self.model.geom_name2id('foot_geom')
self.current_step += 1
self.do_simulation(action, self.frame_skip)
height_after = self.get_body_com("torso")[2]
site_pos_after = self.sim.data.site_xpos[self.model.site_name2id('foot_site')].copy()
self.max_height = max(height_after, self.max_height)
# floor_contact = self._contact_checker(self._floor_geom_id, self._foot_geom_id) if not self.contact_with_floor else False
# self.init_floor_contact = floor_contact if not self.init_floor_contact else self.init_floor_contact
# self.has_left_floor = not floor_contact if self.init_floor_contact and not self.has_left_floor else self.has_left_floor
# self.contact_with_floor = floor_contact if not self.contact_with_floor and self.has_left_floor else self.contact_with_floor
floor_contact = self._contact_checker(self._floor_geom_id,
self._foot_geom_id) if not self.contact_with_floor else False
if not self.init_floor_contact:
self.init_floor_contact = floor_contact
if self.init_floor_contact and not self.has_left_floor:
self.has_left_floor = not floor_contact
if not self.contact_with_floor and self.has_left_floor:
self.contact_with_floor = floor_contact
if self.contact_dist is None and self.contact_with_floor:
self.contact_dist = np.linalg.norm(self.sim.data.site_xpos[self.model.site_name2id('foot_site')]
- np.array([self.goal, 0, 0], dtype=object))[0]
ctrl_cost = self.control_cost(action)
costs = ctrl_cost
done = False
goal_dist = 0
rewards = 0
if self.current_step >= self.max_episode_steps:
# healthy_reward = 0 if self.context else self.healthy_reward * self.current_step
healthy_reward = self.healthy_reward * 2 #* self.current_step
goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0], dtype=object))[0]
contact_dist = self.contact_dist if self.contact_dist is not None else 5
dist_reward = self._forward_reward_weight * (-3*goal_dist + 10*self.max_height - 2*contact_dist)
rewards = dist_reward + healthy_reward
# else:
## penalty for wrong start direction of first two joints; not needed, could be removed
# rewards = ((action[:2] > 0) * self.penalty).sum() if self.current_step < 10 else 0
observation = self._get_obs()
reward = rewards - costs
info = {
'height': height_after,
'x_pos': site_pos_after,
'max_height': self.max_height,
'goal': self.goal,
'dist_rew': goal_dist,
'height_rew': self.max_height,
'healthy_reward': self.healthy_reward * 2
}
return observation, reward, done, info
def reset_model(self):
self.init_qpos[1] = 1.5
self._floor_geom_id = self.model.geom_name2id('floor')
self._foot_geom_id = self.model.geom_name2id('foot_geom')
noise_low = -np.zeros(self.model.nq)
noise_low[3] = -0.5
noise_low[4] = -0.2
noise_low[5] = 0
noise_high = np.zeros(self.model.nq)
noise_high[3] = 0
noise_high[4] = 0
noise_high[5] = 0.785
rnd_vec = self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nq)
qpos = self.init_qpos + rnd_vec
qvel = self.init_qvel
self.set_state(qpos, qvel)
observation = self._get_obs()
self.has_left_floor = False
self.contact_with_floor = False
self.init_floor_contact = False
self.contact_dist = None
return observation
def reset(self):
super().reset()
# self.goal = np.random.uniform(-1.5, 1.5, 1)
self.goal = np.random.uniform(0, 1.5, 1)
# self.goal = np.array([1.5])
self.sim.model.body_pos[self.sim.model.body_name2id('goal_site_body')] = np.array([self.goal, 0, 0], dtype=object)
return self.reset_model()
class ALRHopperJumpRndmPosEnv(ALRHopperJumpEnv):
def __init__(self, max_episode_steps=250):
self.contact_with_floor = False
self._floor_geom_id = None
self._foot_geom_id = None
super(ALRHopperJumpRndmPosEnv, self).__init__(exclude_current_positions_from_observation=False,
reset_noise_scale=5e-1,
max_episode_steps=max_episode_steps)
@ -104,17 +230,17 @@ class ALRHopperJumpRndmPosEnv(ALRHopperJumpEnv):
self._foot_geom_id = self.model.geom_name2id('foot_geom')
noise_low = -np.ones(self.model.nq)*self._reset_noise_scale
noise_low[1] = 0
noise_low[2] = -0.3
noise_low[3] = -0.1
noise_low[4] = -1.1
noise_low[5] = -0.785
noise_low[2] = 0
noise_low[3] = -0.2
noise_low[4] = -0.2
noise_low[5] = -0.1
noise_high = np.ones(self.model.nq)*self._reset_noise_scale
noise_high[1] = 0
noise_high[2] = 0.3
noise_high[2] = 0
noise_high[3] = 0
noise_high[4] = 0
noise_high[5] = 0.785
noise_high[5] = 0.1
rnd_vec = self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nq)
# rnd_vec[2] *= 0.05 # the angle around the y axis shouldn't be too high as the agent then falls down quickly and
@ -162,24 +288,18 @@ class ALRHopperJumpRndmPosEnv(ALRHopperJumpEnv):
return observation, reward, done, info
def _contact_checker(self, id_1, id_2):
for coni in range(0, self.sim.data.ncon):
con = self.sim.data.contact[coni]
collision = con.geom1 == id_1 and con.geom2 == id_2
collision_trans = con.geom1 == id_2 and con.geom2 == id_1
if collision or collision_trans:
return True
return False
if __name__ == '__main__':
render_mode = "human" # "human" or "partial" or "final"
# env = ALRHopperJumpEnv()
env = ALRHopperJumpRndmPosEnv()
env = ALRHopperXYJumpEnv()
# env = ALRHopperJumpRndmPosEnv()
obs = env.reset()
for k in range(10):
obs = env.reset()
print('observation :', obs[:6])
print('observation :', obs[:])
for i in range(200):
# objective.load_result("/tmp/cma")
# test with random actions

View File

@ -12,9 +12,19 @@ class NewMPWrapper(EpisodicWrapper):
def current_vel(self) -> Union[float, int, np.ndarray, Tuple]:
return self.env.sim.data.qvel[3:6].copy()
# # random goal
# def set_active_obs(self):
# return np.hstack([
# [False] * (5 + int(not self.env.exclude_current_positions_from_observation)), # position
# [False] * 6, # velocity
# [True]
# ])
# Random x goal + random init pos
def set_active_obs(self):
return np.hstack([
[False] * (5 + int(not self.env.exclude_current_positions_from_observation)), # position
[False] * (2 + int(not self.env.exclude_current_positions_from_observation)), # position
[True] * 3, # set to true if randomize initial pos
[False] * 6, # velocity
[True]
])