Compare commits

...

27 Commits

Author SHA1 Message Date
107413ec75 Merge branch 'master' into future 2024-02-10 11:19:04 +01:00
fda9b9fc0f Merge branch 'bruce_port_envs' into future 2024-02-04 17:30:40 +01:00
9faaae4785 Fix: New TT variants were registered outside of fancy NS 2024-02-04 17:30:12 +01:00
9295e21d68 Merge branch 'bruce_port_envs' into future 2024-02-02 17:00:35 +01:00
642bf8761f Fix: BP was not returning new infos (smoothness metrics) 2024-02-02 17:00:23 +01:00
e89fe9e07f Merge branch 'bruce_port_envs' into future 2024-02-02 16:51:50 +01:00
2b0c7d8652 Fix: BP was not returning new infos (smoothness metrics) 2024-02-02 16:51:18 +01:00
c9ea8cb167 BugFix: Missing new get_initial_robot_state method in TT 2024-01-28 12:56:53 +01:00
3bc0a23ec2 BugFix: Did not pass kwargs down in new TT envs 2024-01-28 12:54:49 +01:00
7e57e3c87c Merge branch 'bruce_port_envs' into future 2024-01-28 12:46:22 +01:00
db8221ebb2 Fix: Accidentally removed delay_bound & tau_bound from TT_utils 2024-01-28 12:45:50 +01:00
7f7e209087 Merge branch 'readthedocs' into future 2024-01-28 12:41:34 +01:00
558d331036 Merge branch 'publish_from_release' into future 2024-01-28 12:41:20 +01:00
954df61a9a Merge branch 'fix_pyproject_missing_deps' into future 2024-01-28 12:41:10 +01:00
e2de7ceb13 Merge branch 'better_tests' into future 2024-01-28 12:40:51 +01:00
dab3692289 Merge branch 'fix_metaworld_rendering' into future 2024-01-28 12:40:41 +01:00
fbbe312bce Merge branch 'bruce_port_envs' into future 2024-01-28 12:39:18 +01:00
a67637c714 Register new env variants (Bruce Vers) 2024-01-28 12:37:53 +01:00
9fce6fff42 Ported new HopperJump Rew to master 2024-01-28 12:32:52 +01:00
1372a596b5 Ported Markovian Version of TableTennis to master 2024-01-28 12:29:43 +01:00
6b59a354d7 Added new smoothness metric to BP 2024-01-28 12:16:15 +01:00
acc540f757 Merge branch 'pypi-package' into future 2023-10-29 16:50:43 +01:00
b6c4716a04 Merge branch 'fix_test_builtin_gym_envs' into future 2023-10-29 15:45:49 +01:00
c8b79d276d remove forgotten print statement 2023-10-29 15:41:28 +01:00
b4128ff6d9 Fix: Typo in test_black_box leads to incorrect testing 2023-10-29 15:23:29 +01:00
765d4d495f Merge branch 'pypi-package' into future 2023-10-29 14:23:31 +01:00
50664d953b Better implementation of blacklist to only test gym builtin 2023-10-29 13:55:46 +01:00
8 changed files with 405 additions and 85 deletions

View File

@@ -26,7 +26,7 @@ from .mujoco.walker_2d_jump.walker_2d_jump import MAX_EPISODE_STEPS_WALKERJUMP
from .mujoco.box_pushing.box_pushing_env import BoxPushingDense, BoxPushingTemporalSparse, \
BoxPushingTemporalSpatialSparse, MAX_EPISODE_STEPS_BOX_PUSHING
from .mujoco.table_tennis.table_tennis_env import TableTennisEnv, TableTennisWind, TableTennisGoalSwitching, \
MAX_EPISODE_STEPS_TABLE_TENNIS
MAX_EPISODE_STEPS_TABLE_TENNIS, MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER
from .mujoco.table_tennis.mp_wrapper import TT_MPWrapper as MPWrapper_TableTennis
from .mujoco.table_tennis.mp_wrapper import TT_MPWrapper_Replan as MPWrapper_TableTennis_Replan
from .mujoco.table_tennis.mp_wrapper import TTVelObs_MPWrapper as MPWrapper_TableTennis_VelObs
@@ -135,6 +135,19 @@ register(
}
)
register(
id='fancy/HopperJumpMarkov-v0',
entry_point='fancy_gym.envs.mujoco:HopperJumpMarkovRew',
mp_wrapper=mujoco.hopper_jump.MPWrapper,
max_episode_steps=MAX_EPISODE_STEPS_HOPPERJUMP,
kwargs={
"sparse": False,
"healthy_reward": 1.0,
"contact_weight": 0.0,
"height_weight": 3.0,
}
)
# TODO: Add [MPs] later when finished (old TODO I moved here during refactor)
register(
id='fancy/AntJump-v0',
@@ -290,6 +303,34 @@ register(
}
)
register(
id='fancy/TableTennisRndRobot-v0',
entry_point='fancy_gym.envs.mujoco:TableTennisRandomInit',
max_episode_steps=MAX_EPISODE_STEPS_TABLE_TENNIS,
kwargs={
'random_pos_scale': 0.1,
'random_vel_scale': 0.0,
}
)
register(
id='fancy/TableTennisMarkovian-v0',
entry_point='fancy_gym.envs.mujoco:TableTennisMarkovian',
max_episode_steps=MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER,
kwargs={
}
)
register(
id='fancy/TableTennisRndRobotMarkovian-v0',
entry_point='fancy_gym.envs.mujoco:TableTennisMarkovian',
max_episode_steps=MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER,
kwargs={
'random_pos_scale': 0.1,
'random_vel_scale': 0.0,
}
)
# Air Hockey environments
for env_mode in ["7dof-hit", "7dof-defend", "3dof-hit", "3dof-defend", "7dof-hit-airhockit2023", "7dof-defend-airhockit2023"]:
register(
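
For orientation (not part of the diff), a minimal usage sketch of the newly registered variants, assuming the usual fancy_gym workflow in which importing the package registers all fancy/ IDs with Gymnasium:

import gymnasium as gym
import fancy_gym  # noqa: F401 -- importing registers the fancy/ environments

# New Markovian table-tennis variant with the shorter 300-step horizon
env = gym.make('fancy/TableTennisMarkovian-v0')
obs, info = env.reset(seed=1)
obs, reward, terminated, truncated, info = env.step(env.action_space.sample())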

View File

@@ -1,14 +1,14 @@
from .ant_jump.ant_jump import AntJumpEnv
from .beerpong.beerpong import BeerPongEnv, BeerPongEnvStepBasedEpisodicReward
from .half_cheetah_jump.half_cheetah_jump import HalfCheetahJumpEnv
from .hopper_jump.hopper_jump import HopperJumpEnv
from .hopper_jump.hopper_jump import HopperJumpEnv, HopperJumpMarkovRew
from .hopper_jump.hopper_jump_on_box import HopperJumpOnBoxEnv
from .hopper_throw.hopper_throw import HopperThrowEnv
from .hopper_throw.hopper_throw_in_basket import HopperThrowInBasketEnv
from .reacher.reacher import ReacherEnv
from .walker_2d_jump.walker_2d_jump import Walker2dJumpEnv
from .box_pushing.box_pushing_env import BoxPushingDense, BoxPushingTemporalSparse, BoxPushingTemporalSpatialSparse
from .table_tennis.table_tennis_env import TableTennisEnv, TableTennisWind, TableTennisGoalSwitching
from .table_tennis.table_tennis_env import TableTennisEnv, TableTennisWind, TableTennisGoalSwitching, TableTennisMarkovian, TableTennisRandomInit
try:
from .air_hockey.air_hockey_env_wrapper import AirHockeyEnv

View File

@@ -6,6 +6,7 @@ from gymnasium.envs.mujoco import MujocoEnv
from fancy_gym.envs.mujoco.box_pushing.box_pushing_utils import rot_to_quat, get_quaternion_error, rotation_distance
from fancy_gym.envs.mujoco.box_pushing.box_pushing_utils import q_max, q_min, q_dot_max, q_torque_max
from fancy_gym.envs.mujoco.box_pushing.box_pushing_utils import desired_rod_quat
from fancy_gym.envs.mujoco.box_pushing.box_pushing_utils import calculate_jerk_profile, calculate_mean_squared_jerk, calculate_dimensionless_jerk, calculate_maximum_jerk
import mujoco
@@ -49,6 +50,7 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
self._desired_rod_quat = desired_rod_quat
self._episode_energy = 0.
self.velocity_profile = []
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf, shape=(28,), dtype=np.float64
@@ -67,6 +69,8 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
unstable_simulation = False
self.velocity_profile.append(self.data.qvel[:7].copy())
try:
self.do_simulation(resultant_action, self.frame_skip)
except Exception as e:
@@ -96,11 +100,15 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
obs = self._get_obs()
box_goal_pos_dist = 0. if not episode_end else np.linalg.norm(box_pos - target_pos)
box_goal_quat_dist = 0. if not episode_end else rotation_distance(box_quat, target_quat)
mean_squared_jerk, maximum_jerk, dimensionless_jerk = (0.0,0.0,0.0) if not episode_end else self.calculate_smoothness_metrics(np.array(self.velocity_profile), self.dt)
infos = {
'episode_end': episode_end,
'box_goal_pos_dist': box_goal_pos_dist,
'box_goal_rot_dist': box_goal_quat_dist,
'episode_energy': 0. if not episode_end else self._episode_energy,
'mean_squared_jerk': mean_squared_jerk,
'maximum_jerk': maximum_jerk,
'dimensionless_jerk': dimensionless_jerk,
'is_success': True if episode_end and box_goal_pos_dist < 0.05 and box_goal_quat_dist < 0.5 else False,
'num_steps': self._steps
}
@@ -110,6 +118,26 @@ class BoxPushingEnvBase(MujocoEnv, utils.EzPickle):
return obs, reward, terminated, truncated, infos
def calculate_smoothness_metrics(self, velocity_profile, dt):
"""
Calculates the smoothness metrics for the given velocity profile.
param velocity_profile: np.array
The array containing the movement velocity profile.
param dt: float
The sampling time interval of the data.
return mean_squared_jerk: float
The mean squared jerk estimate of the given movement's smoothness.
return maximum_jerk: float
The maximum jerk estimate of the given movement's smoothness.
return dimensionless_jerk: float
The dimensionless jerk estimate of the given movement's smoothness.
"""
jerk_profile = calculate_jerk_profile(velocity_profile, dt)
mean_squared_jerk = calculate_mean_squared_jerk(jerk_profile)
maximum_jerk = calculate_maximum_jerk(jerk_profile)
dimensionless_jerk = calculate_dimensionless_jerk(jerk_profile, velocity_profile, dt)
return mean_squared_jerk, maximum_jerk, dimensionless_jerk
def reset_model(self):
# reset box to initial position
self.set_state(self.init_qpos_box_pushing, self.init_qvel_box_pushing)
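
A short sketch (assumptions: the existing fancy/BoxPushingDense-v0 ID and the standard Gymnasium step API) of how the new smoothness entries in infos could be read; like episode_energy, they stay at 0.0 until the final step of an episode:

import gymnasium as gym
import fancy_gym  # importing registers the fancy/ environments

env = gym.make('fancy/BoxPushingDense-v0')
obs, info = env.reset(seed=0)
terminated = truncated = False
while not (terminated or truncated):
    obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
# Populated only once the episode ends
print(info['mean_squared_jerk'], info['maximum_jerk'], info['dimensionless_jerk'])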

View File

@@ -51,3 +51,19 @@ def rot_to_quat(theta, axis):
quant[0] = np.sin(theta / 2.)
quant[1:] = np.cos(theta / 2.) * axis
return quant
def calculate_jerk_profile(velocity_profile, dt):
jerk = np.diff(velocity_profile, 2, 0) / pow(dt, 2)
return jerk
def calculate_mean_squared_jerk(jerk_profile):
return np.mean(pow(jerk_profile, 2))
def calculate_maximum_jerk(jerk_profile):
return np.max(abs(jerk_profile))
def calculate_dimensionless_jerk(jerk_profile, velocity_profile, dt):
sum_squared_jerk = np.sum(pow(jerk_profile, 2), 0)
duration = len(velocity_profile) * dt
peak_velocity = np.max(abs(velocity_profile), 0)
return np.mean(sum_squared_jerk * pow(duration, 3) / pow(peak_velocity, 2))
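
As a quick sanity check (not part of the diff), the new helpers can be exercised on a synthetic velocity profile; a linear velocity ramp has zero jerk, so all three metrics come out (numerically) zero:

import numpy as np
from fancy_gym.envs.mujoco.box_pushing.box_pushing_utils import (
    calculate_jerk_profile, calculate_mean_squared_jerk,
    calculate_maximum_jerk, calculate_dimensionless_jerk)

dt = 0.02
t = np.arange(0.0, 1.0, dt)
velocity_profile = np.outer(t, np.ones(7))  # linear ramp per joint -> constant acceleration, zero jerk

jerk = calculate_jerk_profile(velocity_profile, dt)
print(calculate_mean_squared_jerk(jerk))                          # ~0
print(calculate_maximum_jerk(jerk))                               # ~0
print(calculate_dimensionless_jerk(jerk, velocity_profile, dt))   # ~0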

View File

@@ -262,76 +262,100 @@ class HopperJumpEnv(HopperEnvCustomXML):
return True
return False
# # TODO is that needed? if so test it
# class HopperJumpStepEnv(HopperJumpEnv):
#
# def __init__(self,
# xml_file='hopper_jump.xml',
# forward_reward_weight=1.0,
# ctrl_cost_weight=1e-3,
# healthy_reward=1.0,
# height_weight=3,
# dist_weight=3,
# terminate_when_unhealthy=False,
# healthy_state_range=(-100.0, 100.0),
# healthy_z_range=(0.5, float('inf')),
# healthy_angle_range=(-float('inf'), float('inf')),
# reset_noise_scale=5e-3,
# exclude_current_positions_from_observation=False
# ):
#
# self._height_weight = height_weight
# self._dist_weight = dist_weight
# super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, terminate_when_unhealthy,
# healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale,
# exclude_current_positions_from_observation)
#
# def step(self, action):
# self._steps += 1
#
# self.do_simulation(action, self.frame_skip)
#
# height_after = self.get_body_com("torso")[2]
# site_pos_after = self.data.site('foot_site').xpos.copy()
# self.max_height = max(height_after, self.max_height)
#
# ctrl_cost = self.control_cost(action)
# healthy_reward = self.healthy_reward
# height_reward = self._height_weight * height_after
# goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0]))
# goal_dist_reward = -self._dist_weight * goal_dist
# dist_reward = self._forward_reward_weight * (goal_dist_reward + height_reward)
#
# rewards = dist_reward + healthy_reward
# costs = ctrl_cost
# done = False
#
# # This is only for logging the distance to goal when first having the contact
# has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
#
# if not self.init_floor_contact:
# self.init_floor_contact = has_floor_contact
# if self.init_floor_contact and not self.has_left_floor:
# self.has_left_floor = not has_floor_contact
# if not self.contact_with_floor and self.has_left_floor:
# self.contact_with_floor = has_floor_contact
#
# if self.contact_dist is None and self.contact_with_floor:
# self.contact_dist = goal_dist
#
# ##############################################################
#
# observation = self._get_obs()
# reward = rewards - costs
# info = {
# 'height': height_after,
# 'x_pos': site_pos_after,
# 'max_height': copy.copy(self.max_height),
# 'goal': copy.copy(self.goal),
# 'goal_dist': goal_dist,
# 'height_rew': height_reward,
# 'healthy_reward': healthy_reward,
# 'healthy': copy.copy(self.is_healthy),
# 'contact_dist': copy.copy(self.contact_dist) or 0
# }
# return observation, reward, done, info
class HopperJumpMarkovRew(HopperJumpEnv):
def step(self, action):
self._steps += 1
self.do_simulation(action, self.frame_skip)
height_after = self.get_body_com("torso")[2]
# site_pos_after = self.data.get_site_xpos('foot_site')
site_pos_after = self.data.site('foot_site').xpos
self.max_height = max(height_after, self.max_height)
has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
if not self.init_floor_contact:
self.init_floor_contact = has_floor_contact
if self.init_floor_contact and not self.has_left_floor:
self.has_left_floor = not has_floor_contact
if not self.contact_with_floor and self.has_left_floor:
self.contact_with_floor = has_floor_contact
ctrl_cost = self.control_cost(action)
costs = ctrl_cost
terminated = False
truncated = False
goal_dist = np.linalg.norm(site_pos_after - self.goal)
if self.contact_dist is None and self.contact_with_floor:
self.contact_dist = goal_dist
rewards = 0
if not self.sparse or (self.sparse and self._steps >= MAX_EPISODE_STEPS_HOPPERJUMP):
healthy_reward = self.healthy_reward
distance_reward = -goal_dist * self._dist_weight
height_reward = (self.max_height if self.sparse else height_after) * self._height_weight
contact_reward = -(self.contact_dist or 5) * self._contact_weight
rewards = self._forward_reward_weight * (distance_reward + height_reward + contact_reward + healthy_reward)
observation = self._get_obs()
# While loop to simulate the process after jump to make the task Markovian
if self.sparse and self.has_left_floor:
while self._steps < MAX_EPISODE_STEPS_HOPPERJUMP:
# Simulate to the end of the episode
self._steps += 1
try:
self.do_simulation(np.zeros_like(action), self.frame_skip)
except Exception as e:
print(e)
height_after = self.get_body_com("torso")[2]
#site_pos_after = self.data.get_site_xpos('foot_site')
site_pos_after = self.data.site('foot_site').xpos
self.max_height = max(height_after, self.max_height)
has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False
if not self.init_floor_contact:
self.init_floor_contact = has_floor_contact
if self.init_floor_contact and not self.has_left_floor:
self.has_left_floor = not has_floor_contact
if not self.contact_with_floor and self.has_left_floor:
self.contact_with_floor = has_floor_contact
ctrl_cost = self.control_cost(action)
costs = ctrl_cost
done = False
goal_dist = np.linalg.norm(site_pos_after - self.goal)
if self.contact_dist is None and self.contact_with_floor:
self.contact_dist = goal_dist
rewards = 0
# Task has reached the end, compute the sparse reward
done = True
healthy_reward = self.healthy_reward
distance_reward = -goal_dist * self._dist_weight
height_reward = (self.max_height if self.sparse else height_after) * self._height_weight
contact_reward = -(self.contact_dist or 5) * self._contact_weight
rewards = self._forward_reward_weight * (distance_reward + height_reward + contact_reward + healthy_reward)
reward = rewards - costs
info = dict(
height=height_after,
x_pos=site_pos_after,
max_height=self.max_height,
goal=self.goal[:1],
goal_dist=goal_dist,
height_rew=self.max_height,
healthy_reward=self.healthy_reward,
healthy=self.is_healthy,
contact_dist=self.contact_dist or 0,
num_steps=self._steps,
has_left_floor=self.has_left_floor
)
return observation, reward, terminated, truncated, info
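
A minimal sketch of driving the new variant via the ID registered above (fancy/HopperJumpMarkov-v0); the step info now also reports num_steps and has_left_floor:

import gymnasium as gym
import fancy_gym  # importing registers the fancy/ environments

env = gym.make('fancy/HopperJumpMarkov-v0')
obs, info = env.reset(seed=1)
obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
print(info['num_steps'], info['has_left_floor'], info['max_height'])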

View File

@@ -5,11 +5,12 @@ from gymnasium import utils, spaces
from gymnasium.envs.mujoco import MujocoEnv
from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import is_init_state_valid, magnus_force
from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import jnt_pos_low, jnt_pos_high
from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import jnt_pos_low, jnt_pos_high, jnt_vel_low, jnt_vel_high
import mujoco
MAX_EPISODE_STEPS_TABLE_TENNIS = 350
MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER = 300
CONTEXT_BOUNDS_2DIMS = np.array([[-1.0, -0.65], [-0.2, 0.65]])
CONTEXT_BOUNDS_4DIMS = np.array([[-1.0, -0.65, -1.0, -0.65],
@@ -18,6 +19,9 @@ CONTEXT_BOUNDS_SWICHING = np.array([[-1.0, -0.65, -1.0, 0.],
[-0.2, 0.65, -0.2, 0.65]])
DEFAULT_ROBOT_INIT_POS = np.array([0.0, 0.0, 0.0, 1.5, 0.0, 0.0, 1.5])
DEFAULT_ROBOT_INIT_VEL = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
class TableTennisEnv(MujocoEnv, utils.EzPickle):
"""
7 DoF table tennis environment
@@ -34,7 +38,11 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4,
goal_switching_step: int = None,
enable_artificial_wind: bool = False, **kwargs):
enable_artificial_wind: bool = False,
random_pos_scale: float = 0.0,
random_vel_scale: float = 0.0,
**kwargs,
):
utils.EzPickle.__init__(**locals())
self._steps = 0
@@ -48,6 +56,10 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
self._id_set = False
# initial robot state
self._random_pos_scale = random_pos_scale
self._random_vel_scale = random_vel_scale
# reward calculation
self.ball_landing_pos = None
self._goal_pos = np.zeros(2)
@@ -156,7 +168,7 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
"num_steps": self._steps,
}
terminated, truncated = self._terminated, False
terminated, truncated = self._terminated, self._steps == MAX_EPISODE_STEPS_TABLE_TENNIS
return self._get_obs(), reward, terminated, truncated, info
@@ -167,6 +179,17 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
return True
return False
def get_initial_robot_state(self):
robot_init_pos = DEFAULT_ROBOT_INIT_POS + \
self.np_random.uniform(-1.0, 1.0, size=7) *\
np.array([5.2, 4.0, 5.6, 4.0, 6.1, 3.2, 4.4]) *\
self._random_pos_scale
robot_init_vel = DEFAULT_ROBOT_INIT_VEL + self.np_random.uniform(-1.0, 1.0, size=7) * self._random_vel_scale
return np.clip(robot_init_pos, jnt_pos_low, jnt_pos_high), np.clip(robot_init_vel, jnt_vel_low, jnt_vel_high)
def reset_model(self):
self._steps = 0
self._init_ball_state = self._generate_valid_init_ball(random_pos=True, random_vel=False)
@@ -183,8 +206,10 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
self.model.body_pos[5] = np.concatenate([self._goal_pos, [0.77]])
self.data.qpos[:7] = np.array([0., 0., 0., 1.5, 0., 0., 1.5])
self.data.qvel[:7] = np.zeros(7)
robot_init_pos, robot_init_vel = self.get_initial_robot_state()
self.data.qpos[:7] = robot_init_pos
self.data.qvel[:7] = robot_init_vel
mujoco.mj_forward(self.model, self.data)
@@ -257,7 +282,7 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
def get_invalid_traj_step_return(self, action, pos_traj, contextual_obs, tau_bound, delay_bound):
obs = self._get_obs() if contextual_obs else np.concatenate([self._get_obs(), np.array([0])]) # 0 for invalid traj
penalty = self._get_traj_invalid_penalty(action, pos_traj, tau_bound, delay_bound)
return obs, penalty, True, False, {
return obs, penalty, False, True, {
"hit_ball": [False],
"ball_returned_success": [False],
"land_dist_error": [10.],
@@ -274,6 +299,179 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
return False, pos_traj, vel_traj
return True, pos_traj, vel_traj
class TableTennisMarkovian(TableTennisEnv):
def _get_reward2(self, hit_now, land_now):
# Phase 1 not hit ball
if not self._hit_ball:
# Not hit ball
min_r_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj) - np.array(self._racket_traj), axis=1))
return 0.005 * (1 - np.tanh(min_r_b_dist**2))
# Phase 2 hit ball now
elif self._hit_ball and hit_now:
return 2
# Phase 3 hit ball already and not land yet
elif self._hit_ball and self._ball_landing_pos is None:
min_b_des_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj)[:,:2] - self._goal_pos[:2], axis=1))
return 0.02 * (1 - np.tanh(min_b_des_b_dist**2))
# Phase 4 hit ball already and land now
elif self._hit_ball and land_now:
over_net_bonus = int(self._ball_landing_pos[0] < 0)
min_b_des_b_land_dist = np.linalg.norm(self._goal_pos[:2] - self._ball_landing_pos[:2])
return 4 * (1 - np.tanh(min_b_des_b_land_dist ** 2)) + over_net_bonus
# Phase 5 hit ball already and land already
elif self._hit_ball and not land_now and self._ball_landing_pos is not None:
return 0
else:
raise NotImplementedError
def _get_reward(self, terminated):
# if not terminated:
# return 0
min_r_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj) - np.array(self._racket_traj), axis=1))
if not self._hit_ball:
# Not hit ball
return 0.2 * (1 - np.tanh(min_r_b_dist**2))
elif self._ball_landing_pos is None:
# Hit ball but not landing pos
min_b_des_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj)[:,:2] - self._goal_pos[:2], axis=1))
return 2 + (1 - np.tanh(min_b_des_b_dist**2))
else:
# Hit ball and land
min_b_des_b_land_dist = np.linalg.norm(self._goal_pos[:2] - self._ball_landing_pos[:2])
over_net_bonus = int(self._ball_landing_pos[0] < 0)
return 2 + 4 * (1 - np.tanh(min_b_des_b_land_dist ** 2)) + over_net_bonus
def _get_traj_invalid_penalty(self, action, pos_traj, tau_bound, delay_bound):
tau_invalid_penalty = 3 * (np.max([0, action[0] - tau_bound[1]]) + np.max([0, tau_bound[0] - action[0]]))
delay_invalid_penalty = 3 * (np.max([0, action[1] - delay_bound[1]]) + np.max([0, delay_bound[0] - action[1]]))
violate_high_bound_error = np.mean(np.maximum(pos_traj - jnt_pos_high, 0))
violate_low_bound_error = np.mean(np.maximum(jnt_pos_low - pos_traj, 0))
invalid_penalty = tau_invalid_penalty + delay_invalid_penalty + \
violate_high_bound_error + violate_low_bound_error
return -invalid_penalty
def get_invalid_traj_step_penalty(self, pos_traj):
violate_high_bound_error = (
np.maximum(pos_traj - jnt_pos_high, 0).mean())
violate_low_bound_error = (
np.maximum(jnt_pos_low - pos_traj, 0).mean())
invalid_penalty = violate_high_bound_error + violate_low_bound_error
def _update_game_state(self, action):
for _ in range(self.frame_skip):
if self._enable_artificial_wind:
self.data.qfrc_applied[-2] = self._artificial_force
try:
self.do_simulation(action, 1)
except Exception as e:
print("Simulation get unstable return with MujocoException: ", e)
unstable_simulation = True
self._terminated = True
break
# Update game state
if not self._terminated:
if not self._hit_ball:
self._hit_ball = self._contact_checker(self._ball_contact_id, self._bat_front_id) or \
self._contact_checker(self._ball_contact_id, self._bat_back_id)
if not self._hit_ball:
ball_land_on_floor_no_hit = self._contact_checker(self._ball_contact_id, self._floor_contact_id)
if ball_land_on_floor_no_hit:
self._ball_landing_pos = self.data.body("target_ball").xpos.copy()
self._terminated = True
if self._hit_ball and not self._ball_contact_after_hit:
if self._contact_checker(self._ball_contact_id, self._floor_contact_id): # first check contact with floor
self._ball_contact_after_hit = True
self._ball_landing_pos = self.data.geom("target_ball_contact").xpos.copy()
self._terminated = True
elif self._contact_checker(self._ball_contact_id, self._table_contact_id): # second check contact with table
self._ball_contact_after_hit = True
self._ball_landing_pos = self.data.geom("target_ball_contact").xpos.copy()
if self._ball_landing_pos[0] < 0.: # ball lands on the opponent side
self._ball_return_success = True
self._terminated = True
# update ball trajectory & racket trajectory
self._ball_traj.append(self.data.body("target_ball").xpos.copy())
self._racket_traj.append(self.data.geom("bat").xpos.copy())
def ball_racket_contact(self):
return self._contact_checker(self._ball_contact_id, self._bat_front_id) or \
self._contact_checker(self._ball_contact_id, self._bat_back_id)
def step(self, action):
if not self._id_set:
self._set_ids()
unstable_simulation = False
hit_already = self._hit_ball
if self._steps == self._goal_switching_step and self.np_random.uniform() < 0.5:
new_goal_pos = self._generate_goal_pos(random=True)
new_goal_pos[1] = -new_goal_pos[1]
self._goal_pos = new_goal_pos
self.model.body_pos[5] = np.concatenate([self._goal_pos, [0.77]])
mujoco.mj_forward(self.model, self.data)
self._update_game_state(action)
self._steps += 1
obs = self._get_obs()
# Compute reward
if unstable_simulation:
reward = -25
else:
# reward = self._get_reward(self._terminated)
# hit_now = not hit_already and self._hit_ball
hit_finish = self._hit_ball and not self.ball_racket_contact()
if hit_finish:
# Clean the ball and racket traj before hit
self._ball_traj = []
self._racket_traj = []
# Simulate the rest of the traj
reward = self._get_reward2(True, False)
while self._steps < MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER:
land_already = self._ball_landing_pos is not None
self._update_game_state(np.zeros_like(action))
self._steps += 1
land_now = (not land_already
and self._ball_landing_pos is not None)
temp_reward = self._get_reward2(False, land_now)
# print(temp_reward)
reward += temp_reward
# Uncomment the line below to visualize the sim after hit
# self.render(mode="human")
else:
reward = self._get_reward2(False, False)
# Update ball landing error
land_dist_err = np.linalg.norm(self._ball_landing_pos[:-1] - self._goal_pos) \
if self._ball_landing_pos is not None else 10.
info = {
"hit_ball": self._hit_ball,
"ball_returned_success": self._ball_return_success,
"land_dist_error": land_dist_err,
"is_success": self._ball_return_success and land_dist_err < 0.2,
"num_steps": self._steps,
}
terminated, truncated = self._terminated, self._steps == MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER
return obs, reward, terminated, truncated, info
class TableTennisWind(TableTennisEnv):
def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4, **kwargs):
@@ -296,7 +494,17 @@ class TableTennisWind(TableTennisEnv):
])
return obs
class TableTennisGoalSwitching(TableTennisEnv):
def __init__(self, frame_skip: int = 4, goal_switching_step: int = 99, **kwargs):
super().__init__(frame_skip=frame_skip, goal_switching_step=goal_switching_step, **kwargs)
class TableTennisRandomInit(TableTennisEnv):
def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4,
random_pos_scale: float = 1.0,
random_vel_scale: float = 0.0,
**kwargs):
super().__init__(ctxt_dim=ctxt_dim, frame_skip=frame_skip,
random_pos_scale=random_pos_scale,
random_vel_scale=random_vel_scale,
**kwargs)
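
A small sketch (overriding the randomization scale through gym.make kwargs is assumed to follow standard Gymnasium behaviour) checking that the randomized initial robot state produced by get_initial_robot_state stays inside the joint limits, since it is clipped to jnt_pos_low/jnt_pos_high:

import numpy as np
import gymnasium as gym
import fancy_gym  # importing registers the fancy/ environments
from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import jnt_pos_low, jnt_pos_high

env = gym.make('fancy/TableTennisRndRobot-v0', random_pos_scale=0.5)
for _ in range(5):
    env.reset()
    qpos = env.unwrapped.data.qpos[:7]  # robot joints after the randomized reset
    assert np.all(qpos >= jnt_pos_low) and np.all(qpos <= jnt_pos_high)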

View File

@@ -2,6 +2,10 @@ import numpy as np
jnt_pos_low = np.array([-2.6, -2.0, -2.8, -0.9, -4.8, -1.6, -2.2])
jnt_pos_high = np.array([2.6, 2.0, 2.8, 3.1, 1.3, 1.6, 2.2])
jnt_vel_low = np.ones(7) * -7
jnt_vel_high = np.ones(7) * 7
delay_bound = [0.05, 0.15]
tau_bound = [0.5, 1.5]
@@ -48,4 +52,4 @@ def magnus_force(top_spin=0.0, side_spin=0.0, v_ball=np.zeros(3), v_wind=np.zero
C_l = 4.68 * 10e-4 - 2.0984 * 10e-5 * (np.linalg.norm(v_ball) - 50) # Lift force coefficient or simply 1.23
w = np.array([0.0, top_spin, side_spin]) # Angular velocity of ball
f_m = 0.5 * rho * A * C_l * np.linalg.norm(v_ball-v_wind) * np.cross(w, v_ball-v_wind)
return f_m
return f_m

View File

@@ -20,7 +20,6 @@ GYM_IDS = [spec.id for spec in gym.envs.registry.values() if
GYM_MP_IDS = fancy_gym.ALL_DMC_MOVEMENT_PRIMITIVE_ENVIRONMENTS['all']
SEED = 1
@pytest.mark.parametrize('env_id', GYM_IDS)
def test_step_gym_functionality(env_id: str):
"""Tests that step environments run without errors using random actions."""