Ported Markovian Version of TableTennis to master

This commit is contained in:
Dominik Moritz Roth 2024-01-28 12:29:43 +01:00
parent 6b59a354d7
commit 1372a596b5
2 changed files with 206 additions and 10 deletions

View File

@@ -5,11 +5,12 @@ from gymnasium import utils, spaces
from gymnasium.envs.mujoco import MujocoEnv
from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import is_init_state_valid, magnus_force
from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import jnt_pos_low, jnt_pos_high
from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import jnt_pos_low, jnt_pos_high, jnt_vel_low, jnt_vel_high
import mujoco
MAX_EPISODE_STEPS_TABLE_TENNIS = 350
MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER = 300
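For scale, assuming the MuJoCo model's default timestep of 0.002 s (the model file is not shown in this diff) and the default frame_skip of 4, these caps bound the simulated play time per episode:

# Hypothetical timing, assuming dt = 0.002 s and frame_skip = 4:
# 350 steps * 4 * 0.002 s = 2.8 s  (standard version)
# 300 steps * 4 * 0.002 s = 2.4 s  (Markovian version)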
CONTEXT_BOUNDS_2DIMS = np.array([[-1.0, -0.65], [-0.2, 0.65]])
CONTEXT_BOUNDS_4DIMS = np.array([[-1.0, -0.65, -1.0, -0.65],
@@ -18,6 +19,9 @@ CONTEXT_BOUNDS_SWICHING = np.array([[-1.0, -0.65, -1.0, 0.],
[-0.2, 0.65, -0.2, 0.65]])
DEFAULT_ROBOT_INIT_POS = np.array([0.0, 0.0, 0.0, 1.5, 0.0, 0.0, 1.5])
DEFAULT_ROBOT_INIT_VEL = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
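The context bounds are row-stacked [low, high] arrays; a hypothetical snippet (not part of this commit) sampling a 4D goal context inside them:

import numpy as np

low, high = CONTEXT_BOUNDS_4DIMS  # rows are the per-dimension low/high bounds
ctxt = np.random.default_rng(0).uniform(low, high)  # shape (4,)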
class TableTennisEnv(MujocoEnv, utils.EzPickle):
"""
7 DoF table tennis environment
@@ -34,7 +38,10 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4,
goal_switching_step: int = None,
enable_artificial_wind: bool = False, **kwargs):
enable_artificial_wind: bool = False,
random_pos_scale: float = 0.0,
random_vel_scale: float = 0.0,
**kwargs):
utils.EzPickle.__init__(**locals())
self._steps = 0
@@ -48,6 +55,10 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
self._id_set = False
# initial robot state
self._random_pos_scale = random_pos_scale
self._random_vel_scale = random_vel_scale
# reward calculation
self.ball_landing_pos = None
self._goal_pos = np.zeros(2)
@@ -156,7 +167,7 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
"num_steps": self._steps,
}
terminated, truncated = self._terminated, False
terminated, truncated = self._terminated, self._steps == MAX_EPISODE_STEPS_TABLE_TENNIS
return self._get_obs(), reward, terminated, truncated, info
@@ -183,8 +194,10 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
self.model.body_pos[5] = np.concatenate([self._goal_pos, [0.77]])
self.data.qpos[:7] = np.array([0., 0., 0., 1.5, 0., 0., 1.5])
self.data.qvel[:7] = np.zeros(7)
robot_init_pos, robot_init_vel = self.get_initial_robot_state()
self.data.qpos[:7] = robot_init_pos
self.data.qvel[:7] = robot_init_vel
mujoco.mj_forward(self.model, self.data)
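get_initial_robot_state is called here but defined outside this hunk; a minimal sketch of what such a helper presumably does, assuming uniform noise scaled by the two new constructor arguments and clipped to the joint limits imported above:

import numpy as np

def get_initial_robot_state(self):
    # Sketch only -- the real method is not shown in this diff.
    noise_pos = self.np_random.uniform(low=-1.0, high=1.0, size=7)
    noise_vel = self.np_random.uniform(low=-1.0, high=1.0, size=7)
    robot_init_pos = DEFAULT_ROBOT_INIT_POS + self._random_pos_scale * noise_pos
    robot_init_vel = DEFAULT_ROBOT_INIT_VEL + self._random_vel_scale * noise_vel
    return (np.clip(robot_init_pos, jnt_pos_low, jnt_pos_high),
            np.clip(robot_init_vel, jnt_vel_low, jnt_vel_high))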
@@ -257,7 +270,7 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
def get_invalid_traj_step_return(self, action, pos_traj, contextual_obs, tau_bound, delay_bound):
obs = self._get_obs() if contextual_obs else np.concatenate([self._get_obs(), np.array([0])]) # 0 for invalid traj
penalty = self._get_traj_invalid_penalty(action, pos_traj, tau_bound, delay_bound)
return obs, penalty, True, False, {
return obs, penalty, False, True, {
"hit_ball": [False],
"ball_returned_success": [False],
"land_dist_error": [10.],
@@ -274,6 +287,179 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
return False, pos_traj, vel_traj
return True, pos_traj, vel_traj
class TableTennisMarkovian(TableTennisEnv):
def _get_reward2(self, hit_now, land_now):
# Phase 1: ball not hit yet
if not self._hit_ball:
min_r_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj) - np.array(self._racket_traj), axis=1))
return 0.005 * (1 - np.tanh(min_r_b_dist**2))
# Phase 2: ball hit at this step
elif self._hit_ball and hit_now:
return 2
# Phase 3: ball already hit, not yet landed
elif self._hit_ball and self._ball_landing_pos is None:
min_b_des_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj)[:,:2] - self._goal_pos[:2], axis=1))
return 0.02 * (1 - np.tanh(min_b_des_b_dist**2))
# Phase 4: ball already hit, landing at this step
elif self._hit_ball and land_now:
over_net_bonus = int(self._ball_landing_pos[0] < 0)
min_b_des_b_land_dist = np.linalg.norm(self._goal_pos[:2] - self._ball_landing_pos[:2])
return 4 * (1 - np.tanh(min_b_des_b_land_dist ** 2)) + over_net_bonus
# Phase 5: ball already hit and already landed
elif self._hit_ball and not land_now and self._ball_landing_pos is not None:
return 0
else:
raise NotImplementedError
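Every phase shapes its reward with 1 - np.tanh(d**2), which equals 1 at zero distance and decays smoothly to 0; a quick sanity check:

import numpy as np

for d in [0.0, 0.5, 1.0, 2.0]:
    print(d, round(1 - np.tanh(d**2), 4))  # 1.0, 0.7551, 0.2384, 0.0007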
def _get_reward(self, terminated):
# if not terminated:
# return 0
min_r_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj) - np.array(self._racket_traj), axis=1))
if not self._hit_ball:
# Ball not hit yet
return 0.2 * (1 - np.tanh(min_r_b_dist**2))
elif self._ball_landing_pos is None:
# Ball hit, but no landing position yet
min_b_des_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj)[:,:2] - self._goal_pos[:2], axis=1))
return 2 + (1 - np.tanh(min_b_des_b_dist**2))
else:
# Ball hit and landed
min_b_des_b_land_dist = np.linalg.norm(self._goal_pos[:2] - self._ball_landing_pos[:2])
over_net_bonus = int(self._ball_landing_pos[0] < 0)
return 2 + 4 * (1 - np.tanh(min_b_des_b_land_dist ** 2)) + over_net_bonus
def _get_traj_invalid_penalty(self, action, pos_traj, tau_bound, delay_bound):
tau_invalid_penalty = 3 * (np.max([0, action[0] - tau_bound[1]]) + np.max([0, tau_bound[0] - action[0]]))
delay_invalid_penalty = 3 * (np.max([0, action[1] - delay_bound[1]]) + np.max([0, delay_bound[0] - action[1]]))
violate_high_bound_error = np.mean(np.maximum(pos_traj - jnt_pos_high, 0))
violate_low_bound_error = np.mean(np.maximum(jnt_pos_low - pos_traj, 0))
invalid_penalty = tau_invalid_penalty + delay_invalid_penalty + \
violate_high_bound_error + violate_low_bound_error
return -invalid_penalty
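As a worked example with tau_bound = [0.5, 1.5] from table_tennis_utils: a hypothetical action with tau = 1.7 and an in-bounds delay and trajectory is penalized by 3 * (1.7 - 1.5) = 0.6, so the step returns a reward of -0.6:

import numpy as np

tau, tau_bound = 1.7, [0.5, 1.5]  # hypothetical out-of-bounds tau
penalty = 3 * (np.max([0, tau - tau_bound[1]]) + np.max([0, tau_bound[0] - tau]))
print(-penalty)  # -0.6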
def get_invalid_traj_step_penalty(self, pos_traj):
violate_high_bound_error = (
np.maximum(pos_traj - jnt_pos_high, 0).mean())
violate_low_bound_error = (
np.maximum(jnt_pos_low - pos_traj, 0).mean())
invalid_penalty = violate_high_bound_error + violate_low_bound_error
return -invalid_penalty
def _update_game_state(self, action):
unstable_simulation = False
for _ in range(self.frame_skip):
if self._enable_artificial_wind:
self.data.qfrc_applied[-2] = self._artificial_force
try:
self.do_simulation(action, 1)
except Exception as e:
print("Simulation get unstable return with MujocoException: ", e)
unstable_simulation = True
self._terminated = True
break
# Update game state
if not self._terminated:
if not self._hit_ball:
self._hit_ball = self._contact_checker(self._ball_contact_id, self._bat_front_id) or \
self._contact_checker(self._ball_contact_id, self._bat_back_id)
if not self._hit_ball:
ball_land_on_floor_no_hit = self._contact_checker(self._ball_contact_id, self._floor_contact_id)
if ball_land_on_floor_no_hit:
self._ball_landing_pos = self.data.body("target_ball").xpos.copy()
self._terminated = True
if self._hit_ball and not self._ball_contact_after_hit:
if self._contact_checker(self._ball_contact_id, self._floor_contact_id): # first check contact with floor
self._ball_contact_after_hit = True
self._ball_landing_pos = self.data.geom("target_ball_contact").xpos.copy()
self._terminated = True
elif self._contact_checker(self._ball_contact_id, self._table_contact_id): # second check contact with table
self._ball_contact_after_hit = True
self._ball_landing_pos = self.data.geom("target_ball_contact").xpos.copy()
if self._ball_landing_pos[0] < 0.: # ball lands on the opponent side
self._ball_return_success = True
self._terminated = True
# update ball trajectory & racket trajectory
self._ball_traj.append(self.data.body("target_ball").xpos.copy())
self._racket_traj.append(self.data.geom("bat").xpos.copy())
return unstable_simulation
def ball_racket_contact(self):
return self._contact_checker(self._ball_contact_id, self._bat_front_id) or \
self._contact_checker(self._ball_contact_id, self._bat_back_id)
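_contact_checker is inherited from the base environment and not part of this diff; presumably it scans the active MuJoCo contacts for the given geom-id pair, along these lines:

def _contact_checker(self, id_1, id_2):
    # Sketch, assuming the usual MuJoCo contact scan (not shown in this diff).
    for i in range(self.data.ncon):
        con = self.data.contact[i]
        if {con.geom1, con.geom2} == {id_1, id_2}:
            return True
    return False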
def step(self, action):
if not self._id_set:
self._set_ids()
unstable_simulation = False
hit_already = self._hit_ball
if self._steps == self._goal_switching_step and self.np_random.uniform() < 0.5:
new_goal_pos = self._generate_goal_pos(random=True)
new_goal_pos[1] = -new_goal_pos[1]
self._goal_pos = new_goal_pos
self.model.body_pos[5] = np.concatenate([self._goal_pos, [0.77]])
mujoco.mj_forward(self.model, self.data)
unstable_simulation = self._update_game_state(action)
self._steps += 1
obs = self._get_obs()
# Compute reward
if unstable_simulation:
reward = -25
else:
# reward = self._get_reward(self._terminated)
# hit_now = not hit_already and self._hit_ball
hit_finish = self._hit_ball and not self.ball_racket_contact()
if hit_finish:
# Clear the ball and racket trajectories recorded before the hit
self._ball_traj = []
self._racket_traj = []
# Simulate the remainder of the episode with a zero action
reward = self._get_reward2(True, False)
while self._steps < MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER:
land_already = self._ball_landing_pos is not None
self._update_game_state(np.zeros_like(action))
self._steps += 1
land_now = (not land_already
and self._ball_landing_pos is not None)
temp_reward = self._get_reward2(False, land_now)
# print(temp_reward)
reward += temp_reward
# Uncomment the line below to visualize the sim after hit
# self.render(mode="human")
else:
reward = self._get_reward2(False, False)
# Update ball landing error
land_dist_err = np.linalg.norm(self._ball_landing_pos[:-1] - self._goal_pos) \
if self._ball_landing_pos is not None else 10.
info = {
"hit_ball": self._hit_ball,
"ball_returned_success": self._ball_return_success,
"land_dist_error": land_dist_err,
"is_success": self._ball_return_success and land_dist_err < 0.2,
"num_steps": self._steps,
}
terminated, truncated = self._terminated, self._steps == MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER
return obs, reward, terminated, truncated, info
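A hypothetical rollout with the new class (its gym registration ID is not part of this commit), illustrating the Gymnasium five-tuple returned above:

env = TableTennisMarkovian(ctxt_dim=4, frame_skip=4)
obs, info = env.reset(seed=0)
terminated = truncated = False
while not (terminated or truncated):
    obs, reward, terminated, truncated, info = env.step(env.action_space.sample())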
class TableTennisWind(TableTennisEnv):
def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4, **kwargs):
@@ -296,7 +482,16 @@ class TableTennisWind(TableTennisEnv):
])
return obs
class TableTennisGoalSwitching(TableTennisEnv):
def __init__(self, frame_skip: int = 4, goal_switching_step: int = 99, **kwargs):
super().__init__(frame_skip=frame_skip, goal_switching_step=goal_switching_step, **kwargs)
class TableTennisRandomInit(TableTennisEnv):
def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4,
random_pos_scale: float = 1.0,
random_vel_scale: float = 0.0, **kwargs):
super().__init__(ctxt_dim=ctxt_dim, frame_skip=frame_skip,
random_pos_scale=random_pos_scale,
random_vel_scale=random_vel_scale, **kwargs)
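Assuming the position noise works as sketched earlier, two resets with different seeds should yield different initial joint positions:

env = TableTennisRandomInit(random_pos_scale=1.0)
obs_a, _ = env.reset(seed=1)
obs_b, _ = env.reset(seed=2)
# With full position randomization the initial observations should differ.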

View File

@@ -2,8 +2,9 @@ import numpy as np
jnt_pos_low = np.array([-2.6, -2.0, -2.8, -0.9, -4.8, -1.6, -2.2])
jnt_pos_high = np.array([2.6, 2.0, 2.8, 3.1, 1.3, 1.6, 2.2])
delay_bound = [0.05, 0.15]
tau_bound = [0.5, 1.5]
jnt_vel_low = np.ones(7) * -7
jnt_vel_high = np.ones(7) * 7
net_height = 0.1
table_height = 0.77
@@ -48,4 +49,4 @@ def magnus_force(top_spin=0.0, side_spin=0.0, v_ball=np.zeros(3), v_wind=np.zero
C_l = 4.68 * 10e-4 - 2.0984 * 10e-5 * (np.linalg.norm(v_ball) - 50) # Lift force coefficient, or simply 1.23
w = np.array([0.0, top_spin, side_spin]) # Angular velocity of ball
f_m = 0.5 * rho * A * C_l * np.linalg.norm(v_ball-v_wind) * np.cross(w, v_ball-v_wind)
return f_m
return f_m
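Example use of magnus_force: pure top spin on a ball moving along +x with no wind produces a force along -z, since np.cross([0, s, 0], [v, 0, 0]) = [0, 0, -s*v]:

import numpy as np

f = magnus_force(top_spin=10.0, side_spin=0.0,  # hypothetical spin values
                 v_ball=np.array([5.0, 0.0, 0.0]), v_wind=np.zeros(3))
print(f)  # only the z component is non-zero here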