Use offset instead of extending the line
This commit is contained in:
parent
8907d0a2c0
commit
b6ef0fa028
@ -11,6 +11,7 @@ class AgentModel(ABC, MujocoEnv, EzPickle):
|
||||
FILE: str
|
||||
ORI_IND: int
|
||||
MANUAL_COLLISION: bool
|
||||
RADIUS: float
|
||||
|
||||
def __init__(self, file_path: str, frame_skip: int) -> None:
|
||||
MujocoEnv.__init__(self, file_path, frame_skip)
|
||||
@ -37,8 +38,3 @@ class AgentModel(ABC, MujocoEnv, EzPickle):
|
||||
@abstractmethod
|
||||
def get_ori(self) -> float:
|
||||
pass
|
||||
|
||||
def set_collision(self, xy: np.ndarray, restitution_coef: float) -> None:
|
||||
"""Set the coordinate of the agent and set v' = -kv.
|
||||
"""
|
||||
raise NotImplementedError(f"{type(self)} does not support manual collision")
|
||||
|
@ -40,6 +40,7 @@ class AntEnv(AgentModel):
|
||||
FILE: str = "ant.xml"
|
||||
ORI_IND: int = 3
|
||||
MANUAL_COLLISION: bool = False
|
||||
RADIUS: float = 0.2
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@ -49,7 +50,6 @@ class AntEnv(AgentModel):
|
||||
) -> None:
|
||||
self._ctrl_cost_weight = ctrl_cost_weight
|
||||
self._forward_reward_fn = forward_reward_fn
|
||||
self.radius = 0.3
|
||||
super().__init__(file_path, 5)
|
||||
|
||||
def _forward_reward(self, xy_pos_before: np.ndarray) -> Tuple[float, np.ndarray]:
|
||||
|
@ -63,8 +63,8 @@ class MazeEnv(gym.Env):
|
||||
(x - torso_x, y - torso_y) for x, y in self._find_all_robots()
|
||||
]
|
||||
|
||||
self._collision = maze_env_utils.Collision(
|
||||
structure, size_scaling, torso_x, torso_y,
|
||||
self._collision = maze_env_utils.CollisionDetector(
|
||||
structure, size_scaling, torso_x, torso_y, model_cls.RADIUS,
|
||||
)
|
||||
|
||||
self._xy_to_rowcol = lambda x, y: (
|
||||
@ -437,20 +437,14 @@ class MazeEnv(gym.Env):
|
||||
inner_next_obs, inner_reward, _, info = self.wrapped_env.step(action)
|
||||
new_pos = self.wrapped_env.get_xy()
|
||||
# Checks that the new_position is in the wall
|
||||
intersection = self._collision.detect_intersection(
|
||||
old_pos, new_pos, self.wrapped_env.radius,
|
||||
)
|
||||
if intersection is not None:
|
||||
pos = intersection + (intersection - new_pos) * self._restitution_coef
|
||||
intersection2 = self._collision.detect_intersection(
|
||||
old_pos, pos, self.wrapped_env.radius,
|
||||
)
|
||||
collision = self._collision.detect(old_pos, new_pos)
|
||||
if collision is not None:
|
||||
pos = collision.point + self._restitution_coef * collision.rest()
|
||||
if self._collision.detect(old_pos, pos) is not None:
|
||||
# If pos is also not in the wall, we give up computing the position
|
||||
if intersection2 is not None:
|
||||
pos = old_pos
|
||||
self.wrapped_env.set_collision(pos, self._restitution_coef)
|
||||
if self._debug:
|
||||
print(f"new_pos: {new_pos}, pos: {pos}")
|
||||
self.wrapped_env.set_xy(old_pos)
|
||||
else:
|
||||
self.wrapped_env.set_xy(pos)
|
||||
else:
|
||||
inner_next_obs, inner_reward, _, info = self.wrapped_env.step(action)
|
||||
next_obs = self._get_obs()
|
||||
|
@ -8,7 +8,7 @@ Based on `models`_ and `rllab`_.
|
||||
|
||||
import itertools as it
|
||||
from enum import Enum
|
||||
from typing import Any, List, Optional, Sequence, Tuple, Union
|
||||
from typing import Any, List, NamedTuple, Optional, Sequence, Tuple, Union
|
||||
|
||||
import numpy as np
|
||||
|
||||
@ -78,33 +78,36 @@ class MazeCell(Enum):
|
||||
|
||||
class Line:
|
||||
def __init__(
|
||||
self, p1: Union[Point, Sequence[float]], p2: Union[Point, Sequence[float]]
|
||||
self, p1: Union[Sequence[float], Point], p2: Union[Sequence[float], Point],
|
||||
) -> None:
|
||||
if isinstance(p1, Point):
|
||||
self.p1 = p1
|
||||
else:
|
||||
self.p1 = np.complex(*p1)
|
||||
if isinstance(p2, Point):
|
||||
self.p2 = p2
|
||||
else:
|
||||
self.p2 = np.complex(*p2)
|
||||
self.conj_v1 = np.conjugate(self.p2 - self.p1)
|
||||
self.p1 = p1 if isinstance(p1, Point) else np.complex(*p1)
|
||||
self.p2 = p2 if isinstance(p2, Point) else np.complex(*p2)
|
||||
self.v1 = self.p2 - self.p1
|
||||
self.conj_v1 = np.conjugate(self.v1)
|
||||
|
||||
def extend(self, dist: float) -> Tuple[Self, Point]:
|
||||
v = self.p2 - self.p1
|
||||
extended_v = v * dist / np.absolute(v)
|
||||
p2 = self.p2 + extended_v
|
||||
return Line(self.p1, p2), extended_v
|
||||
if np.absolute(self.v1) <= 1e-8:
|
||||
raise ValueError(f"p1({p1}) and p2({p2}) are too close")
|
||||
|
||||
def _intersect(self, other: Self) -> bool:
|
||||
v2 = other.p1 - self.p1
|
||||
v3 = other.p2 - self.p1
|
||||
return (self.conj_v1 * v2).imag * (self.conj_v1 * v3).imag <= 0.0
|
||||
|
||||
def intersect(self, other: Self) -> Optional[np.ndarray]:
|
||||
def _projection(self, p: Point) -> Point:
|
||||
nv1 = -self.v1
|
||||
nv1_norm = np.absolute(nv1) ** 2
|
||||
scale = np.real(np.conjugate(p - self.p1) * nv1) / nv1_norm
|
||||
return self.p1 + nv1 * scale
|
||||
|
||||
def reflection(self, p: Point) -> Point:
|
||||
return p + 2.0 * (self._projection(p) - p)
|
||||
|
||||
def distance(self, p: Point) -> float:
|
||||
return np.absolute(p - self._projection(p))
|
||||
|
||||
def intersect(self, other: Self) -> Point:
|
||||
if self._intersect(other) and other._intersect(self):
|
||||
cross = self._cross_point(other)
|
||||
return np.array([cross.real, cross.imag])
|
||||
return self._cross_point(other)
|
||||
else:
|
||||
return None
|
||||
|
||||
@ -121,13 +124,32 @@ class Line:
|
||||
|
||||
|
||||
class Collision:
|
||||
def __init__(self, point: Point, reflection: Point) -> None:
|
||||
self._point = point
|
||||
self._reflection = reflection
|
||||
|
||||
@property
|
||||
def point(self) -> np.ndarray:
|
||||
return np.array([self._point.real, self._point.imag])
|
||||
|
||||
def rest(self) -> np.ndarray:
|
||||
p = self._reflection - self._point
|
||||
return np.array([p.real, p.imag])
|
||||
|
||||
|
||||
class CollisionDetector:
|
||||
"""For manual collision detection.
|
||||
"""
|
||||
|
||||
EPS: float = 0.05
|
||||
NEIGHBORS: List[Tuple[int, int]] = [[0, -1], [-1, 0], [0, 1], [1, 0]]
|
||||
|
||||
def __init__(
|
||||
self, structure: list, size_scaling: float, torso_x: float, torso_y: float,
|
||||
self,
|
||||
structure: list,
|
||||
size_scaling: float,
|
||||
torso_x: float,
|
||||
torso_y: float,
|
||||
radius: float,
|
||||
) -> None:
|
||||
h, w = len(structure), len(structure[0])
|
||||
self.lines = []
|
||||
@ -143,7 +165,7 @@ class Collision:
|
||||
continue
|
||||
y_base = i * size_scaling - torso_y
|
||||
x_base = j * size_scaling - torso_x
|
||||
offset = size_scaling * 0.5
|
||||
offset = size_scaling * 0.5 + radius
|
||||
min_y, max_y = y_base - offset, y_base + offset
|
||||
min_x, max_x = x_base - offset, x_base + offset
|
||||
for dx, dy in self.NEIGHBORS:
|
||||
@ -156,21 +178,21 @@ class Collision:
|
||||
)
|
||||
)
|
||||
|
||||
def detect_intersection(
|
||||
self, old_pos: np.ndarray, new_pos: np.ndarray, radius
|
||||
) -> Optional[np.ndarray]:
|
||||
move, extended = Line(old_pos, new_pos).extend(radius)
|
||||
intersections = []
|
||||
def detect(self, old_pos: np.ndarray, new_pos: np.ndarray) -> Optional[Collision]:
|
||||
move = Line(old_pos, new_pos)
|
||||
# Next, checks that the trajectory cross the wall or not
|
||||
collisions = []
|
||||
for line in self.lines:
|
||||
intersection = line.intersect(move)
|
||||
if intersection is not None:
|
||||
intersections.append(intersection)
|
||||
if len(intersections) == 0:
|
||||
reflection = line.reflection(move.p2)
|
||||
collisions.append(Collision(intersection, reflection))
|
||||
if len(collisions) == 0:
|
||||
return None
|
||||
pos = intersections[0]
|
||||
dist = np.linalg.norm(pos - old_pos)
|
||||
for new_pos in intersections[1:]:
|
||||
new_dist = np.linalg.norm(new_pos - old_pos)
|
||||
col = collisions[0]
|
||||
dist = np.absolute(col._point - move.p1)
|
||||
for collision in collisions[1:]:
|
||||
new_dist = np.absolute(collision._point - move.p1)
|
||||
if new_dist < dist:
|
||||
pos, dist = new_pos, new_dist
|
||||
return pos - np.array([extended.real, extended.imag])
|
||||
col, dist = collision, new_dist
|
||||
return col
|
||||
|
@ -19,7 +19,7 @@ class PointEnv(AgentModel):
|
||||
FILE: str = "point.xml"
|
||||
ORI_IND: int = 2
|
||||
MANUAL_COLLISION: bool = True
|
||||
radius: float = 0.5
|
||||
RADIUS: float = 0.4
|
||||
|
||||
VELOCITY_LIMITS: float = 10.0
|
||||
|
||||
@ -29,7 +29,6 @@ class PointEnv(AgentModel):
|
||||
high[3:] = self.VELOCITY_LIMITS * 1.2
|
||||
high[self.ORI_IND] = np.pi
|
||||
low = -high
|
||||
self.radius = 0.5
|
||||
self.observation_space = gym.spaces.Box(low, high)
|
||||
|
||||
def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, dict]:
|
||||
@ -79,12 +78,5 @@ class PointEnv(AgentModel):
|
||||
qpos[:2] = xy
|
||||
self.set_state(qpos, self.sim.data.qvel)
|
||||
|
||||
def set_collision(self, xy: np.ndarray, restitution_coef: float) -> None:
|
||||
qpos = self.sim.data.qpos.copy()
|
||||
qpos[:2] = xy
|
||||
qvel = self.sim.data.qvel.copy()
|
||||
qvel[:2] *= -restitution_coef
|
||||
self.set_state(qpos, qvel)
|
||||
|
||||
def get_ori(self):
|
||||
return self.sim.data.qpos[self.ORI_IND]
|
||||
|
@ -1,5 +1,4 @@
|
||||
import gym
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
import mujoco_maze
|
||||
@ -21,29 +20,3 @@ def test_point_maze(maze_id):
|
||||
assert env.reset().shape == (7,)
|
||||
s, _, _, _ = env.step(env.action_space.sample())
|
||||
assert s.shape == (7,)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("maze_id", mujoco_maze.TaskRegistry.keys())
|
||||
def test_collision_lines(maze_id):
|
||||
env = gym.make(f"Point{maze_id}-v0")
|
||||
if maze_id == "UMaze":
|
||||
assert len(env.unwrapped._collision.lines) == 16
|
||||
structure = env.unwrapped._maze_structure
|
||||
scaling = env.unwrapped._maze_size_scaling
|
||||
init_x = env.unwrapped._init_torso_x
|
||||
init_y = env.unwrapped._init_torso_y
|
||||
|
||||
def check_pos(pos):
|
||||
x_orig = (pos.real + init_x) / scaling
|
||||
y_orig = (pos.imag + init_y) / scaling
|
||||
return structure[int(round(y_orig))][int(round(x_orig))]
|
||||
|
||||
for line in env.unwrapped._collision.lines:
|
||||
mid = (line.p1 + line.p2) / 2
|
||||
p2p1 = line.p2 - line.p1
|
||||
cell1 = check_pos(mid + 0.1 * p2p1 * np.complex(0.0, -1.0))
|
||||
cell2 = check_pos(mid + 0.1 * p2p1 * np.complex(0.0, 1.0))
|
||||
if cell1.is_block():
|
||||
assert not cell2.is_block()
|
||||
else:
|
||||
assert cell2.is_block()
|
||||
|
@ -4,6 +4,19 @@ import pytest
|
||||
from mujoco_maze.maze_env_utils import Line
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"l1, l2, p, ans",
|
||||
[
|
||||
((0.0, 0.0), (4.0, 4.0), (1.0, 3.0), 2.0 ** 0.5),
|
||||
((-3.0, -3.0), (0.0, 1.0), (-3.0, 1.0), 2.4),
|
||||
],
|
||||
)
|
||||
def test_distance(l1, l2, p, ans):
|
||||
line = Line(l1, l2)
|
||||
point = np.complex(*p)
|
||||
assert abs(line.distance(point) - ans) <= 1e-8
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"l1p1, l1p2, l2p1, l2p2, none",
|
||||
[
|
||||
@ -22,6 +35,7 @@ def test_intersect(l1p1, l1p2, l2p1, l2p2, none):
|
||||
assert i1 is None and i2 is None
|
||||
else:
|
||||
assert i1 is not None
|
||||
i1 = np.array([i1.real, i1.imag])
|
||||
np.testing.assert_array_almost_equal(i1, np.array(i2))
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user