Merge branch 'master' into readthedocs

This commit is contained in:
Dominik Moritz Roth 2024-01-23 18:04:27 +01:00
commit a9617a392e
8 changed files with 363 additions and 47 deletions

View File

@ -19,8 +19,8 @@ Built upon the foundation of [Gymnasium](https://gymnasium.farama.org) (a mainta
## Quickstart Guide ## Quickstart Guide
| ⚠ We recommend installing `fancy_gym` into a virtual environment as provided by [venv](https://docs.python.org/3/library/venv.html), [Poetry](https://python-poetry.org/) or [Conda](https://docs.conda.io/en/latest/). | | ⚠ We recommend installing `fancy_gym` into a virtual environment as provided by [venv](https://docs.python.org/3/library/venv.html), [Poetry](https://python-poetry.org/) or [Conda](https://docs.conda.io/en/latest/). |
| ----------------------------------------------------------------------------------------------------------------------------------------------- | | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
Install via pip [or use an alternative installation method](https://dominik-roth.eu/fancy/guide/installation.html) Install via pip [or use an alternative installation method](https://dominik-roth.eu/fancy/guide/installation.html)

View File

@ -291,7 +291,7 @@ register(
) )
# Air Hockey environments # Air Hockey environments
for env_mode in ["7dof-hit", "7dof-defend", "3dof-hit", "3dof-defend"]: for env_mode in ["7dof-hit", "7dof-defend", "3dof-hit", "3dof-defend", "7dof-hit-airhockit2023", "7dof-defend-airhockit2023"]:
register( register(
id=f'fancy/AirHockey-{env_mode}-v0', id=f'fancy/AirHockey-{env_mode}-v0',
entry_point='fancy_gym.envs.mujoco:AirHockeyEnv', entry_point='fancy_gym.envs.mujoco:AirHockeyEnv',

View File

@ -8,9 +8,9 @@ from fancy_gym.envs.mujoco.air_hockey.utils import robot_to_world
from mushroom_rl.core import Environment from mushroom_rl.core import Environment
class AirHockeyEnv(Environment): class AirHockeyEnv(Environment):
metadata = {"render_modes": ["human"], "render_fps": 50} metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 50}
def __init__(self, env_mode=None, interpolation_order=3, render_mode=None, **kwargs): def __init__(self, env_mode=None, interpolation_order=3, render_mode=None, width=1920, height=1080, **kwargs):
""" """
Environment Constructor Environment Constructor
@ -30,7 +30,10 @@ class AirHockeyEnv(Environment):
"7dof-defend": position.IiwaPositionDefend, "7dof-defend": position.IiwaPositionDefend,
"3dof-hit": position.PlanarPositionHit, "3dof-hit": position.PlanarPositionHit,
"3dof-defend": position.PlanarPositionDefend "3dof-defend": position.PlanarPositionDefend,
"7dof-hit-airhockit2023": position.IiwaPositionHitAirhocKIT2023,
"7dof-defend-airhockit2023": position.IiwaPositionDefendAirhocKIT2023,
} }
if env_mode not in env_dict: if env_mode not in env_dict:
@ -39,37 +42,53 @@ class AirHockeyEnv(Environment):
if env_mode == "tournament" and type(interpolation_order) != tuple: if env_mode == "tournament" and type(interpolation_order) != tuple:
interpolation_order = (interpolation_order, interpolation_order) interpolation_order = (interpolation_order, interpolation_order)
self.render_mode = render_mode
self.render_human_active = False
# Determine headless mode based on render_mode
headless = self.render_mode == 'rgb_array'
# Prepare viewer_params
viewer_params = kwargs.get('viewer_params', {})
viewer_params.update({'headless': headless, 'width': width, 'height': height})
kwargs['viewer_params'] = viewer_params
self.base_env = env_dict[env_mode](interpolation_order=interpolation_order, **kwargs) self.base_env = env_dict[env_mode](interpolation_order=interpolation_order, **kwargs)
self.env_name = env_mode self.env_name = env_mode
self.env_info = self.base_env.env_info self.env_info = self.base_env.env_info
single_robot_obs_size = len(self.base_env.info.observation_space.low)
if env_mode == "tournament":
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(2,single_robot_obs_size), dtype=np.float64)
else:
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(single_robot_obs_size,), dtype=np.float64)
robot_info = self.env_info["robot"]
if env_mode != "tournament": if hasattr(self.base_env, "wrapper_obs_space") and hasattr(self.base_env, "wrapper_act_space"):
if interpolation_order in [1, 2]: self.observation_space = self.base_env.wrapper_obs_space
self.action_space = spaces.Box(low=robot_info["joint_pos_limit"][0], high=robot_info["joint_pos_limit"][1]) self.action_space = self.base_env.wrapper_act_space
if interpolation_order in [3, 4, -1]:
self.action_space = spaces.Box(low=np.vstack([robot_info["joint_pos_limit"][0], robot_info["joint_vel_limit"][0]]),
high=np.vstack([robot_info["joint_pos_limit"][1], robot_info["joint_vel_limit"][1]]))
if interpolation_order in [5]:
self.action_space = spaces.Box(low=np.vstack([robot_info["joint_pos_limit"][0], robot_info["joint_vel_limit"][0], robot_info["joint_acc_limit"][0]]),
high=np.vstack([robot_info["joint_pos_limit"][1], robot_info["joint_vel_limit"][1], robot_info["joint_acc_limit"][1]]))
else: else:
acts = [None, None] single_robot_obs_size = len(self.base_env.info.observation_space.low)
for i in range(2): if env_mode == "tournament":
if interpolation_order[i] in [1, 2]: self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(2,single_robot_obs_size), dtype=np.float64)
acts[i] = spaces.Box(low=robot_info["joint_pos_limit"][0], high=robot_info["joint_pos_limit"][1]) else:
if interpolation_order[i] in [3, 4, -1]: self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(single_robot_obs_size,), dtype=np.float64)
acts[i] = spaces.Box(low=np.vstack([robot_info["joint_pos_limit"][0], robot_info["joint_vel_limit"][0]]), robot_info = self.env_info["robot"]
high=np.vstack([robot_info["joint_pos_limit"][1], robot_info["joint_vel_limit"][1]]))
if interpolation_order[i] in [5]: if env_mode != "tournament":
acts[i] = spaces.Box(low=np.vstack([robot_info["joint_pos_limit"][0], robot_info["joint_vel_limit"][0], robot_info["joint_acc_limit"][0]]), if interpolation_order in [1, 2]:
high=np.vstack([robot_info["joint_pos_limit"][1], robot_info["joint_vel_limit"][1], robot_info["joint_acc_limit"][1]])) self.action_space = spaces.Box(low=robot_info["joint_pos_limit"][0], high=robot_info["joint_pos_limit"][1])
self.action_space = spaces.Tuple((acts[0], acts[1])) if interpolation_order in [3, 4, -1]:
self.action_space = spaces.Box(low=np.vstack([robot_info["joint_pos_limit"][0], robot_info["joint_vel_limit"][0]]),
high=np.vstack([robot_info["joint_pos_limit"][1], robot_info["joint_vel_limit"][1]]))
if interpolation_order in [5]:
self.action_space = spaces.Box(low=np.vstack([robot_info["joint_pos_limit"][0], robot_info["joint_vel_limit"][0], robot_info["joint_acc_limit"][0]]),
high=np.vstack([robot_info["joint_pos_limit"][1], robot_info["joint_vel_limit"][1], robot_info["joint_acc_limit"][1]]))
else:
acts = [None, None]
for i in range(2):
if interpolation_order[i] in [1, 2]:
acts[i] = spaces.Box(low=robot_info["joint_pos_limit"][0], high=robot_info["joint_pos_limit"][1])
if interpolation_order[i] in [3, 4, -1]:
acts[i] = spaces.Box(low=np.vstack([robot_info["joint_pos_limit"][0], robot_info["joint_vel_limit"][0]]),
high=np.vstack([robot_info["joint_pos_limit"][1], robot_info["joint_vel_limit"][1]]))
if interpolation_order[i] in [5]:
acts[i] = spaces.Box(low=np.vstack([robot_info["joint_pos_limit"][0], robot_info["joint_vel_limit"][0], robot_info["joint_acc_limit"][0]]),
high=np.vstack([robot_info["joint_pos_limit"][1], robot_info["joint_vel_limit"][1], robot_info["joint_acc_limit"][1]]))
self.action_space = spaces.Tuple((acts[0], acts[1]))
constraint_list = constraints.ConstraintList() constraint_list = constraints.ConstraintList()
constraint_list.add(constraints.JointPositionConstraint(self.env_info)) constraint_list.add(constraints.JointPositionConstraint(self.env_info))
@ -81,9 +100,6 @@ class AirHockeyEnv(Environment):
self.env_info['constraints'] = constraint_list self.env_info['constraints'] = constraint_list
self.env_info['env_name'] = self.env_name self.env_info['env_name'] = self.env_name
self.render_mode = render_mode
self.render_human_active = False
super().__init__(self.base_env.info) super().__init__(self.base_env.info)
def step(self, action): def step(self, action):
@ -111,15 +127,21 @@ class AirHockeyEnv(Environment):
if self.env_info['env_name'] == "tournament": if self.env_info['env_name'] == "tournament":
obs = np.array(np.split(obs, 2)) obs = np.array(np.split(obs, 2))
if self.render_human_active: if self.render_human_active:
self.base_env.render() self.base_env.render()
return obs, reward, done, False, info return obs, reward, done, False, info
def render(self): def render(self):
self.render_human_active = True if self.render_mode == 'rgb_array':
return self.base_env.render(record = True)
elif self.render_mode == 'human':
self.render_human_active = True
self.base_env.render()
else:
raise ValueError(f"Unsupported render mode: '{self.render_mode}'")
def reset(self, seed=None, options={}): def reset(self, seed=None, options={}):
self.base_env.seed(seed) self.base_env.seed(seed)
obs = self.base_env.reset() obs = self.base_env.reset()
@ -177,4 +199,4 @@ if __name__ == "__main__":
J = 0. J = 0.
gamma = 1. gamma = 1.
steps = 0 steps = 0
env.reset() env.reset()

View File

@ -261,10 +261,14 @@ class PlanarPositionDefend(PositionControlPlanar, three_dof.AirHockeyDefend):
class IiwaPositionHit(PositionControlIIWA, seven_dof.AirHockeyHit): class IiwaPositionHit(PositionControlIIWA, seven_dof.AirHockeyHit):
pass pass
class IiwaPositionHitAirhocKIT2023(PositionControlIIWA, seven_dof.AirHockeyHitAirhocKIT2023):
pass
class IiwaPositionDefend(PositionControlIIWA, seven_dof.AirHockeyDefend): class IiwaPositionDefend(PositionControlIIWA, seven_dof.AirHockeyDefend):
pass pass
class IiwaPositionDefendAirhocKIT2023(PositionControlIIWA, seven_dof.AirHockeyDefendAirhocKIT2023):
pass
class IiwaPositionTournament(PositionControlIIWA, seven_dof.AirHockeyTournament): class IiwaPositionTournament(PositionControlIIWA, seven_dof.AirHockeyTournament):
pass pass

View File

@ -1,4 +1,4 @@
from .env_base import AirHockeyBase from .env_base import AirHockeyBase
from .tournament import AirHockeyTournament from .tournament import AirHockeyTournament
from .hit import AirHockeyHit from .hit import AirHockeyHit, AirHockeyHitAirhocKIT2023
from .defend import AirHockeyDefend from .defend import AirHockeyDefend, AirHockeyDefendAirhocKIT2023

View File

@ -0,0 +1,114 @@
import numpy as np
from gymnasium import spaces
from fancy_gym.envs.mujoco.air_hockey.seven_dof.env_single import AirHockeySingle
from fancy_gym.envs.mujoco.air_hockey.utils import inverse_kinematics, forward_kinematics, jacobian
class AirhocKIT2023BaseEnv(AirHockeySingle):
def __init__(self, noise=False, **kwargs):
super().__init__(**kwargs)
obs_low = np.hstack([[-np.inf] * 37])
obs_high = np.hstack([[np.inf] * 37])
self.wrapper_obs_space = spaces.Box(low=obs_low, high=obs_high, dtype=np.float64)
self.wrapper_act_space = spaces.Box(low=np.repeat(-100., 6), high=np.repeat(100., 6))
self.noise = noise
# We don't need puck yaw observations
def filter_obs(self, obs):
obs = np.hstack([obs[0:2], obs[3:5], obs[6:12], obs[13:19], obs[20:]])
return obs
def add_noise(self, obs):
if not self.noise:
return
obs[self.env_info["puck_pos_ids"]] += np.random.normal(0, 0.001, 3)
obs[self.env_info["puck_vel_ids"]] += np.random.normal(0, 0.1, 3)
def reset(self):
self.last_acceleration = np.repeat(0., 6)
obs = super().reset()
self.add_noise(obs)
self.interp_pos = obs[self.env_info["joint_pos_ids"]][:-1]
self.interp_vel = obs[self.env_info["joint_vel_ids"]][:-1]
self.last_planned_world_pos = self._fk(self.interp_pos)
obs = np.hstack([
obs, self.interp_pos, self.interp_vel, self.last_acceleration, self.last_planned_world_pos
])
return self.filter_obs(obs)
def step(self, action):
action /= 10
new_vel = self.interp_vel + action
jerk = 2 * (new_vel - self.interp_vel - self.last_acceleration * 0.02) / (0.02 ** 2)
new_pos = self.interp_pos + self.interp_vel * 0.02 + (1/2) * self.last_acceleration * (0.02 ** 2) + (1/6) * jerk * (0.02 ** 3)
abs_action = np.vstack([np.hstack([new_pos, 0]), np.hstack([new_vel, 0])])
self.interp_pos = new_pos
self.interp_vel = new_vel
self.last_acceleration += jerk * 0.02
obs, rew, done, info = super().step(abs_action)
self.add_noise(obs)
self.last_planned_world_pos = self._fk(self.interp_pos)
obs = np.hstack([
obs, self.interp_pos, self.interp_vel, self.last_acceleration, self.last_planned_world_pos
])
fatal_rew = self.check_fatal(obs)
if fatal_rew != 0:
return self.filter_obs(obs), fatal_rew, True, info
return self.filter_obs(obs), rew, done, info
def check_constraints(self, constraint_values):
fatal_rew = 0
j_pos_constr = constraint_values["joint_pos_constr"]
if j_pos_constr.max() > 0:
fatal_rew += j_pos_constr.max()
j_vel_constr = constraint_values["joint_vel_constr"]
if j_vel_constr.max() > 0:
fatal_rew += j_vel_constr.max()
ee_constr = constraint_values["ee_constr"]
if ee_constr.max() > 0:
fatal_rew += ee_constr.max()
link_constr = constraint_values["link_constr"]
if link_constr.max() > 0:
fatal_rew += link_constr.max()
return -fatal_rew
def check_fatal(self, obs):
fatal_rew = 0
q = obs[self.env_info["joint_pos_ids"]]
qd = obs[self.env_info["joint_vel_ids"]]
constraint_values_obs = self.env_info["constraints"].fun(q, qd)
fatal_rew += self.check_constraints(constraint_values_obs)
return -fatal_rew
def _fk(self, pos):
res, _ = forward_kinematics(self.env_info["robot"]["robot_model"],
self.env_info["robot"]["robot_data"], pos)
return res.astype(np.float32)
def _ik(self, world_pos, init_q=None):
success, pos = inverse_kinematics(self.env_info["robot"]["robot_model"],
self.env_info["robot"]["robot_data"],
world_pos,
initial_q=init_q)
pos = pos.astype(np.float32)
assert success
return pos
def _jacobian(self, pos):
return jacobian(self.env_info["robot"]["robot_model"],
self.env_info["robot"]["robot_data"],
pos).astype(np.float32)

View File

@ -1,6 +1,7 @@
import numpy as np import numpy as np
from fancy_gym.envs.mujoco.air_hockey.seven_dof.env_single import AirHockeySingle from fancy_gym.envs.mujoco.air_hockey.seven_dof.env_single import AirHockeySingle
from fancy_gym.envs.mujoco.air_hockey.seven_dof.airhockit_base_env import AirhocKIT2023BaseEnv
class AirHockeyDefend(AirHockeySingle): class AirHockeyDefend(AirHockeySingle):
@ -10,9 +11,7 @@ class AirHockeyDefend(AirHockeySingle):
""" """
def __init__(self, gamma=0.99, horizon=500, viewer_params={}): def __init__(self, gamma=0.99, horizon=500, viewer_params={}):
self.init_velocity_range = (1, 3) self.init_velocity_range = (1, 3)
self.start_range = np.array([[0.29, 0.65], [-0.4, 0.4]]) # Table Frame self.start_range = np.array([[0.29, 0.65], [-0.4, 0.4]]) # Table Frame
self.init_ee_range = np.array([[0.60, 1.25], [-0.4, 0.4]]) # Robot Frame
super().__init__(gamma=gamma, horizon=horizon, viewer_params=viewer_params) super().__init__(gamma=gamma, horizon=horizon, viewer_params=viewer_params)
def setup(self, obs): def setup(self, obs):
@ -32,7 +31,7 @@ class AirHockeyDefend(AirHockeySingle):
self._write_data("puck_y_vel", puck_vel[1]) self._write_data("puck_y_vel", puck_vel[1])
self._write_data("puck_yaw_vel", puck_vel[2]) self._write_data("puck_yaw_vel", puck_vel[2])
super(AirHockeyDefend, self).setup(obs) super().setup(obs)
def reward(self, state, action, next_state, absorbing): def reward(self, state, action, next_state, absorbing):
return 0 return 0
@ -46,6 +45,98 @@ class AirHockeyDefend(AirHockeySingle):
return True return True
return super().is_absorbing(state) return super().is_absorbing(state)
class AirHockeyDefendAirhocKIT2023(AirhocKIT2023BaseEnv):
def __init__(self, gamma=0.99, horizon=200, viewer_params={}, **kwargs):
super().__init__(gamma=gamma, horizon=horizon, viewer_params=viewer_params, **kwargs)
self.init_velocity_range = (1, 3)
self.start_range = np.array([[0.4, 0.75], [-0.4, 0.4]]) # Table Frame
self._setup_metrics()
def setup(self, obs):
self._setup_metrics()
puck_pos = np.random.rand(2) * (self.start_range[:, 1] - self.start_range[:, 0]) + self.start_range[:, 0]
lin_vel = np.random.uniform(self.init_velocity_range[0], self.init_velocity_range[1])
angle = np.random.uniform(-0.5, 0.5)
puck_vel = np.zeros(3)
puck_vel[0] = -np.cos(angle) * lin_vel
puck_vel[1] = np.sin(angle) * lin_vel
puck_vel[2] = np.random.uniform(-10, 10)
self._write_data("puck_x_pos", puck_pos[0])
self._write_data("puck_y_pos", puck_pos[1])
self._write_data("puck_x_vel", puck_vel[0])
self._write_data("puck_y_vel", puck_vel[1])
self._write_data("puck_yaw_vel", puck_vel[2])
super().setup(obs)
def reset(self, *args):
obs = super().reset()
self.hit_step_flag = False
self.hit_step = False
self.received_hit_reward = False
self.give_reward_next = False
return obs
def _setup_metrics(self):
self.episode_steps = 0
self.has_hit = False
def _simulation_post_step(self):
if not self.has_hit:
self.has_hit = self._check_collision("puck", "robot_1/ee")
super()._simulation_post_step()
def _step_finalize(self):
self.episode_steps += 1
return super()._step_finalize()
def reward(self, state, action, next_state, absorbing):
puck_pos, puck_vel = self.get_puck(next_state)
ee_pos, _ = self.get_ee()
rew = 0.01
if -0.7 < puck_pos[0] <= -0.2 and np.linalg.norm(puck_vel[:2]) < 0.1:
assert absorbing
rew += 70
if self.has_hit and not self.hit_step_flag:
self.hit_step_flag = True
self.hit_step = True
else:
self.hit_step = False
f = lambda puck_vel: 30 + 100 * (100 ** (-0.25 * np.linalg.norm(puck_vel[:2])))
if not self.give_reward_next and not self.received_hit_reward and self.hit_step and ee_pos[0] < puck_pos[0]:
self.hit_this_step = True
if np.linalg.norm(puck_vel[:2]) < 0.1:
return rew + f(puck_vel)
self.give_reward_next = True
return rew
if not self.received_hit_reward and self.give_reward_next:
self.received_hit_reward = True
if puck_vel[0] >= -0.2:
return rew + f(puck_vel)
return rew
else:
return rew
def is_absorbing(self, obs):
puck_pos, puck_vel = self.get_puck(obs)
# If puck is over the middle line and moving towards opponent
if puck_pos[0] > 0 and puck_vel[0] > 0:
return True
if self.episode_steps == self._mdp_info.horizon:
return True
if np.linalg.norm(puck_vel[:2]) < 0.1:
return True
return super().is_absorbing(obs)
if __name__ == '__main__': if __name__ == '__main__':
env = AirHockeyDefend() env = AirHockeyDefend()

View File

@ -1,6 +1,7 @@
import numpy as np import numpy as np
from fancy_gym.envs.mujoco.air_hockey.seven_dof.env_single import AirHockeySingle from fancy_gym.envs.mujoco.air_hockey.seven_dof.env_single import AirHockeySingle
from fancy_gym.envs.mujoco.air_hockey.seven_dof.airhockit_base_env import AirhocKIT2023BaseEnv
class AirHockeyHit(AirHockeySingle): class AirHockeyHit(AirHockeySingle):
@ -14,9 +15,6 @@ class AirHockeyHit(AirHockeySingle):
opponent_agent(Agent, None): Agent which controls the opponent opponent_agent(Agent, None): Agent which controls the opponent
moving_init(bool, False): If true, initialize the puck with inital velocity. moving_init(bool, False): If true, initialize the puck with inital velocity.
""" """
self.hit_range = np.array([[-0.65, -0.25], [-0.4, 0.4]]) # Table Frame
self.init_velocity_range = (0, 0.5) # Table Frame
super().__init__(gamma=gamma, horizon=horizon, viewer_params=viewer_params) super().__init__(gamma=gamma, horizon=horizon, viewer_params=viewer_params)
self.moving_init = moving_init self.moving_init = moving_init
@ -58,6 +56,93 @@ class AirHockeyHit(AirHockeySingle):
return True return True
return super(AirHockeyHit, self).is_absorbing(obs) return super(AirHockeyHit, self).is_absorbing(obs)
class AirHockeyHitAirhocKIT2023(AirhocKIT2023BaseEnv):
def __init__(self, gamma=0.99, horizon=500, moving_init=True, viewer_params={}, **kwargs):
super().__init__(gamma=gamma, horizon=horizon, viewer_params=viewer_params, **kwargs)
self.moving_init = moving_init
hit_width = self.env_info['table']['width'] / 2 - self.env_info['puck']['radius'] - \
self.env_info['mallet']['radius'] * 2
self.hit_range = np.array([[-0.7, -0.2], [-hit_width, hit_width]]) # Table Frame
self.init_velocity_range = (0, 0.5) # Table Frame
self.init_ee_range = np.array([[0.60, 1.25], [-0.4, 0.4]]) # Robot Frame
self._setup_metrics()
def reset(self, *args):
obs = super().reset()
self.last_ee_pos = self.last_planned_world_pos.copy()
self.last_ee_pos[0] -= 1.51
return obs
def setup(self, obs):
self._setup_metrics()
puck_pos = np.random.rand(2) * (self.hit_range[:, 1] - self.hit_range[:, 0]) + self.hit_range[:, 0]
self._write_data("puck_x_pos", puck_pos[0])
self._write_data("puck_y_pos", puck_pos[1])
if self.moving_init:
lin_vel = np.random.uniform(self.init_velocity_range[0], self.init_velocity_range[1])
angle = np.random.uniform(-np.pi / 2 - 0.1, np.pi / 2 + 0.1)
puck_vel = np.zeros(3)
puck_vel[0] = -np.cos(angle) * lin_vel
puck_vel[1] = np.sin(angle) * lin_vel
puck_vel[2] = np.random.uniform(-2, 2)
self._write_data("puck_x_vel", puck_vel[0])
self._write_data("puck_y_vel", puck_vel[1])
self._write_data("puck_yaw_vel", puck_vel[2])
super().setup(obs)
def _setup_metrics(self):
self.episode_steps = 0
self.has_scored = False
def _step_finalize(self):
cur_obs = self._create_observation(self.obs_helper._build_obs(self._data))
puck_pos, _ = self.get_puck(cur_obs) # world frame [x, y, z] and [x', y', z']
if not self.has_scored:
boundary = np.array([self.env_info['table']['length'], self.env_info['table']['width']]) / 2
self.has_scored = np.any(np.abs(puck_pos[:2]) > boundary) and puck_pos[0] > 0
self.episode_steps += 1
return super()._step_finalize()
def reward(self, state, action, next_state, absorbing):
rew = 0
puck_pos, puck_vel = self.get_puck(next_state)
ee_pos, _ = self.get_ee()
ee_vel = (ee_pos - self.last_ee_pos) / 0.02
self.last_ee_pos = ee_pos
if puck_vel[0] < 0.25 and puck_pos[0] < 0:
ee_puck_dir = (puck_pos - ee_pos)[:2]
ee_puck_dir = ee_puck_dir / np.linalg.norm(ee_puck_dir)
rew += 1 * max(0, np.dot(ee_puck_dir, ee_vel[:2]))
else:
rew += 10 * np.linalg.norm(puck_vel[:2])
if self.has_scored:
rew += 2000 + 5000 * np.linalg.norm(puck_vel[:2])
return rew
def is_absorbing(self, obs):
puck_pos, puck_vel = self.get_puck(obs)
# Stop if the puck bounces back on the opponents wall
if puck_pos[0] > 0 and puck_vel[0] < 0:
return True
if self.has_scored:
return True
if self.episode_steps == self._mdp_info.horizon:
return True
return super().is_absorbing(obs)
if __name__ == '__main__': if __name__ == '__main__':
env = AirHockeyHit(moving_init=True) env = AirHockeyHit(moving_init=True)