From 1372a596b523fea87294d2a39c886d1cab6c4cb1 Mon Sep 17 00:00:00 2001
From: Dominik Roth
Date: Sun, 28 Jan 2024 12:29:43 +0100
Subject: [PATCH] Ported Markovian Version of TableTennis to master

---
 .../mujoco/table_tennis/table_tennis_env.py   | 209 +++++++++++++++++-
 .../mujoco/table_tennis/table_tennis_utils.py |   7 +-
 2 files changed, 206 insertions(+), 10 deletions(-)

diff --git a/fancy_gym/envs/mujoco/table_tennis/table_tennis_env.py b/fancy_gym/envs/mujoco/table_tennis/table_tennis_env.py
index 216ca1f..e085c61 100644
--- a/fancy_gym/envs/mujoco/table_tennis/table_tennis_env.py
+++ b/fancy_gym/envs/mujoco/table_tennis/table_tennis_env.py
@@ -5,11 +5,12 @@ from gymnasium import utils, spaces
 from gymnasium.envs.mujoco import MujocoEnv
 
 from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import is_init_state_valid, magnus_force
-from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import jnt_pos_low, jnt_pos_high
+from fancy_gym.envs.mujoco.table_tennis.table_tennis_utils import jnt_pos_low, jnt_pos_high, jnt_vel_low, jnt_vel_high
 
 import mujoco
 
 MAX_EPISODE_STEPS_TABLE_TENNIS = 350
+MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER = 300
 
 CONTEXT_BOUNDS_2DIMS = np.array([[-1.0, -0.65], [-0.2, 0.65]])
 CONTEXT_BOUNDS_4DIMS = np.array([[-1.0, -0.65, -1.0, -0.65],
@@ -18,6 +19,9 @@
 CONTEXT_BOUNDS_SWICHING = np.array([[-1.0, -0.65, -1.0, 0.],
                                     [-0.2, 0.65, -0.2, 0.65]])
 
+DEFAULT_ROBOT_INIT_POS = np.array([0.0, 0.0, 0.0, 1.5, 0.0, 0.0, 1.5])
+DEFAULT_ROBOT_INIT_VEL = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
+
 class TableTennisEnv(MujocoEnv, utils.EzPickle):
     """
     7 DoF table tennis environment
@@ -34,7 +38,10 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
     def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4,
                  goal_switching_step: int = None,
-                 enable_artificial_wind: bool = False, **kwargs):
+                 enable_artificial_wind: bool = False,
+                 random_pos_scale: float = 0.0,
+                 random_vel_scale: float = 0.0,
+                 **kwargs):
         utils.EzPickle.__init__(**locals())
         self._steps = 0
@@ -48,6 +55,10 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
 
         self._id_set = False
 
+        # scales for randomizing the initial robot state (0.0 = fixed init)
+        self._random_pos_scale = random_pos_scale
+        self._random_vel_scale = random_vel_scale
+
         # reward calculation
         self.ball_landing_pos = None
         self._goal_pos = np.zeros(2)
@@ -156,7 +167,7 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
             "num_steps": self._steps,
         }
 
-        terminated, truncated = self._terminated, False
+        terminated, truncated = self._terminated, self._steps == MAX_EPISODE_STEPS_TABLE_TENNIS
 
         return self._get_obs(), reward, terminated, truncated, info
@@ -183,8 +194,10 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
 
         self.model.body_pos[5] = np.concatenate([self._goal_pos, [0.77]])
 
-        self.data.qpos[:7] = np.array([0., 0., 0., 1.5, 0., 0., 1.5])
-        self.data.qvel[:7] = np.zeros(7)
+        robot_init_pos, robot_init_vel = self.get_initial_robot_state()
+
+        self.data.qpos[:7] = robot_init_pos
+        self.data.qvel[:7] = robot_init_vel
 
         mujoco.mj_forward(self.model, self.data)
@@ -257,7 +270,7 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
     def get_invalid_traj_step_return(self, action, pos_traj, contextual_obs, tau_bound, delay_bound):
         obs = self._get_obs() if contextual_obs else np.concatenate([self._get_obs(), np.array([0])])  # 0 for invalid traj
         penalty = self._get_traj_invalid_penalty(action, pos_traj, tau_bound, delay_bound)
-        return obs, penalty, True, False, {
+        return obs, penalty, False, True, {
             "hit_ball": [False],
             "ball_returned_success": [False],
             "land_dist_error": [10.],
@@ -274,6 +287,179 @@ class TableTennisEnv(MujocoEnv, utils.EzPickle):
             return False, pos_traj, vel_traj
         return True, pos_traj, vel_traj
 
+class TableTennisMarkovian(TableTennisEnv):
+    def _get_reward2(self, hit_now, land_now):
+
+        # Phase 1: ball not hit yet
+        if not self._hit_ball:
+            min_r_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj) - np.array(self._racket_traj), axis=1))
+            return 0.005 * (1 - np.tanh(min_r_b_dist**2))
+
+        # Phase 2: ball hit at this step
+        elif self._hit_ball and hit_now:
+            return 2
+
+        # Phase 3: ball already hit, but not yet landed
+        elif self._hit_ball and self._ball_landing_pos is None:
+            min_b_des_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj)[:, :2] - self._goal_pos[:2], axis=1))
+            return 0.02 * (1 - np.tanh(min_b_des_b_dist**2))
+
+        # Phase 4: ball already hit and lands at this step
+        elif self._hit_ball and land_now:
+            over_net_bonus = int(self._ball_landing_pos[0] < 0)
+            min_b_des_b_land_dist = np.linalg.norm(self._goal_pos[:2] - self._ball_landing_pos[:2])
+            return 4 * (1 - np.tanh(min_b_des_b_land_dist ** 2)) + over_net_bonus
+
+        # Phase 5: ball already hit and already landed
+        elif self._hit_ball and not land_now and self._ball_landing_pos is not None:
+            return 0
+
+        else:
+            raise NotImplementedError
+
+    def _get_reward(self, terminated):
+        # if not terminated:
+        #     return 0
+
+        min_r_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj) - np.array(self._racket_traj), axis=1))
+        if not self._hit_ball:
+            # Ball not hit: shape reward on the minimal racket-ball distance
+            return 0.2 * (1 - np.tanh(min_r_b_dist**2))
+        elif self._ball_landing_pos is None:
+            # Ball hit, but no landing position yet
+            min_b_des_b_dist = np.min(np.linalg.norm(np.array(self._ball_traj)[:, :2] - self._goal_pos[:2], axis=1))
+            return 2 + (1 - np.tanh(min_b_des_b_dist**2))
+        else:
+            # Ball hit and landed
+            min_b_des_b_land_dist = np.linalg.norm(self._goal_pos[:2] - self._ball_landing_pos[:2])
+            over_net_bonus = int(self._ball_landing_pos[0] < 0)
+            return 2 + 4 * (1 - np.tanh(min_b_des_b_land_dist ** 2)) + over_net_bonus
+
+    def _get_traj_invalid_penalty(self, action, pos_traj, tau_bound, delay_bound):
+        tau_invalid_penalty = 3 * (np.max([0, action[0] - tau_bound[1]]) + np.max([0, tau_bound[0] - action[0]]))
+        delay_invalid_penalty = 3 * (np.max([0, action[1] - delay_bound[1]]) + np.max([0, delay_bound[0] - action[1]]))
+        violate_high_bound_error = np.mean(np.maximum(pos_traj - jnt_pos_high, 0))
+        violate_low_bound_error = np.mean(np.maximum(jnt_pos_low - pos_traj, 0))
+        invalid_penalty = tau_invalid_penalty + delay_invalid_penalty + \
+            violate_high_bound_error + violate_low_bound_error
+        return -invalid_penalty
+
+    def get_invalid_traj_step_penalty(self, pos_traj):
+        violate_high_bound_error = (
+            np.maximum(pos_traj - jnt_pos_high, 0).mean())
+        violate_low_bound_error = (
+            np.maximum(jnt_pos_low - pos_traj, 0).mean())
+        invalid_penalty = violate_high_bound_error + violate_low_bound_error
+        return -invalid_penalty
+
+    def _update_game_state(self, action):
+        unstable_simulation = False
+        for _ in range(self.frame_skip):
+            if self._enable_artificial_wind:
+                self.data.qfrc_applied[-2] = self._artificial_force
+            try:
+                self.do_simulation(action, 1)
+            except Exception as e:
+                print("Simulation became unstable, returning with MujocoException: ", e)
+                unstable_simulation = True
+                self._terminated = True
+                break
+
+            # Update game state
+            if not self._terminated:
+                if not self._hit_ball:
+                    self._hit_ball = self._contact_checker(self._ball_contact_id, self._bat_front_id) or \
+                        self._contact_checker(self._ball_contact_id, self._bat_back_id)
+                if not self._hit_ball:
+                    ball_land_on_floor_no_hit = self._contact_checker(self._ball_contact_id, self._floor_contact_id)
+                    if ball_land_on_floor_no_hit:
+                        self._ball_landing_pos = self.data.body("target_ball").xpos.copy()
+                        self._terminated = True
+                if self._hit_ball and not self._ball_contact_after_hit:
+                    if self._contact_checker(self._ball_contact_id, self._floor_contact_id):  # first check contact with floor
+                        self._ball_contact_after_hit = True
+                        self._ball_landing_pos = self.data.geom("target_ball_contact").xpos.copy()
+                        self._terminated = True
+                    elif self._contact_checker(self._ball_contact_id, self._table_contact_id):  # second check contact with table
+                        self._ball_contact_after_hit = True
+                        self._ball_landing_pos = self.data.geom("target_ball_contact").xpos.copy()
+                        if self._ball_landing_pos[0] < 0.:  # ball lands on the opponent's side
+                            self._ball_return_success = True
+                        self._terminated = True
+
+            # update ball trajectory & racket trajectory
+            self._ball_traj.append(self.data.body("target_ball").xpos.copy())
+            self._racket_traj.append(self.data.geom("bat").xpos.copy())
+
+        return unstable_simulation
+
+    def ball_racket_contact(self):
+        return self._contact_checker(self._ball_contact_id, self._bat_front_id) or \
+            self._contact_checker(self._ball_contact_id, self._bat_back_id)
+
+    def step(self, action):
+        if not self._id_set:
+            self._set_ids()
+
+        hit_already = self._hit_ball
+        if self._steps == self._goal_switching_step and self.np_random.uniform() < 0.5:
+            new_goal_pos = self._generate_goal_pos(random=True)
+            new_goal_pos[1] = -new_goal_pos[1]
+            self._goal_pos = new_goal_pos
+            self.model.body_pos[5] = np.concatenate([self._goal_pos, [0.77]])
+            mujoco.mj_forward(self.model, self.data)
+
+        unstable_simulation = self._update_game_state(action)
+        self._steps += 1
+
+        obs = self._get_obs()
+
+        # Compute reward
+        if unstable_simulation:
+            reward = -25
+        else:
+            # reward = self._get_reward(self._terminated)
+            # hit_now = not hit_already and self._hit_ball
+            hit_finish = self._hit_ball and not self.ball_racket_contact()
+
+            if hit_finish:
+                # Clear the ball and racket trajectories recorded before the hit
+                self._ball_traj = []
+                self._racket_traj = []
+
+                # Simulate the remainder of the episode to collect the landing reward
+                reward = self._get_reward2(True, False)
+                while self._steps < MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER:
+                    land_already = self._ball_landing_pos is not None
+                    self._update_game_state(np.zeros_like(action))
+                    self._steps += 1
+
+                    land_now = (not land_already
+                                and self._ball_landing_pos is not None)
+                    temp_reward = self._get_reward2(False, land_now)
+                    reward += temp_reward
+
+                    # Uncomment the line below to visualize the sim after the hit
+                    # self.render(mode="human")
+            else:
+                reward = self._get_reward2(False, False)
+
+        # Update ball landing error
+        land_dist_err = np.linalg.norm(self._ball_landing_pos[:-1] - self._goal_pos) \
+            if self._ball_landing_pos is not None else 10.
+
+        info = {
+            "hit_ball": self._hit_ball,
+            "ball_returned_success": self._ball_return_success,
+            "land_dist_error": land_dist_err,
+            "is_success": self._ball_return_success and land_dist_err < 0.2,
+            "num_steps": self._steps,
+        }
+
+        terminated, truncated = self._terminated, self._steps == MAX_EPISODE_STEPS_TABLE_TENNIS_MARKOV_VER
+
+        return obs, reward, terminated, truncated, info
 
 class TableTennisWind(TableTennisEnv):
     def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4, **kwargs):
@@ -296,7 +482,16 @@ class TableTennisWind(TableTennisEnv):
         ])
         return obs
 
-
 class TableTennisGoalSwitching(TableTennisEnv):
     def __init__(self, frame_skip: int = 4, goal_switching_step: int = 99, **kwargs):
         super().__init__(frame_skip=frame_skip, goal_switching_step=goal_switching_step, **kwargs)
+
+
+class TableTennisRandomInit(TableTennisEnv):
+    def __init__(self, ctxt_dim: int = 4, frame_skip: int = 4,
+                 random_pos_scale: float = 1.0,
+                 random_vel_scale: float = 0.0, **kwargs):
+        super().__init__(ctxt_dim=ctxt_dim, frame_skip=frame_skip,
+                         random_pos_scale=random_pos_scale,
+                         random_vel_scale=random_vel_scale, **kwargs)
+
diff --git a/fancy_gym/envs/mujoco/table_tennis/table_tennis_utils.py b/fancy_gym/envs/mujoco/table_tennis/table_tennis_utils.py
index 4d9a2d2..c548736 100644
--- a/fancy_gym/envs/mujoco/table_tennis/table_tennis_utils.py
+++ b/fancy_gym/envs/mujoco/table_tennis/table_tennis_utils.py
@@ -2,8 +2,9 @@ import numpy as np
 
 jnt_pos_low = np.array([-2.6, -2.0, -2.8, -0.9, -4.8, -1.6, -2.2])
 jnt_pos_high = np.array([2.6, 2.0, 2.8, 3.1, 1.3, 1.6, 2.2])
-delay_bound = [0.05, 0.15]
-tau_bound = [0.5, 1.5]
+
+jnt_vel_low = np.ones(7) * -7
+jnt_vel_high = np.ones(7) * 7
 
 net_height = 0.1
 table_height = 0.77
@@ -48,4 +49,4 @@ def magnus_force(top_spin=0.0, side_spin=0.0, v_ball=np.zeros(3), v_wind=np.zero
     C_l = 4.68 * 10e-4 - 2.0984 * 10e-5 * (np.linalg.norm(v_ball) - 50)  # Lift force coeffient or simply 1.23
     w = np.array([0.0, top_spin, side_spin])  # Angular velocity of ball
     f_m = 0.5 * rho * A * C_l * np.linalg.norm(v_ball-v_wind) * np.cross(w, v_ball-v_wind)
-    return f_m
+    return f_m
\ No newline at end of file
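
Note: reset_model() in the env hunk above now calls self.get_initial_robot_state(),
but no hunk in this patch defines that method. A minimal sketch of what it
presumably looks like, assuming uniform noise scaled by random_pos_scale /
random_vel_scale around the new DEFAULT_ROBOT_INIT_POS / DEFAULT_ROBOT_INIT_VEL
constants and clipped to the jnt_pos_* / jnt_vel_* bounds imported above (the
sampling scheme is an assumption, not part of this patch):

    def get_initial_robot_state(self):
        # Uniform noise in [-scale, scale] around the default pose;
        # scale 0.0 reproduces the previous fixed initial state.
        robot_init_pos = DEFAULT_ROBOT_INIT_POS + \
            self.np_random.uniform(-1.0, 1.0, size=7) * self._random_pos_scale
        robot_init_vel = DEFAULT_ROBOT_INIT_VEL + \
            self.np_random.uniform(-1.0, 1.0, size=7) * self._random_vel_scale
        # Keep the sampled state inside the joint position/velocity limits
        # imported from table_tennis_utils.
        return (np.clip(robot_init_pos, jnt_pos_low, jnt_pos_high),
                np.clip(robot_init_vel, jnt_vel_low, jnt_vel_high))

This would also explain why the patch adds jnt_vel_low / jnt_vel_high to
table_tennis_utils.py and imports them in table_tennis_env.py.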