Compare commits
27 Commits
Author | SHA1 | Date | |
---|---|---|---|
107413ec75 | |||
fda9b9fc0f | |||
9faaae4785 | |||
9295e21d68 | |||
642bf8761f | |||
e89fe9e07f | |||
2b0c7d8652 | |||
c9ea8cb167 | |||
3bc0a23ec2 | |||
7e57e3c87c | |||
db8221ebb2 | |||
7f7e209087 | |||
558d331036 | |||
954df61a9a | |||
e2de7ceb13 | |||
dab3692289 | |||
fbbe312bce | |||
a67637c714 | |||
9fce6fff42 | |||
1372a596b5 | |||
6b59a354d7 | |||
acc540f757 | |||
b6c4716a04 | |||
c8b79d276d | |||
b4128ff6d9 | |||
765d4d495f | |||
50664d953b |
@ -26,7 +26,7 @@ from .mujoco.walker_2d_jump.walker_2d_jump import MAX_EPISODE_STEPS_WALKERJUMP
|
||||
from .mujoco.box_pushing.box_pushing_env import BoxPushingDense, BoxPushingTemporalSparse, \
|
||||
BoxPushingTemporalSpatialSparse, MAX_EPISODE_STEPS_BOX_PUSHING
|
||||
from .mujoco.table_tennis.table_tennis_env import TableTennisEnv, TableTennisWind, TableTennisGoalSwitching, \
|
||||
MAX_EPISODE_STEPS_TABLE_TENNIS
|
||||
MAX_EPISODE_STEPS_TABLE_TENNIS, MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER
|
||||
from .mujoco.table_tennis.mp_wrapper import TT_MPWrapper as MPWrapper_TableTennis
|
||||
from .mujoco.table_tennis.mp_wrapper import TT_MPWrapper_Replan as MPWrapper_TableTennis_Replan
|
||||
from .mujoco.table_tennis.mp_wrapper import TTVelObs_MPWrapper as MPWrapper_TableTennis_VelObs
|
||||
@ -135,6 +135,19 @@ register(
|
||||
}
|
||||
)
|
||||
|
||||
register(
|
||||
id='fancy/HopperJumpMarkov-v0',
|
||||
entry_point='fancy_gym.envs.mujoco:HopperJumpMarkovRew',
|
||||
mp_wrapper=mujoco.hopper_jump.MPWrapper,
|
||||
max_episode_steps=MAX_EPISODE_STEPS_HOPPERJUMP,
|
||||
kwargs={
|
||||
"sparse": False,
|
||||
"healthy_reward": 1.0,
|
||||
"contact_weight": 0.0,
|
||||
"height_weight": 3.0,
|
||||
}
|
||||
)
|
||||
|
||||
# TODO: Add [MPs] later when finished (old TODO I moved here during refactor)
|
||||
register(
|
||||
id='fancy/AntJump-v0',
|
||||
@ -290,6 +303,34 @@ register(
|
||||
}
|
||||
)
|
||||
|
||||
register(
|
||||
id='fancy/TableTennisRndRobot-v0',
|
||||
entry_point='fancy_gym.envs.mujoco:TableTennisRandomInit',
|
||||
max_episode_steps=MAX_EPISODE_STEPS_TABLE_TENNIS,
|
||||
kwargs={
|
||||
'random_pos_scale': 0.1,
|
||||
'random_vel_scale': 0.0,
|
||||
}
|
||||
)
|
||||
|
||||
register(
|
||||
id='fancy/TableTennisMarkovian-v0',
|
||||
entry_point='fancy_gym.envs.mujoco:TableTennisMarkovian',
|
||||
max_episode_steps=MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER,
|
||||
kwargs={
|
||||
}
|
||||
)
|
||||
|
||||
register(
|
||||
id='fancy/TableTennisRndRobotMarkovian-v0',
|
||||
entry_point='fancy_gym.envs.mujoco:TableTennisMarkovian',
|
||||
max_episode_steps=MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER,
|
||||
kwargs={
|
||||
'random_pos_scale': 0.1,
|
||||
'random_vel_scale': 0.0,
|
||||
}
|
||||
)
|
||||
|
||||
# Air Hockey environments
|
||||
for env_mode in ["7dof-hit", "7dof-defend", "3dof-hit", "3dof-defend", "7dof-hit-airhockit2023", "7dof-defend-airhockit2023"]:
|
||||
register(
|
||||
|
@ -1,14 +1,14 @@
|
||||
from .ant_jump.ant_jump import AntJumpEnv
|
||||
from .beerpong.beerpong import BeerPongEnv, BeerPongEnvStepBasedEpisodicReward
|
||||
from .half_cheetah_jump.half_cheetah_jump import HalfCheetahJumpEnv
|
||||
from .hopper_jump.hopper_jump import HopperJumpEnv
|
||||
from .hopper_jump.hopper_jump import HopperJumpEnv, HopperJumpMarkovRew
|
||||
from .hopper_jump.hopper_jump_on_box import HopperJumpOnBoxEnv
|
||||
from .hopper_throw.hopper_throw import HopperThrowEnv
|
||||
from .hopper_throw.hopper_throw_in_basket import HopperThrowInBasketEnv
|
||||
from .reacher.reacher import ReacherEnv
|
||||
from .walker_2d_jump.walker_2d_jump import Walker2dJumpEnv
|
||||
from .box_pushing.box_pushing_env import BoxPushingDense, BoxPushingTemporalSparse, BoxPushingTemporalSpatialSparse
|
||||
from .table_tennis.table_tennis_env import TableTennisEnv, TableTennisWind, TableTennisGoalSwitching
|
||||
from .table_tennis.table_tennis_env import TableTennisEnv, TableTennisWind, TableTennisGoalSwitching, TableTennisMarkovian, TableTennisRandomInit
|
||||
|
||||
try:
|
||||
from .air_hockey.air_hockey_env_wrapper import AirHockeyEnv
|
||||
|
@ -6,6 +6,7 @@ from gymnasium.envs.mujoco import MujocoEnv
|
||||
from fancy_gym.envs.mujoco.box_pushing.box_pushing_utils import rot_to_quat, get_quaternion_error, rotation_distance
|
||||
from fancy_gym.envs.mujoco.box_pushing.box_pushing_utils import q_max, q_min, q_dot_max, q_torque_max
|
||||
from fancy_gym.envs.mujoco.box_pushing.box_pushing_utils import desired_rod_quat
|
||||
from fancy_gym.envs.mujoco.box_pushing.box_pushing_utils import calculate_jerk_profile, calculate_mean_squared_jerk, calculate_dimensionless_jerk, calculate_maximum_jerk
|
||||
|
||||
import mujoco
|
||||
|
||||
@ -49,6 +50,7 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
|
||||
self._desired_rod_quat = desired_rod_quat
|
||||
|
||||
self._episode_energy = 0.
|
||||
self.velocity_profile = []
|
||||
|
||||
self.observation_space = spaces.Box(
|
||||
low=-np.inf, high=np.inf, shape=(28,), dtype=np.float64
|
||||
@ -67,6 +69,8 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
|
||||
|
||||
unstable_simulation = False
|
||||
|
||||
self.velocity_profile.append(self.data.qvel[:7].copy())
|
||||
|
||||
try:
|
||||
self.do_simulation(resultant_action, self.frame_skip)
|
||||
except Exception as e:
|
||||
@ -96,11 +100,15 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
|
||||
obs = self._get_obs()
|
||||
box_goal_pos_dist = 0. if not episode_end else np.linalg.norm(box_pos - target_pos)
|
||||
box_goal_quat_dist = 0. if not episode_end else rotation_distance(box_quat, target_quat)
|
||||
mean_squared_jerk, maximum_jerk, dimensionless_jerk = (0.0,0.0,0.0) if not episode_end else self.calculate_smoothness_metrics(np.array(self.velocity_profile), self.dt)
|
||||
infos = {
|
||||
'episode_end': episode_end,
|
||||
'box_goal_pos_dist': box_goal_pos_dist,
|
||||
'box_goal_rot_dist': box_goal_quat_dist,
|
||||
'episode_energy': 0. if not episode_end else self._episode_energy,
|
||||
'mean_squared_jerk': mean_squared_jerk,
|
||||
'maximum_jerk': maximum_jerk,
|
||||
'dimensionless_jerk': dimensionless_jerk,
|
||||
'is_success': True if episode_end and box_goal_pos_dist < 0.05 and box_goal_quat_dist < 0.5 else False,
|
||||
'num_steps': self._steps
|
||||
}
|
||||
@ -110,6 +118,26 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
|
||||
|
||||
return obs, reward, terminated, truncated, infos
|
||||
|
||||
def calculate_smoothness_metrics(self, velocity_profile, dt):
|
||||
"""
|
||||
Calculates the smoothness metrics for the given velocity profile.
|
||||
param velocity_profile: np.array
|
||||
The array containing the movement velocity profile.
|
||||
param dt: float
|
||||
The sampling time interval of the data.
|
||||
return mean_squared_jerk: float
|
||||
The mean squared jerk estimate of the given movement's smoothness.
|
||||
return maximum_jerk: float
|
||||
The maximum jerk estimate of the given movement's smoothness.
|
||||
return dimensionless_jerk: float
|
||||
The dimensionless jerk estimate of the given movement's smoothness.
|
||||
"""
|
||||
jerk_profile = calculate_jerk_profile(velocity_profile, dt)
|
||||
mean_squared_jerk = calculate_mean_squared_jerk(jerk_profile)
|
||||
maximum_jerk = calculate_maximum_jerk(jerk_profile)
|
||||
dimensionless_jerk = calculate_dimensionless_jerk(jerk_profile, velocity_profile, dt)
|
||||
return mean_squared_jerk, maximum_jerk, dimensionless_jerk
|
||||
|
||||
def reset_model(self):
|
||||
# rest box to initial position
|
||||
self.set_state(self.init_qpos_box_pushing, self.init_qvel_box_pushing)
|
||||
|
@ -51,3 +51,19 @@ def rot_to_quat(theta, axis):
|
||||
quant[0] = np.sin(theta / 2.)
|
||||
quant[1:] = np.cos(theta / 2.) * axis
|
||||
return quant
|
||||
|
||||
def calculate_jerk_profile(velocity_profile, dt):
|
||||
jerk = np.diff(velocity_profile, 2, 0) / pow(dt, 2)
|
||||
return jerk
|
||||
|
||||
def calculate_mean_squared_jerk(jerk_profile):
|
||||
return np.mean(pow(jerk_profile, 2))
|
||||
|
||||
def calculate_maximum_jerk(jerk_profile):
|
||||
return np.max(abs(jerk_profile))
|
||||
|
||||
def calculate_dimensionless_jerk(jerk_profile, velocity_profile, dt):
|
||||
sum_squared_jerk = np.sum(pow(jerk_profile, 2), 0)
|
||||
duration = len(velocity_profile) * dt
|
||||
peak_velocity = np.max(abs(velocity_profile), 0)
|
||||
return np.mean(sum_squared_jerk * pow(duration, 3) / pow(peak_velocity, 2))
|
@ -262,76 +262,100 @@ class HopperJumpEnv(HopperEnvCustomXML):
|
||||
return True
|
||||
return False
|
||||
|
||||
# # TODO is that needed? if so test it
|
||||
# class HopperJumpStepEnv(HopperJumpEnv):
|
||||
#
|
||||
# def __init__(self,
|
||||
# xml_file='hopper_jump.xml',
|
||||
# forward_reward_weight=1.0,
|
||||
# ctrl_cost_weight=1e-3,
|
||||
# healthy_reward=1.0,
|
||||
# height_weight=3,
|
||||
# dist_weight=3,
|
||||
# terminate_when_unhealthy=False,
|
||||
# healthy_state_range=(-100.0, 100.0),
|
||||
# healthy_z_range=(0.5, float('inf')),
|
||||
# healthy_angle_range=(-float('inf'), float('inf')),
|
||||
# reset_noise_scale=5e-3,
|
||||
# exclude_current_positions_from_observation=False
|
||||
# ):
|
||||
#
|
||||
# self._height_weight = height_weight
|
||||
# self._dist_weight = dist_weight
|
||||
# super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, terminate_when_unhealthy,
|
||||
# healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale,
|
||||
# exclude_current_positions_from_observation)
|
||||
#
|
||||
# def step(self, action):
|
||||
# self._steps += 1
|
||||
#
|
||||
# self.do_simulation(action, self.frame_skip)
|
||||
#
|
||||
# height_after = self.get_body_com("torso")[2]
|
||||
# site_pos_after = self.data.site('foot_site').xpos.copy()
|
||||
# self.max_height = max(height_after, self.max_height)
|
||||
#
|
||||
# ctrl_cost = self.control_cost(action)
|
||||
# healthy_reward = self.healthy_reward
|
||||
# height_reward = self._height_weight * height_after
|
||||
# goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
|
||||
# goal_dist_reward = -self._dist_weight * goal_dist
|
||||
# dist_reward = self._forward_reward_weight * (goal_dist_reward + height_reward)
|
||||
#
|
||||
# rewards = dist_reward + healthy_reward
|
||||
# costs = ctrl_cost
|
||||
# done = False
|
||||
#
|
||||
# # This is only for logging the distance to goal when first having the contact
|
||||
# has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
|
||||
#
|
||||
# if not self.init_floor_contact:
|
||||
# self.init_floor_contact = has_floor_contact
|
||||
# if self.init_floor_contact and not self.has_left_floor:
|
||||
# self.has_left_floor = not has_floor_contact
|
||||
# if not self.contact_with_floor and self.has_left_floor:
|
||||
# self.contact_with_floor = has_floor_contact
|
||||
#
|
||||
# if self.contact_dist is None and self.contact_with_floor:
|
||||
# self.contact_dist = goal_dist
|
||||
#
|
||||
# ##############################################################
|
||||
#
|
||||
# observation = self._get_obs()
|
||||
# reward = rewards - costs
|
||||
# info = {
|
||||
# 'height': height_after,
|
||||
# 'x_pos': site_pos_after,
|
||||
# 'max_height': copy.copy(self.max_height),
|
||||
# 'goal': copy.copy(self.goal),
|
||||
# 'goal_dist': goal_dist,
|
||||
# 'height_rew': height_reward,
|
||||
# 'healthy_reward': healthy_reward,
|
||||
# 'healthy': copy.copy(self.is_healthy),
|
||||
# 'contact_dist': copy.copy(self.contact_dist) or 0
|
||||
# }
|
||||
# return observation, reward, done, info
|
||||
class HopperJumpMarkovRew(HopperJumpEnv):
|
||||
def step(self, action):
|
||||
self._steps += 1
|
||||
|
||||
self.do_simulation(action, self.frame_skip)
|
||||
|
||||
height_after = self.get_body_com("torso")[2]
|
||||
# site_pos_after = self.data.get_site_xpos('foot_site')
|
||||
site_pos_after = self.data.site('foot_site').xpos
|
||||
self.max_height = max(height_after, self.max_height)
|
||||
|
||||
has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
|
||||
|
||||
if not self.init_floor_contact:
|
||||
self.init_floor_contact = has_floor_contact
|
||||
if self.init_floor_contact and not self.has_left_floor:
|
||||
self.has_left_floor = not has_floor_contact
|
||||
if not self.contact_with_floor and self.has_left_floor:
|
||||
self.contact_with_floor = has_floor_contact
|
||||
|
||||
ctrl_cost = self.control_cost(action)
|
||||
costs = ctrl_cost
|
||||
terminated = False
|
||||
truncated = False
|
||||
|
||||
goal_dist = np.linalg.norm(site_pos_after - self.goal)
|
||||
if self.contact_dist is None and self.contact_with_floor:
|
||||
self.contact_dist = goal_dist
|
||||
|
||||
rewards = 0
|
||||
if not self.sparse or (self.sparse and self._steps >= MAX_EPISODE_STEPS_HOPPERJUMP):
|
||||
healthy_reward = self.healthy_reward
|
||||
distance_reward = -goal_dist * self._dist_weight
|
||||
height_reward = (self.max_height if self.sparse else height_after) * self._height_weight
|
||||
contact_reward = -(self.contact_dist or 5) * self._contact_weight
|
||||
rewards = self._forward_reward_weight * (distance_reward + height_reward + contact_reward + healthy_reward)
|
||||
|
||||
observation = self._get_obs()
|
||||
|
||||
# While loop to simulate the process after jump to make the task Markovian
|
||||
if self.sparse and self.has_left_floor:
|
||||
while self._steps < MAX_EPISODE_STEPS_HOPPERJUMP:
|
||||
# Simulate to the end of the episode
|
||||
self._steps += 1
|
||||
|
||||
try:
|
||||
self.do_simulation(np.zeros_like(action), self.frame_skip)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
height_after = self.get_body_com("torso")[2]
|
||||
#site_pos_after = self.data.get_site_xpos('foot_site')
|
||||
site_pos_after = self.data.site('foot_site').xpos
|
||||
self.max_height = max(height_after, self.max_height)
|
||||
|
||||
has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
|
||||
|
||||
if not self.init_floor_contact:
|
||||
self.init_floor_contact = has_floor_contact
|
||||
if self.init_floor_contact and not self.has_left_floor:
|
||||
self.has_left_floor = not has_floor_contact
|
||||
if not self.contact_with_floor and self.has_left_floor:
|
||||
self.contact_with_floor = has_floor_contact
|
||||
|
||||
ctrl_cost = self.control_cost(action)
|
||||
costs = ctrl_cost
|
||||
done = False
|
||||
|
||||
goal_dist = np.linalg.norm(site_pos_after - self.goal)
|
||||
if self.contact_dist is None and self.contact_with_floor:
|
||||
self.contact_dist = goal_dist
|
||||
|
||||
rewards = 0
|
||||
|
||||
# Task has reached the end, compute the sparse reward
|
||||
done = True
|
||||
healthy_reward = self.healthy_reward
|
||||
distance_reward = -goal_dist * self._dist_weight
|
||||
height_reward = (self.max_height if self.sparse else height_after) * self._height_weight
|
||||
contact_reward = -(self.contact_dist or 5) * self._contact_weight
|
||||
rewards = self._forward_reward_weight * (distance_reward + height_reward + contact_reward + healthy_reward)
|
||||
|
||||
reward = rewards - costs
|
||||
info = dict(
|
||||
height=height_after,
|
||||
x_pos=site_pos_after,
|
||||
max_height=self.max_height,
|
||||
goal=self.goal[:1],
|
||||
goal_dist=goal_dist,
|
||||
height_rew=self.max_height,
|
||||
healthy_reward=self.healthy_reward,
|
||||
healthy=self.is_healthy,
|
||||
contact_dist=self.contact_dist or 0,
|
||||
num_steps=self._steps,
|
||||
has_left_floor=self.has_left_floor
|
||||
)
|
||||
return observation, reward, terminated, truncated, info
|
||||
|
@ -5,11 +5,12 @@ from gymnasium import utils, spaces
|
||||
from gymnasium.envs.mujoco import MujocoEnv
|
||||
|
||||
from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import is_init_state_valid, magnus_force
|
||||
from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import jnt_pos_low, jnt_pos_high
|
||||
from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import jnt_pos_low, jnt_pos_high, jnt_vel_low, jnt_vel_high
|
||||
|
||||
import mujoco
|
||||
|
||||
MAX_EPISODE_STEPS_TABLE_TENNIS = 350
|
||||
MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER = 300
|
||||
|
||||
CONTEXT_BOUNDS_2DIMS = np.array([[-1.0, -0.65], [-0.2, 0.65]])
|
||||
CONTEXT_BOUNDS_4DIMS = np.array([[-1.0, -0.65, -1.0, -0.65],
|
||||
@ -18,6 +19,9 @@ CONTEXT_BOUNDS_SWICHING = np.array([[-1.0, -0.65, -1.0, 0.],
|
||||
[-0.2, 0.65, -0.2, 0.65]])
|
||||
|
||||
|
||||
DEFAULT_ROBOT_INIT_POS = np.array([0.0, 0.0, 0.0, 1.5, 0.0, 0.0, 1.5])
|
||||
DEFAULT_ROBOT_INIT_VEL = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
|
||||
|
||||
class TableTennisEnv(MujocoEnv, utils.EzPickle):
|
||||
"""
|
||||
7 DoF table tennis environment
|
||||
@ -34,7 +38,11 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
|
||||
|
||||
def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4,
|
||||
goal_switching_step: int = None,
|
||||
enable_artificial_wind: bool = False, **kwargs):
|
||||
enable_artificial_wind: bool = False,
|
||||
random_pos_scale: float = 0.0,
|
||||
random_vel_scale: float = 0.0,
|
||||
**kwargs,
|
||||
):
|
||||
utils.EzPickle.__init__(**locals())
|
||||
self._steps = 0
|
||||
|
||||
@ -48,6 +56,10 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
|
||||
|
||||
self._id_set = False
|
||||
|
||||
# initial robot state
|
||||
self._random_pos_scale = random_pos_scale
|
||||
self._random_vel_scale = random_vel_scale
|
||||
|
||||
# reward calculation
|
||||
self.ball_landing_pos = None
|
||||
self._goal_pos = np.zeros(2)
|
||||
@ -156,7 +168,7 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
|
||||
"num_steps": self._steps,
|
||||
}
|
||||
|
||||
terminated, truncated = self._terminated, False
|
||||
terminated, truncated = self._terminated, self._steps == MAX_EPISODE_STEPS_TABLE_TENNIS
|
||||
|
||||
return self._get_obs(), reward, terminated, truncated, info
|
||||
|
||||
@ -167,6 +179,17 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_initial_robot_state(self):
|
||||
|
||||
robot_init_pos = DEFAULT_ROBOT_INIT_POS + \
|
||||
self.np_random.uniform(-1.0, 1.0, size=7) *\
|
||||
np.array([5.2, 4.0, 5.6, 4.0, 6.1, 3.2, 4.4]) *\
|
||||
self._random_pos_scale
|
||||
|
||||
robot_init_vel = DEFAULT_ROBOT_INIT_VEL + self.np_random.uniform(-1.0, 1.0, size=7) * self._random_vel_scale
|
||||
|
||||
return np.clip(robot_init_pos, jnt_pos_low, jnt_pos_high), np.clip(robot_init_vel, jnt_vel_low, jnt_vel_high)
|
||||
|
||||
def reset_model(self):
|
||||
self._steps = 0
|
||||
self._init_ball_state = self._generate_valid_init_ball(random_pos=True, random_vel=False)
|
||||
@ -183,8 +206,10 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
|
||||
|
||||
self.model.body_pos[5] = np.concatenate([self._goal_pos, [0.77]])
|
||||
|
||||
self.data.qpos[:7] = np.array([0., 0., 0., 1.5, 0., 0., 1.5])
|
||||
self.data.qvel[:7] = np.zeros(7)
|
||||
robot_init_pos, robot_init_vel = self.get_initial_robot_state()
|
||||
|
||||
self.data.qpos[:7] = robot_init_pos
|
||||
self.data.qvel[:7] = robot_init_vel
|
||||
|
||||
mujoco.mj_forward(self.model, self.data)
|
||||
|
||||
@ -257,7 +282,7 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
|
||||
def get_invalid_traj_step_return(self, action, pos_traj, contextual_obs, tau_bound, delay_bound):
|
||||
obs = self._get_obs() if contextual_obs else np.concatenate([self._get_obs(), np.array([0])]) # 0 for invalid traj
|
||||
penalty = self._get_traj_invalid_penalty(action, pos_traj, tau_bound, delay_bound)
|
||||
return obs, penalty, True, False, {
|
||||
return obs, penalty, False, True, {
|
||||
"hit_ball": [False],
|
||||
"ball_returned_success": [False],
|
||||
"land_dist_error": [10.],
|
||||
@ -274,6 +299,179 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
|
||||
return False, pos_traj, vel_traj
|
||||
return True, pos_traj, vel_traj
|
||||
|
||||
class TableTennisMarkovian(TableTennisEnv):
|
||||
def _get_reward2(self, hit_now, land_now):
|
||||
|
||||
# Phase 1 not hit ball
|
||||
if not self._hit_ball:
|
||||
# Not hit ball
|
||||
min_r_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj) - np.array(self._racket_traj), axis=1))
|
||||
return 0.005 * (1 - np.tanh(min_r_b_dist**2))
|
||||
|
||||
# Phase 2 hit ball now
|
||||
elif self._hit_ball and hit_now:
|
||||
return 2
|
||||
|
||||
# Phase 3 hit ball already and not land yet
|
||||
elif self._hit_ball and self._ball_landing_pos is None:
|
||||
min_b_des_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj)[:,:2] - self._goal_pos[:2], axis=1))
|
||||
return 0.02 * (1 - np.tanh(min_b_des_b_dist**2))
|
||||
|
||||
# Phase 4 hit ball already and land now
|
||||
elif self._hit_ball and land_now:
|
||||
over_net_bonus = int(self._ball_landing_pos[0] < 0)
|
||||
min_b_des_b_land_dist = np.linalg.norm(self._goal_pos[:2] - self._ball_landing_pos[:2])
|
||||
return 4 * (1 - np.tanh(min_b_des_b_land_dist ** 2)) + over_net_bonus
|
||||
|
||||
# Phase 5 hit ball already and land already
|
||||
elif self._hit_ball and not land_now and self._ball_landing_pos is not None:
|
||||
return 0
|
||||
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
def _get_reward(self, terminated):
|
||||
# if not terminated:
|
||||
# return 0
|
||||
|
||||
min_r_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj) - np.array(self._racket_traj), axis=1))
|
||||
if not self._hit_ball:
|
||||
# Not hit ball
|
||||
return 0.2 * (1 - np.tanh(min_r_b_dist**2))
|
||||
elif self._ball_landing_pos is None:
|
||||
# Hit ball but not landing pos
|
||||
min_b_des_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj)[:,:2] - self._goal_pos[:2], axis=1))
|
||||
return 2 + (1 - np.tanh(min_b_des_b_dist**2))
|
||||
else:
|
||||
# Hit ball and land
|
||||
min_b_des_b_land_dist = np.linalg.norm(self._goal_pos[:2] - self._ball_landing_pos[:2])
|
||||
over_net_bonus = int(self._ball_landing_pos[0] < 0)
|
||||
return 2 + 4 * (1 - np.tanh(min_b_des_b_land_dist ** 2)) + over_net_bonus
|
||||
|
||||
|
||||
def _get_traj_invalid_penalty(self, action, pos_traj, tau_bound, delay_bound):
|
||||
tau_invalid_penalty = 3 * (np.max([0, action[0] - tau_bound[1]]) + np.max([0, tau_bound[0] - action[0]]))
|
||||
delay_invalid_penalty = 3 * (np.max([0, action[1] - delay_bound[1]]) + np.max([0, delay_bound[0] - action[1]]))
|
||||
violate_high_bound_error = np.mean(np.maximum(pos_traj - jnt_pos_high, 0))
|
||||
violate_low_bound_error = np.mean(np.maximum(jnt_pos_low - pos_traj, 0))
|
||||
invalid_penalty = tau_invalid_penalty + delay_invalid_penalty + \
|
||||
violate_high_bound_error + violate_low_bound_error
|
||||
return -invalid_penalty
|
||||
|
||||
def get_invalid_traj_step_penalty(self, pos_traj):
|
||||
violate_high_bound_error = (
|
||||
np.maximum(pos_traj - jnt_pos_high, 0).mean())
|
||||
violate_low_bound_error = (
|
||||
np.maximum(jnt_pos_low - pos_traj, 0).mean())
|
||||
invalid_penalty = violate_high_bound_error + violate_low_bound_error
|
||||
|
||||
|
||||
def _update_game_state(self, action):
|
||||
for _ in range(self.frame_skip):
|
||||
if self._enable_artificial_wind:
|
||||
self.data.qfrc_applied[-2] = self._artificial_force
|
||||
try:
|
||||
self.do_simulation(action, 1)
|
||||
except Exception as e:
|
||||
print("Simulation get unstable return with MujocoException: ", e)
|
||||
unstable_simulation = True
|
||||
self._terminated = True
|
||||
break
|
||||
|
||||
# Update game state
|
||||
if not self._terminated:
|
||||
if not self._hit_ball:
|
||||
self._hit_ball = self._contact_checker(self._ball_contact_id, self._bat_front_id) or \
|
||||
self._contact_checker(self._ball_contact_id, self._bat_back_id)
|
||||
if not self._hit_ball:
|
||||
ball_land_on_floor_no_hit = self._contact_checker(self._ball_contact_id, self._floor_contact_id)
|
||||
if ball_land_on_floor_no_hit:
|
||||
self._ball_landing_pos = self.data.body("target_ball").xpos.copy()
|
||||
self._terminated = True
|
||||
if self._hit_ball and not self._ball_contact_after_hit:
|
||||
if self._contact_checker(self._ball_contact_id, self._floor_contact_id): # first check contact with floor
|
||||
self._ball_contact_after_hit = True
|
||||
self._ball_landing_pos = self.data.geom("target_ball_contact").xpos.copy()
|
||||
self._terminated = True
|
||||
elif self._contact_checker(self._ball_contact_id, self._table_contact_id): # second check contact with table
|
||||
self._ball_contact_after_hit = True
|
||||
self._ball_landing_pos = self.data.geom("target_ball_contact").xpos.copy()
|
||||
if self._ball_landing_pos[0] < 0.: # ball lands on the opponent side
|
||||
self._ball_return_success = True
|
||||
self._terminated = True
|
||||
|
||||
# update ball trajectory & racket trajectory
|
||||
self._ball_traj.append(self.data.body("target_ball").xpos.copy())
|
||||
self._racket_traj.append(self.data.geom("bat").xpos.copy())
|
||||
|
||||
def ball_racket_contact(self):
|
||||
return self._contact_checker(self._ball_contact_id, self._bat_front_id) or \
|
||||
self._contact_checker(self._ball_contact_id, self._bat_back_id)
|
||||
|
||||
def step(self, action):
|
||||
if not self._id_set:
|
||||
self._set_ids()
|
||||
|
||||
unstable_simulation = False
|
||||
hit_already = self._hit_ball
|
||||
if self._steps == self._goal_switching_step and self.np_random.uniform() < 0.5:
|
||||
new_goal_pos = self._generate_goal_pos(random=True)
|
||||
new_goal_pos[1] = -new_goal_pos[1]
|
||||
self._goal_pos = new_goal_pos
|
||||
self.model.body_pos[5] = np.concatenate([self._goal_pos, [0.77]])
|
||||
mujoco.mj_forward(self.model, self.data)
|
||||
|
||||
self._update_game_state(action)
|
||||
self._steps += 1
|
||||
|
||||
obs = self._get_obs()
|
||||
|
||||
# Compute reward
|
||||
if unstable_simulation:
|
||||
reward = -25
|
||||
else:
|
||||
# reward = self._get_reward(self._terminated)
|
||||
# hit_now = not hit_already and self._hit_ball
|
||||
hit_finish = self._hit_ball and not self.ball_racket_contact()
|
||||
|
||||
if hit_finish:
|
||||
# Clean the ball and racket traj before hit
|
||||
self._ball_traj = []
|
||||
self._racket_traj = []
|
||||
|
||||
# Simulate the rest of the traj
|
||||
reward = self._get_reward2(True, False)
|
||||
while self._steps < MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER:
|
||||
land_already = self._ball_landing_pos is not None
|
||||
self._update_game_state(np.zeros_like(action))
|
||||
self._steps += 1
|
||||
|
||||
land_now = (not land_already
|
||||
and self._ball_landing_pos is not None)
|
||||
temp_reward = self._get_reward2(False, land_now)
|
||||
# print(temp_reward)
|
||||
reward += temp_reward
|
||||
|
||||
# Uncomment the line below to visualize the sim after hit
|
||||
# self.render(mode="human")
|
||||
else:
|
||||
reward = self._get_reward2(False, False)
|
||||
|
||||
# Update ball landing error
|
||||
land_dist_err = np.linalg.norm(self._ball_landing_pos[:-1] - self._goal_pos) \
|
||||
if self._ball_landing_pos is not None else 10.
|
||||
|
||||
info = {
|
||||
"hit_ball": self._hit_ball,
|
||||
"ball_returned_success": self._ball_return_success,
|
||||
"land_dist_error": land_dist_err,
|
||||
"is_success": self._ball_return_success and land_dist_err < 0.2,
|
||||
"num_steps": self._steps,
|
||||
}
|
||||
|
||||
terminated, truncated = self._terminated, self._steps == MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER
|
||||
|
||||
return obs, reward, terminated, truncated, info
|
||||
|
||||
class TableTennisWind(TableTennisEnv):
|
||||
def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4, **kwargs):
|
||||
@ -296,7 +494,17 @@ class TableTennisWind(TableTennisEnv):
|
||||
])
|
||||
return obs
|
||||
|
||||
|
||||
class TableTennisGoalSwitching(TableTennisEnv):
|
||||
def __init__(self, frame_skip: int = 4, goal_switching_step: int = 99, **kwargs):
|
||||
super().__init__(frame_skip=frame_skip, goal_switching_step=goal_switching_step, **kwargs)
|
||||
|
||||
|
||||
class TableTennisRandomInit(TableTennisEnv):
|
||||
def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4,
|
||||
random_pos_scale: float = 1.0,
|
||||
random_vel_scale: float = 0.0,
|
||||
**kwargs):
|
||||
super().__init__(ctxt_dim=ctxt_dim, frame_skip=frame_skip,
|
||||
random_pos_scale=random_pos_scale,
|
||||
random_vel_scale=random_vel_scale,
|
||||
**kwargs)
|
@ -2,6 +2,10 @@ import numpy as np
|
||||
|
||||
jnt_pos_low = np.array([-2.6, -2.0, -2.8, -0.9, -4.8, -1.6, -2.2])
|
||||
jnt_pos_high = np.array([2.6, 2.0, 2.8, 3.1, 1.3, 1.6, 2.2])
|
||||
|
||||
jnt_vel_low = np.ones(7) * -7
|
||||
jnt_vel_high = np.ones(7) * 7
|
||||
|
||||
delay_bound = [0.05, 0.15]
|
||||
tau_bound = [0.5, 1.5]
|
||||
|
||||
@ -48,4 +52,4 @@ def magnus_force(top_spin=0.0, side_spin=0.0, v_ball=np.zeros(3), v_wind=np.zero
|
||||
C_l = 4.68 * 10e-4 - 2.0984 * 10e-5 * (np.linalg.norm(v_ball) - 50) # Lift force coeffient or simply 1.23
|
||||
w = np.array([0.0, top_spin, side_spin]) # Angular velocity of ball
|
||||
f_m = 0.5 * rho * A * C_l * np.linalg.norm(v_ball-v_wind) * np.cross(w, v_ball-v_wind)
|
||||
return f_m
|
||||
return f_m
|
@ -20,7 +20,6 @@ GYM_IDS = [spec.id for spec in gym.envs.registry.values() if
|
||||
GYM_MP_IDS = fancy_gym.ALL_DMC_MOVEMENT_PRIMITIVE_ENVIRONMENTS['all']
|
||||
SEED = 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize('env_id', GYM_IDS)
|
||||
def test_step_gym_functionality(env_id: str):
|
||||
"""Tests that step environments run without errors using random actions."""
|
Loading…
Reference in New Issue
Block a user