from typing import Optional from gym.envs.mujoco.hopper_v3 import HopperEnv import numpy as np import os MAX_EPISODE_STEPS_HOPPERJUMP = 250 class ALRHopperJumpEnv(HopperEnv): """ Initialization changes to normal Hopper: - terminate_when_unhealthy: True -> False - healthy_z_range: (0.7, float('inf')) -> (0.5, float('inf')) - healthy_angle_range: (-0.2, 0.2) -> (-float('inf'), float('inf')) - exclude_current_positions_from_observation: True -> False """ def __init__( self, xml_file='hopper_jump.xml', forward_reward_weight=1.0, ctrl_cost_weight=1e-3, healthy_reward=1.0, penalty=0.0, terminate_when_unhealthy=False, healthy_state_range=(-100.0, 100.0), healthy_z_range=(0.5, float('inf')), healthy_angle_range=(-float('inf'), float('inf')), reset_noise_scale=5e-3, exclude_current_positions_from_observation=False, ): self._steps = 0 self.max_height = 0 # self.penalty = penalty self.goal = 0 self._floor_geom_id = None self._foot_geom_id = None self.contact_with_floor = False self.init_floor_contact = False self.has_left_floor = False self.contact_dist = None xml_file = os.path.join(os.path.dirname(__file__), "assets", xml_file) super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, terminate_when_unhealthy, healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale, exclude_current_positions_from_observation) def step(self, action): self._steps += 1 self._floor_geom_id = self.model.geom_name2id('floor') self._foot_geom_id = self.model.geom_name2id('foot_geom') self.do_simulation(action, self.frame_skip) height_after = self.get_body_com("torso")[2] site_pos_after = self.data.get_site_xpos('foot_site') self.max_height = max(height_after, self.max_height) has_floor_contact = self._is_floor_foot_contact() if not self.contact_with_floor else False if not self.init_floor_contact: self.init_floor_contact = has_floor_contact if self.init_floor_contact and not self.has_left_floor: self.has_left_floor = not has_floor_contact if not self.contact_with_floor and self.has_left_floor: self.contact_with_floor = has_floor_contact ctrl_cost = self.control_cost(action) costs = ctrl_cost done = False goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0])) if self.contact_dist is None and self.contact_with_floor: self.contact_dist = goal_dist rewards = 0 if self._steps >= MAX_EPISODE_STEPS_HOPPERJUMP: # healthy_reward = 0 if self.context else self.healthy_reward * self._steps healthy_reward = self.healthy_reward * 2 # * self._steps contact_dist = self.contact_dist if self.contact_dist is not None else 5 dist_reward = self._forward_reward_weight * (-3 * goal_dist + 10 * self.max_height - 2 * contact_dist) rewards = dist_reward + healthy_reward observation = self._get_obs() reward = rewards - costs info = dict( height=height_after, x_pos=site_pos_after, max_height=self.max_height, goal=self.goal, goal_dist=goal_dist, height_rew=self.max_height, healthy_reward=self.healthy_reward * 2, healthy=self.is_healthy, contact_dist=self.contact_dist if self.contact_dist is not None else 0 ) return observation, reward, done, info def _get_obs(self): return np.append(super()._get_obs(), self.goal) def reset(self, *, seed: Optional[int] = None, return_info: bool = False, options: Optional[dict] = None, ): self.goal = self.np_random.uniform(1.4, 2.16, 1)[0] # 1.3 2.3 self.max_height = 0 self._steps = 0 return super().reset() # overwrite reset_model to make it deterministic def reset_model(self): qpos = self.init_qpos # + self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nq) qvel = self.init_qvel # + self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nv) self.set_state(qpos, qvel) observation = self._get_obs() self.has_left_floor = False self.contact_with_floor = False self.init_floor_contact = False self.contact_dist = None return observation def _is_floor_foot_contact(self): floor_geom_id = self.model.geom_name2id('floor') foot_geom_id = self.model.geom_name2id('foot_geom') for i in range(self.data.ncon): contact = self.data.contact[i] collision = contact.geom1 == floor_geom_id and contact.geom2 == foot_geom_id collision_trans = contact.geom1 == foot_geom_id and contact.geom2 == floor_geom_id if collision or collision_trans: return True return False class ALRHopperXYJumpEnv(ALRHopperJumpEnv): def step(self, action): self._floor_geom_id = self.model.geom_name2id('floor') self._foot_geom_id = self.model.geom_name2id('foot_geom') self._steps += 1 self.do_simulation(action, self.frame_skip) height_after = self.get_body_com("torso")[2] site_pos_after = self.sim.data.site_xpos[self.model.site_name2id('foot_site')].copy() self.max_height = max(height_after, self.max_height) # floor_contact = self._contact_checker(self._floor_geom_id, self._foot_geom_id) if not self.contact_with_floor else False # self.init_floor_contact = floor_contact if not self.init_floor_contact else self.init_floor_contact # self.has_left_floor = not floor_contact if self.init_floor_contact and not self.has_left_floor else self.has_left_floor # self.contact_with_floor = floor_contact if not self.contact_with_floor and self.has_left_floor else self.contact_with_floor floor_contact = self._is_floor_foot_contact(self._floor_geom_id, self._foot_geom_id) if not self.contact_with_floor else False if not self.init_floor_contact: self.init_floor_contact = floor_contact if self.init_floor_contact and not self.has_left_floor: self.has_left_floor = not floor_contact if not self.contact_with_floor and self.has_left_floor: self.contact_with_floor = floor_contact if self.contact_dist is None and self.contact_with_floor: self.contact_dist = np.linalg.norm(self.sim.data.site_xpos[self.model.site_name2id('foot_site')] - np.array([self.goal, 0, 0])) ctrl_cost = self.control_cost(action) costs = ctrl_cost done = False goal_dist = np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0])) rewards = 0 if self._steps >= self.max_episode_steps: # healthy_reward = 0 if self.context else self.healthy_reward * self._steps healthy_reward = self.healthy_reward * 2 # * self._steps contact_dist = self.contact_dist if self.contact_dist is not None else 5 dist_reward = self._forward_reward_weight * (-3 * goal_dist + 10 * self.max_height - 2 * contact_dist) rewards = dist_reward + healthy_reward observation = self._get_obs() reward = rewards - costs info = { 'height': height_after, 'x_pos': site_pos_after, 'max_height': self.max_height, 'goal': self.goal, 'goal_dist': goal_dist, 'height_rew': self.max_height, 'healthy_reward': self.healthy_reward * 2, 'healthy': self.is_healthy, 'contact_dist': self.contact_dist if self.contact_dist is not None else 0 } return observation, reward, done, info def reset_model(self): self.init_qpos[1] = 1.5 self._floor_geom_id = self.model.geom_name2id('floor') self._foot_geom_id = self.model.geom_name2id('foot_geom') noise_low = -np.zeros(self.model.nq) noise_low[3] = -0.5 noise_low[4] = -0.2 noise_low[5] = 0 noise_high = np.zeros(self.model.nq) noise_high[3] = 0 noise_high[4] = 0 noise_high[5] = 0.785 rnd_vec = self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nq) qpos = self.init_qpos + rnd_vec qvel = self.init_qvel self.set_state(qpos, qvel) observation = self._get_obs() self.has_left_floor = False self.contact_with_floor = False self.init_floor_contact = False self.contact_dist = None return observation def reset(self): super().reset() self.goal = self.np_random.uniform(0.3, 1.35, 1)[0] self.sim.model.body_pos[self.sim.model.body_name2id('goal_site_body')] = np.array([self.goal, 0, 0]) return self.reset_model() def _get_obs(self): goal_diff = self.sim.data.site_xpos[self.model.site_name2id('foot_site')].copy() \ - np.array([self.goal, 0, 0]) return np.concatenate((super(ALRHopperXYJumpEnv, self)._get_obs(), goal_diff)) def set_context(self, context): # context is 4 dimensional qpos = self.init_qpos qvel = self.init_qvel qpos[-3:] = context[:3] self.goal = context[-1] self.set_state(qpos, qvel) self.sim.model.body_pos[self.sim.model.body_name2id('goal_site_body')] = np.array([self.goal, 0, 0]) return self._get_obs() class ALRHopperXYJumpEnvStepBased(ALRHopperXYJumpEnv): def __init__( self, xml_file='hopper_jump.xml', forward_reward_weight=1.0, ctrl_cost_weight=1e-3, healthy_reward=0.0, penalty=0.0, context=True, terminate_when_unhealthy=False, healthy_state_range=(-100.0, 100.0), healthy_z_range=(0.5, float('inf')), healthy_angle_range=(-float('inf'), float('inf')), reset_noise_scale=5e-3, exclude_current_positions_from_observation=False, max_episode_steps=250, height_scale=10, dist_scale=3, healthy_scale=2 ): self.height_scale = height_scale self.dist_scale = dist_scale self.healthy_scale = healthy_scale super().__init__(xml_file, forward_reward_weight, ctrl_cost_weight, healthy_reward, penalty, context, terminate_when_unhealthy, healthy_state_range, healthy_z_range, healthy_angle_range, reset_noise_scale, exclude_current_positions_from_observation, max_episode_steps) def step(self, action): self._floor_geom_id = self.model.geom_name2id('floor') self._foot_geom_id = self.model.geom_name2id('foot_geom') self._steps += 1 self.do_simulation(action, self.frame_skip) height_after = self.get_body_com("torso")[2] site_pos_after = self.sim.data.site_xpos[self.model.site_name2id('foot_site')].copy() self.max_height = max(height_after, self.max_height) ctrl_cost = self.control_cost(action) healthy_reward = self.healthy_reward * self.healthy_scale height_reward = self.height_scale * height_after goal_dist = np.atleast_1d(np.linalg.norm(site_pos_after - np.array([self.goal, 0, 0], dtype=object)))[0] goal_dist_reward = -self.dist_scale * goal_dist dist_reward = self._forward_reward_weight * (goal_dist_reward + height_reward) reward = -ctrl_cost + healthy_reward + dist_reward done = False observation = self._get_obs() ########################################################### # This is only for logging the distance to goal when first having the contact ########################################################## floor_contact = self._is_floor_foot_contact(self._floor_geom_id, self._foot_geom_id) if not self.contact_with_floor else False if not self.init_floor_contact: self.init_floor_contact = floor_contact if self.init_floor_contact and not self.has_left_floor: self.has_left_floor = not floor_contact if not self.contact_with_floor and self.has_left_floor: self.contact_with_floor = floor_contact if self.contact_dist is None and self.contact_with_floor: self.contact_dist = np.linalg.norm(self.sim.data.site_xpos[self.model.site_name2id('foot_site')] - np.array([self.goal, 0, 0])) info = { 'height': height_after, 'x_pos': site_pos_after, 'max_height': self.max_height, 'goal': self.goal, 'goal_dist': goal_dist, 'height_rew': self.max_height, 'healthy_reward': self.healthy_reward * self.healthy_reward, 'healthy': self.is_healthy, 'contact_dist': self.contact_dist if self.contact_dist is not None else 0 } return observation, reward, done, info