.
This commit is contained in:
parent
7e828b8af9
commit
1965899903
117
mujoco_maze/dir_point.py
Normal file
117
mujoco_maze/dir_point.py
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
from typing import Optional, Tuple
|
||||||
|
import gym
|
||||||
|
import mujoco
|
||||||
|
import numpy as np
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from mujoco_maze.agent_model import AgentModel
|
||||||
|
|
||||||
|
class DirPointEnv(AgentModel):
|
||||||
|
metadata = {
|
||||||
|
"render_modes": [
|
||||||
|
"human",
|
||||||
|
"rgb_array",
|
||||||
|
"depth_array",
|
||||||
|
],
|
||||||
|
"render_fps": 50,
|
||||||
|
}
|
||||||
|
|
||||||
|
FILE: str = "point.xml"
|
||||||
|
MANUAL_COLLISION: bool = True
|
||||||
|
RADIUS: float = 0.4
|
||||||
|
OBJBALL_TYPE: str = "hinge"
|
||||||
|
VELOCITY_LIMITS: float = 0.333
|
||||||
|
ACCELERATION_LIMITS: float = 0.1
|
||||||
|
|
||||||
|
def __init__(self, file_path: Optional[str] = None, maze_env=None) -> None:
|
||||||
|
self.maze_env = maze_env
|
||||||
|
high = np.inf * np.ones(4, dtype=np.float32) # [target_x, target_y, current_x, current_y, velocity_x, velocity_y]
|
||||||
|
low = -high
|
||||||
|
observation_space = gym.spaces.Box(low, high)
|
||||||
|
action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype=np.float32) # Normalized [-1, 1] action space
|
||||||
|
self.action_penalty = 0.1
|
||||||
|
super().__init__(file_path, 1, observation_space)
|
||||||
|
self.action_space = action_space
|
||||||
|
|
||||||
|
def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, dict]:
|
||||||
|
# Calculate the maze size based on structure and scaling
|
||||||
|
maze_size = self._calculate_maze_size()
|
||||||
|
|
||||||
|
# Scale the action to the maze size
|
||||||
|
direction = (action)/2.5 * maze_size
|
||||||
|
|
||||||
|
# Get current position and velocity
|
||||||
|
current_vel = self.data.qvel[:2]
|
||||||
|
|
||||||
|
# Compute desired velocity towards the target position
|
||||||
|
distance = np.linalg.norm(direction)
|
||||||
|
if distance > 0:
|
||||||
|
direction /= distance # Normalize the direction
|
||||||
|
|
||||||
|
desired_velocity = direction * min(distance, self.VELOCITY_LIMITS)
|
||||||
|
|
||||||
|
# Compute acceleration required to reach the desired velocity
|
||||||
|
acceleration = desired_velocity - current_vel
|
||||||
|
acceleration = np.clip(acceleration, -self.ACCELERATION_LIMITS, self.ACCELERATION_LIMITS)
|
||||||
|
|
||||||
|
# Update velocity and position
|
||||||
|
new_vel = current_vel + acceleration
|
||||||
|
new_vel = np.clip(new_vel, -self.VELOCITY_LIMITS, self.VELOCITY_LIMITS)
|
||||||
|
|
||||||
|
qpos = self.data.qpos.copy()
|
||||||
|
qpos[:2] += new_vel # Update position with new velocity
|
||||||
|
|
||||||
|
# Calculate the orientation to face the target
|
||||||
|
angle_to_target = np.arctan2(direction[1], direction[0])
|
||||||
|
qpos[2] = angle_to_target # Set the orientation to face the target
|
||||||
|
|
||||||
|
qpos = np.concatenate([qpos[:3], self.data.qpos[3:]]) # Ensure qpos has the correct shape
|
||||||
|
qvel = np.concatenate([new_vel, self.data.qvel[2:]]) # Ensure qvel has the correct shape
|
||||||
|
self.set_state(qpos, qvel)
|
||||||
|
|
||||||
|
for _ in range(0, self.frame_skip):
|
||||||
|
mujoco.mj_step(self.model, self.data)
|
||||||
|
|
||||||
|
# Update the position of the target site
|
||||||
|
self.data.site_xpos[-1][:2] = direction + np.array([-10, 3]) # Assuming 'target_site' is the first site added
|
||||||
|
self.data.site_xpos[-1][2] = 0.1 # Slightly higher in space
|
||||||
|
|
||||||
|
next_obs = self._get_obs()
|
||||||
|
reward = -1*min(max(0, distance - 3), 100)*self.action_penalty / 1000
|
||||||
|
return next_obs, reward, False, {}
|
||||||
|
|
||||||
|
def _calculate_maze_size(self) -> float:
|
||||||
|
"""Calculate the effective size of the maze."""
|
||||||
|
maze_structure = self.maze_env._maze_structure
|
||||||
|
num_rows = len(maze_structure)
|
||||||
|
num_cols = len(maze_structure[0]) if num_rows > 0 else 0
|
||||||
|
maze_size_scaling = self.maze_env._maze_size_scaling
|
||||||
|
maze_size = max(num_rows, num_cols) * maze_size_scaling
|
||||||
|
return maze_size
|
||||||
|
|
||||||
|
def _get_obs(self):
|
||||||
|
return np.concatenate(
|
||||||
|
[
|
||||||
|
self.data.qpos.flat[:2], # current position
|
||||||
|
self.data.qvel.flat[:2], # current velocity
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
def reset_model(self):
|
||||||
|
qpos = self.init_qpos + self.np_random.uniform(size=self.model.nq, low=-0.1, high=0.1)
|
||||||
|
qvel = self.init_qvel + self.np_random.random(self.model.nv) * 0.1
|
||||||
|
|
||||||
|
# Set everything other than point to original position and 0 velocity.
|
||||||
|
qpos[2:] = self.init_qpos[2:]
|
||||||
|
qvel[2:] = 0.0
|
||||||
|
self.set_state(qpos, qvel)
|
||||||
|
return self._get_obs()
|
||||||
|
|
||||||
|
def get_xy(self):
|
||||||
|
return self.data.qpos[:2].copy()
|
||||||
|
|
||||||
|
def set_xy(self, xy: np.ndarray) -> None:
|
||||||
|
qpos = self.data.qpos.copy()
|
||||||
|
qpos[:2] = xy
|
||||||
|
self.set_state(qpos, self.data.qvel)
|
@ -238,6 +238,7 @@ class MazeEnv(gym.Env):
|
|||||||
pos="2 0 0", # Initial position will be updated during the simulation
|
pos="2 0 0", # Initial position will be updated during the simulation
|
||||||
size=f"{target_size} {target_size} {target_size}",
|
size=f"{target_size} {target_size} {target_size}",
|
||||||
rgba="0 0 1 0.5", # Blue and semi-transparent
|
rgba="0 0 1 0.5", # Blue and semi-transparent
|
||||||
|
# rgba="0 0 0 0", # hide
|
||||||
)
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
Loading…
Reference in New Issue
Block a user