diff --git a/mujoco_maze/agent_model.py b/mujoco_maze/agent_model.py index 7117acf..57c7de5 100644 --- a/mujoco_maze/agent_model.py +++ b/mujoco_maze/agent_model.py @@ -11,6 +11,7 @@ class AgentModel(ABC, MujocoEnv, EzPickle): FILE: str ORI_IND: int MANUAL_COLLISION: bool + RADIUS: float def __init__(self, file_path: str, frame_skip: int) -> None: MujocoEnv.__init__(self, file_path, frame_skip) @@ -37,8 +38,3 @@ class AgentModel(ABC, MujocoEnv, EzPickle): @abstractmethod def get_ori(self) -> float: pass - - def set_collision(self, xy: np.ndarray, restitution_coef: float) -> None: - """Set the coordinate of the agent and set v' = -kv. - """ - raise NotImplementedError(f"{type(self)} does not support manual collision") diff --git a/mujoco_maze/ant.py b/mujoco_maze/ant.py index 02c4227..942fb4c 100644 --- a/mujoco_maze/ant.py +++ b/mujoco_maze/ant.py @@ -40,6 +40,7 @@ class AntEnv(AgentModel): FILE: str = "ant.xml" ORI_IND: int = 3 MANUAL_COLLISION: bool = False + RADIUS: float = 0.2 def __init__( self, @@ -49,7 +50,6 @@ class AntEnv(AgentModel): ) -> None: self._ctrl_cost_weight = ctrl_cost_weight self._forward_reward_fn = forward_reward_fn - self.radius = 0.3 super().__init__(file_path, 5) def _forward_reward(self, xy_pos_before: np.ndarray) -> Tuple[float, np.ndarray]: diff --git a/mujoco_maze/maze_env.py b/mujoco_maze/maze_env.py index 5660a13..835010e 100644 --- a/mujoco_maze/maze_env.py +++ b/mujoco_maze/maze_env.py @@ -63,8 +63,8 @@ class MazeEnv(gym.Env): (x - torso_x, y - torso_y) for x, y in self._find_all_robots() ] - self._collision = maze_env_utils.Collision( - structure, size_scaling, torso_x, torso_y, + self._collision = maze_env_utils.CollisionDetector( + structure, size_scaling, torso_x, torso_y, model_cls.RADIUS, ) self._xy_to_rowcol = lambda x, y: ( @@ -437,20 +437,14 @@ class MazeEnv(gym.Env): inner_next_obs, inner_reward, _, info = self.wrapped_env.step(action) new_pos = self.wrapped_env.get_xy() # Checks that the new_position is in the wall - intersection = self._collision.detect_intersection( - old_pos, new_pos, self.wrapped_env.radius, - ) - if intersection is not None: - pos = intersection + (intersection - new_pos) * self._restitution_coef - intersection2 = self._collision.detect_intersection( - old_pos, pos, self.wrapped_env.radius, - ) - # If pos is also not in the wall, we give up computing the position - if intersection2 is not None: - pos = old_pos - self.wrapped_env.set_collision(pos, self._restitution_coef) - if self._debug: - print(f"new_pos: {new_pos}, pos: {pos}") + collision = self._collision.detect(old_pos, new_pos) + if collision is not None: + pos = collision.point + self._restitution_coef * collision.rest() + if self._collision.detect(old_pos, pos) is not None: + # If pos is also not in the wall, we give up computing the position + self.wrapped_env.set_xy(old_pos) + else: + self.wrapped_env.set_xy(pos) else: inner_next_obs, inner_reward, _, info = self.wrapped_env.step(action) next_obs = self._get_obs() diff --git a/mujoco_maze/maze_env_utils.py b/mujoco_maze/maze_env_utils.py index 855d735..73619cd 100644 --- a/mujoco_maze/maze_env_utils.py +++ b/mujoco_maze/maze_env_utils.py @@ -8,7 +8,7 @@ Based on `models`_ and `rllab`_. import itertools as it from enum import Enum -from typing import Any, List, Optional, Sequence, Tuple, Union +from typing import Any, List, NamedTuple, Optional, Sequence, Tuple, Union import numpy as np @@ -78,33 +78,36 @@ class MazeCell(Enum): class Line: def __init__( - self, p1: Union[Point, Sequence[float]], p2: Union[Point, Sequence[float]] + self, p1: Union[Sequence[float], Point], p2: Union[Sequence[float], Point], ) -> None: - if isinstance(p1, Point): - self.p1 = p1 - else: - self.p1 = np.complex(*p1) - if isinstance(p2, Point): - self.p2 = p2 - else: - self.p2 = np.complex(*p2) - self.conj_v1 = np.conjugate(self.p2 - self.p1) + self.p1 = p1 if isinstance(p1, Point) else np.complex(*p1) + self.p2 = p2 if isinstance(p2, Point) else np.complex(*p2) + self.v1 = self.p2 - self.p1 + self.conj_v1 = np.conjugate(self.v1) - def extend(self, dist: float) -> Tuple[Self, Point]: - v = self.p2 - self.p1 - extended_v = v * dist / np.absolute(v) - p2 = self.p2 + extended_v - return Line(self.p1, p2), extended_v + if np.absolute(self.v1) <= 1e-8: + raise ValueError(f"p1({p1}) and p2({p2}) are too close") def _intersect(self, other: Self) -> bool: v2 = other.p1 - self.p1 v3 = other.p2 - self.p1 return (self.conj_v1 * v2).imag * (self.conj_v1 * v3).imag <= 0.0 - def intersect(self, other: Self) -> Optional[np.ndarray]: + def _projection(self, p: Point) -> Point: + nv1 = -self.v1 + nv1_norm = np.absolute(nv1) ** 2 + scale = np.real(np.conjugate(p - self.p1) * nv1) / nv1_norm + return self.p1 + nv1 * scale + + def reflection(self, p: Point) -> Point: + return p + 2.0 * (self._projection(p) - p) + + def distance(self, p: Point) -> float: + return np.absolute(p - self._projection(p)) + + def intersect(self, other: Self) -> Point: if self._intersect(other) and other._intersect(self): - cross = self._cross_point(other) - return np.array([cross.real, cross.imag]) + return self._cross_point(other) else: return None @@ -121,13 +124,32 @@ class Line: class Collision: + def __init__(self, point: Point, reflection: Point) -> None: + self._point = point + self._reflection = reflection + + @property + def point(self) -> np.ndarray: + return np.array([self._point.real, self._point.imag]) + + def rest(self) -> np.ndarray: + p = self._reflection - self._point + return np.array([p.real, p.imag]) + + +class CollisionDetector: """For manual collision detection. """ - + EPS: float = 0.05 NEIGHBORS: List[Tuple[int, int]] = [[0, -1], [-1, 0], [0, 1], [1, 0]] def __init__( - self, structure: list, size_scaling: float, torso_x: float, torso_y: float, + self, + structure: list, + size_scaling: float, + torso_x: float, + torso_y: float, + radius: float, ) -> None: h, w = len(structure), len(structure[0]) self.lines = [] @@ -143,7 +165,7 @@ class Collision: continue y_base = i * size_scaling - torso_y x_base = j * size_scaling - torso_x - offset = size_scaling * 0.5 + offset = size_scaling * 0.5 + radius min_y, max_y = y_base - offset, y_base + offset min_x, max_x = x_base - offset, x_base + offset for dx, dy in self.NEIGHBORS: @@ -156,21 +178,21 @@ class Collision: ) ) - def detect_intersection( - self, old_pos: np.ndarray, new_pos: np.ndarray, radius - ) -> Optional[np.ndarray]: - move, extended = Line(old_pos, new_pos).extend(radius) - intersections = [] + def detect(self, old_pos: np.ndarray, new_pos: np.ndarray) -> Optional[Collision]: + move = Line(old_pos, new_pos) + # Next, checks that the trajectory cross the wall or not + collisions = [] for line in self.lines: intersection = line.intersect(move) if intersection is not None: - intersections.append(intersection) - if len(intersections) == 0: + reflection = line.reflection(move.p2) + collisions.append(Collision(intersection, reflection)) + if len(collisions) == 0: return None - pos = intersections[0] - dist = np.linalg.norm(pos - old_pos) - for new_pos in intersections[1:]: - new_dist = np.linalg.norm(new_pos - old_pos) + col = collisions[0] + dist = np.absolute(col._point - move.p1) + for collision in collisions[1:]: + new_dist = np.absolute(collision._point - move.p1) if new_dist < dist: - pos, dist = new_pos, new_dist - return pos - np.array([extended.real, extended.imag]) + col, dist = collision, new_dist + return col diff --git a/mujoco_maze/point.py b/mujoco_maze/point.py index dfb5087..9b38e2d 100644 --- a/mujoco_maze/point.py +++ b/mujoco_maze/point.py @@ -19,7 +19,7 @@ class PointEnv(AgentModel): FILE: str = "point.xml" ORI_IND: int = 2 MANUAL_COLLISION: bool = True - radius: float = 0.5 + RADIUS: float = 0.4 VELOCITY_LIMITS: float = 10.0 @@ -29,7 +29,6 @@ class PointEnv(AgentModel): high[3:] = self.VELOCITY_LIMITS * 1.2 high[self.ORI_IND] = np.pi low = -high - self.radius = 0.5 self.observation_space = gym.spaces.Box(low, high) def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, dict]: @@ -79,12 +78,5 @@ class PointEnv(AgentModel): qpos[:2] = xy self.set_state(qpos, self.sim.data.qvel) - def set_collision(self, xy: np.ndarray, restitution_coef: float) -> None: - qpos = self.sim.data.qpos.copy() - qpos[:2] = xy - qvel = self.sim.data.qvel.copy() - qvel[:2] *= -restitution_coef - self.set_state(qpos, qvel) - def get_ori(self): return self.sim.data.qpos[self.ORI_IND] diff --git a/tests/test_envs.py b/tests/test_envs.py index e45448c..469ab1a 100644 --- a/tests/test_envs.py +++ b/tests/test_envs.py @@ -1,5 +1,4 @@ import gym -import numpy as np import pytest import mujoco_maze @@ -21,29 +20,3 @@ def test_point_maze(maze_id): assert env.reset().shape == (7,) s, _, _, _ = env.step(env.action_space.sample()) assert s.shape == (7,) - - -@pytest.mark.parametrize("maze_id", mujoco_maze.TaskRegistry.keys()) -def test_collision_lines(maze_id): - env = gym.make(f"Point{maze_id}-v0") - if maze_id == "UMaze": - assert len(env.unwrapped._collision.lines) == 16 - structure = env.unwrapped._maze_structure - scaling = env.unwrapped._maze_size_scaling - init_x = env.unwrapped._init_torso_x - init_y = env.unwrapped._init_torso_y - - def check_pos(pos): - x_orig = (pos.real + init_x) / scaling - y_orig = (pos.imag + init_y) / scaling - return structure[int(round(y_orig))][int(round(x_orig))] - - for line in env.unwrapped._collision.lines: - mid = (line.p1 + line.p2) / 2 - p2p1 = line.p2 - line.p1 - cell1 = check_pos(mid + 0.1 * p2p1 * np.complex(0.0, -1.0)) - cell2 = check_pos(mid + 0.1 * p2p1 * np.complex(0.0, 1.0)) - if cell1.is_block(): - assert not cell2.is_block() - else: - assert cell2.is_block() diff --git a/tests/test_intersect.py b/tests/test_intersect.py index 7f337ec..2378126 100644 --- a/tests/test_intersect.py +++ b/tests/test_intersect.py @@ -4,6 +4,19 @@ import pytest from mujoco_maze.maze_env_utils import Line +@pytest.mark.parametrize( + "l1, l2, p, ans", + [ + ((0.0, 0.0), (4.0, 4.0), (1.0, 3.0), 2.0 ** 0.5), + ((-3.0, -3.0), (0.0, 1.0), (-3.0, 1.0), 2.4), + ], +) +def test_distance(l1, l2, p, ans): + line = Line(l1, l2) + point = np.complex(*p) + assert abs(line.distance(point) - ans) <= 1e-8 + + @pytest.mark.parametrize( "l1p1, l1p2, l2p1, l2p2, none", [ @@ -22,6 +35,7 @@ def test_intersect(l1p1, l1p2, l2p1, l2p2, none): assert i1 is None and i2 is None else: assert i1 is not None + i1 = np.array([i1.real, i1.imag]) np.testing.assert_array_almost_equal(i1, np.array(i2))