From c3ab9c954588cd3d84cdc2e4b2db1ad81ebace85 Mon Sep 17 00:00:00 2001 From: kngwyu Date: Mon, 6 Jul 2020 23:16:34 +0900 Subject: [PATCH] Add restitution in manual collision detection --- mujoco_maze/__init__.py | 3 +- mujoco_maze/agent_model.py | 5 ++ mujoco_maze/ant.py | 8 +- mujoco_maze/maze_env.py | 131 +++++------------------------ mujoco_maze/maze_env_utils.py | 154 +++++++++++++++------------------- mujoco_maze/point.py | 22 +++-- tests/test_envs.py | 45 ++++++++-- tests/test_intersect.py | 70 ++++++++++++++++ 8 files changed, 216 insertions(+), 222 deletions(-) create mode 100644 tests/test_intersect.py diff --git a/mujoco_maze/__init__.py b/mujoco_maze/__init__.py index aa9085d..9acacab 100644 --- a/mujoco_maze/__init__.py +++ b/mujoco_maze/__init__.py @@ -9,9 +9,8 @@ A maze environment using mujoco that supports custom tasks and robots. import gym from mujoco_maze.ant import AntEnv -from mujoco_maze.point import PointEnv from mujoco_maze.maze_task import TaskRegistry - +from mujoco_maze.point import PointEnv for maze_id in TaskRegistry.keys(): for i, task_cls in enumerate(TaskRegistry.tasks(maze_id)): diff --git a/mujoco_maze/agent_model.py b/mujoco_maze/agent_model.py index d063fd3..7117acf 100644 --- a/mujoco_maze/agent_model.py +++ b/mujoco_maze/agent_model.py @@ -37,3 +37,8 @@ class AgentModel(ABC, MujocoEnv, EzPickle): @abstractmethod def get_ori(self) -> float: pass + + def set_collision(self, xy: np.ndarray, restitution_coef: float) -> None: + """Set the coordinate of the agent and set v' = -kv. + """ + raise NotImplementedError(f"{type(self)} does not support manual collision") diff --git a/mujoco_maze/ant.py b/mujoco_maze/ant.py index 7dc4d96..9915c72 100644 --- a/mujoco_maze/ant.py +++ b/mujoco_maze/ant.py @@ -13,7 +13,6 @@ import numpy as np from mujoco_maze.agent_model import AgentModel - ForwardRewardFn = Callable[[float, float], float] @@ -103,11 +102,8 @@ class AntEnv(AgentModel): def set_xy(self, xy): qpos = self.sim.data.qpos.copy() - qpos[0] = xy[0] - qpos[1] = xy[1] - - qvel = self.sim.data.qvel - self.set_state(qpos, qvel) + qpos[:2] = xy + self.set_state(qpos, self.sim.data.qvel) def get_xy(self): return np.copy(self.sim.data.qpos[:2]) diff --git a/mujoco_maze/maze_env.py b/mujoco_maze/maze_env.py index d392516..1899ea2 100644 --- a/mujoco_maze/maze_env.py +++ b/mujoco_maze/maze_env.py @@ -10,7 +10,7 @@ import itertools as it import os import tempfile import xml.etree.ElementTree as ET -from typing import Tuple, Type +from typing import List, Tuple, Type import gym import numpy as np @@ -27,13 +27,12 @@ class MazeEnv(gym.Env): self, model_cls: Type[AgentModel], maze_task: Type[maze_task.MazeTask] = maze_task.MazeTask, - n_bins: int = 0, - sensor_range: float = 3.0, - sensor_span: float = 2 * np.pi, top_down_view: float = False, maze_height: float = 0.5, maze_size_scaling: float = 4.0, inner_reward_scaling: float = 1.0, + restitution_coef: float = 0.6, + collision_penalty: float = 0.001, *args, **kwargs, ) -> None: @@ -47,13 +46,11 @@ class MazeEnv(gym.Env): self._maze_size_scaling = size_scaling = maze_size_scaling self._inner_reward_scaling = inner_reward_scaling self.t = 0 # time steps - self._n_bins = n_bins - self._sensor_range = sensor_range * size_scaling - self._sensor_span = sensor_span self._observe_blocks = self._task.OBSERVE_BLOCKS self._put_spin_near_agent = self._task.PUT_SPIN_NEAR_AGENT self._top_down_view = top_down_view - self._collision_coef = 0.1 + self._restitution_coef = restitution_coef + self._collision_penalty = collision_penalty self._maze_structure = structure = self._task.create_maze() # Elevate the maze to allow for falling. @@ -282,7 +279,7 @@ class MazeEnv(gym.Env): ymin, ymax = (ymin - 0.5) * scaling - y0, (ymax + 0.5) * scaling - y0 return xmin, xmax, ymin, ymax - def get_top_down_view(self): + def get_top_down_view(self) -> np.ndarray: self._view = np.zeros_like(self._view) def valid(row, col): @@ -372,98 +369,7 @@ class MazeEnv(gym.Env): return self._view - def get_range_sensor_obs(self): - """Returns egocentric range sensor observations of maze.""" - robot_x, robot_y, robot_z = self.wrapped_env.get_body_com("torso")[:3] - ori = self.get_ori() - - structure = self._maze_structure - size_scaling = self._maze_size_scaling - height = self._maze_height - - segments = [] - # Get line segments (corresponding to outer boundary) of each immovable - # block or drop-off. - for i in range(len(structure)): - for j in range(len(structure[0])): - if structure[i][j].is_wall_or_chasm(): # There's a wall or drop-off. - cx = j * size_scaling - self._init_torso_x - cy = i * size_scaling - self._init_torso_y - x1 = cx - 0.5 * size_scaling - x2 = cx + 0.5 * size_scaling - y1 = cy - 0.5 * size_scaling - y2 = cy + 0.5 * size_scaling - struct_segments = [ - ((x1, y1), (x2, y1)), - ((x2, y1), (x2, y2)), - ((x2, y2), (x1, y2)), - ((x1, y2), (x1, y1)), - ] - for seg in struct_segments: - segments.append(dict(segment=seg, type=structure[i][j],)) - # Get line segments (corresponding to outer boundary) of each movable - # block within the agent's z-view. - for block_name, block_type in self.movable_blocks: - block_x, block_y, block_z = self.wrapped_env.get_body_com(block_name)[:3] - if ( - block_z + height * size_scaling / 2 >= robot_z - and robot_z >= block_z - height * size_scaling / 2 - ): # Block in view. - x1 = block_x - 0.5 * size_scaling - x2 = block_x + 0.5 * size_scaling - y1 = block_y - 0.5 * size_scaling - y2 = block_y + 0.5 * size_scaling - struct_segments = [ - ((x1, y1), (x2, y1)), - ((x2, y1), (x2, y2)), - ((x2, y2), (x1, y2)), - ((x1, y2), (x1, y1)), - ] - for seg in struct_segments: - segments.append(dict(segment=seg, type=block_type)) - - sensor_readings = np.zeros((self._n_bins, 3)) # 3 for wall, drop-off, block - for ray_idx in range(self._n_bins): - ray_ori = ( - ori - - self._sensor_span * 0.5 - + (2 * ray_idx + 1.0) / (2 * self._n_bins) * self._sensor_span - ) - ray_segments = [] - # Get all segments that intersect with ray. - for seg in segments: - p = maze_env_utils.ray_segment_intersect( - ray=((robot_x, robot_y), ray_ori), segment=seg["segment"] - ) - if p is not None: - ray_segments.append( - dict( - segment=seg["segment"], - type=seg["type"], - ray_ori=ray_ori, - distance=maze_env_utils.point_distance( - p, (robot_x, robot_y) - ), - ) - ) - if len(ray_segments) > 0: - # Find out which segment is intersected first. - first_seg = sorted(ray_segments, key=lambda x: x["distance"])[0] - seg_type = first_seg["type"] - idx = None - if seg_type == 1: - idx = 0 # Wall - elif seg_type == -1: - idx = 1 # Drop-off - elif seg_type.can_move(): - idx == 2 # Block - sr = self._sensor_range - if first_seg["distance"] <= sr: - sensor_readings[ray_idx][idx] = (sr - first_seg["distance"]) / sr - - return sensor_readings - - def _get_obs(self): + def _get_obs(self) -> np.ndarray: wrapped_obs = self.wrapped_env._get_obs() if self._top_down_view: view = [self.get_top_down_view().flat] @@ -478,12 +384,9 @@ class MazeEnv(gym.Env): [wrapped_obs[:3]] + additional_obs + [wrapped_obs[3:]] ) - range_sensor_obs = self.get_range_sensor_obs() - return np.concatenate( - [wrapped_obs, range_sensor_obs.flat] + view + [[self.t * 0.001]] - ) + return np.concatenate([wrapped_obs, *view, np.array([self.t * 0.001])]) - def reset(self): + def reset(self) -> np.ndarray: self.t = 0 self.wrapped_env.reset() # Samples a new goal @@ -495,7 +398,7 @@ class MazeEnv(gym.Env): self.wrapped_env.set_xy(xy) return self._get_obs() - def set_marker(self): + def set_marker(self) -> None: for i, goal in enumerate(self._task.goals): idx = self.model.site_name2id(f"goal{i}") self.data.site_xpos[idx][: len(goal.pos)] = goal.pos @@ -511,7 +414,7 @@ class MazeEnv(gym.Env): def action_space(self): return self.wrapped_env.action_space - def _find_robot(self): + def _find_robot(self) -> Tuple[float, float]: structure = self._maze_structure size_scaling = self._maze_size_scaling for i, j in it.product(range(len(structure)), range(len(structure[0]))): @@ -519,7 +422,7 @@ class MazeEnv(gym.Env): return j * size_scaling, i * size_scaling raise ValueError("No robot in maze specification.") - def _find_all_robots(self): + def _find_all_robots(self) -> List[Tuple[float, float]]: structure = self._maze_structure size_scaling = self._maze_size_scaling coords = [] @@ -528,14 +431,18 @@ class MazeEnv(gym.Env): coords.append((j * size_scaling, i * size_scaling)) return coords - def step(self, action): + def step(self, action) -> Tuple[np.ndarray, float, bool, dict]: self.t += 1 if self.wrapped_env.MANUAL_COLLISION: old_pos = self.wrapped_env.get_xy() inner_next_obs, inner_reward, _, info = self.wrapped_env.step(action) new_pos = self.wrapped_env.get_xy() - if self._collision.is_in(old_pos, new_pos): - self.wrapped_env.set_xy(old_pos) + intersection = self._collision.detect_intersection(old_pos, new_pos) + if intersection is not None: + rest_vec = -self._restitution_coef * (new_pos - intersection) + pos = old_pos + rest_vec + self.wrapped_env.set_collision(pos, self._restitution_coef) + inner_reward -= self._collision_penalty else: inner_next_obs, inner_reward, _, info = self.wrapped_env.step(action) next_obs = self._get_obs() diff --git a/mujoco_maze/maze_env_utils.py b/mujoco_maze/maze_env_utils.py index 3e432bd..ff03006 100644 --- a/mujoco_maze/maze_env_utils.py +++ b/mujoco_maze/maze_env_utils.py @@ -7,11 +7,14 @@ Based on `models`_ and `rllab`_. """ import itertools as it -import math from enum import Enum +from typing import Any, List, Optional, Sequence, Tuple import numpy as np +Self = Any +Point = np.complex + class MazeCell(Enum): # Robot: Start position @@ -36,6 +39,9 @@ class MazeCell(Enum): def is_chasm(self) -> bool: return self == self.CHASM + def is_empty(self) -> bool: + return self == self.ROBOT or self == self.EMPTY + def is_robot(self) -> bool: return self == self.ROBOT @@ -70,107 +76,85 @@ class MazeCell(Enum): return self.can_move_x() or self.can_move_y() or self.can_move_z() +class Line: + def __init__(self, p1: Sequence[float], p2: Sequence[float]) -> None: + self.p1 = np.complex(*p1) + self.p2 = np.complex(*p2) + self.conj_v1 = np.conjugate(self.p2 - self.p1) + + def _intersect(self, other: Self) -> bool: + v2 = other.p1 - self.p1 + v3 = other.p2 - self.p1 + return (self.conj_v1 * v2).imag * (self.conj_v1 * v3).imag <= 0.0 + + def intersect(self, other: Self) -> Optional[np.ndarray]: + if self._intersect(other) and other._intersect(self): + cross = self._cross_point(other) + return np.array([cross.real, cross.imag]) + else: + return None + + def _cross_point(self, other: Self) -> Optional[Point]: + v2 = other.p2 - other.p1 + v3 = self.p2 - other.p1 + a, b = (self.conj_v1 * v2).imag, (self.conj_v1 * v3).imag + return other.p1 + b / a * v2 + + def __repr__(self) -> str: + x1, y1 = self.p1.real, self.p1.imag + x2, y2 = self.p2.real, self.p2.imag + return f"Line(({x1}, {y1}) -> ({x2}, {y2}))" + + class Collision: """For manual collision detection. """ - ARROUND = np.array([[-1, 0], [1, 0], [0, -1], [0, 1]]) - OFFSET = {False: 0.48, True: 0.51} + NEIGHBORS: List[Tuple[int, int]] = [[0, -1], [-1, 0], [0, 1], [1, 0]] def __init__( self, structure: list, size_scaling: float, torso_x: float, torso_y: float, ) -> None: h, w = len(structure), len(structure[0]) - self.objects = [] + self.lines = [] - def is_block(pos) -> bool: - i, j = pos + def is_empty(i, j) -> bool: if 0 <= i < h and 0 <= j < w: - return structure[i][j].is_block() + return structure[i][j].is_empty() else: return False - def offset(pos, index) -> float: - return self.OFFSET[is_block(pos + self.ARROUND[index])] - for i, j in it.product(range(len(structure)), range(len(structure[0]))): if not structure[i][j].is_block(): continue - pos = np.array([i, j]) y_base = i * size_scaling - torso_y - min_y = y_base - size_scaling * offset(pos, 0) - max_y = y_base + size_scaling * offset(pos, 1) x_base = j * size_scaling - torso_x - min_x = x_base - size_scaling * offset(pos, 2) - max_x = x_base + size_scaling * offset(pos, 3) - self.objects.append((min_y, max_y, min_x, max_x)) + offset = size_scaling * 0.5 + min_y, max_y = y_base - offset, y_base + offset + min_x, max_x = x_base - offset, x_base + offset + for dx, dy in self.NEIGHBORS: + if not is_empty(i + dy, j + dx): + continue + self.lines.append(Line( + (max_x if dx == 1 else min_x, max_y if dy == 1 else min_y), + (min_x if dx == -1 else max_x, min_y if dy == -1 else max_y), + )) - def is_in(self, old_pos: np.ndarray, new_pos: np.ndarray) -> bool: - # Heuristics to prevent the agent from going through the wall - for x, y in ((old_pos + new_pos) / 2, new_pos): - for min_y, max_y, min_x, max_x in self.objects: - if min_x <= x <= max_x and min_y <= y <= max_y: - return True - return False - - -def line_intersect(pt1, pt2, ptA, ptB): - """ - Taken from https://www.cs.hmc.edu/ACM/lectures/intersections.html - Returns the intersection of Line(pt1,pt2) and Line(ptA,ptB). - """ - - DET_TOLERANCE = 0.00000001 - - # the first line is pt1 + r*(pt2-pt1) - # in component form: - x1, y1 = pt1 - x2, y2 = pt2 - dx1 = x2 - x1 - dy1 = y2 - y1 - - # the second line is ptA + s*(ptB-ptA) - x, y = ptA - xB, yB = ptB - dx = xB - x - dy = yB - y - - DET = -dx1 * dy + dy1 * dx - - if math.fabs(DET) < DET_TOLERANCE: - return (0, 0, 0, 0, 0) - - # now, the determinant should be OK - DETinv = 1.0 / DET - - # find the scalar amount along the "self" segment - r = DETinv * (-dy * (x - x1) + dx * (y - y1)) - - # find the scalar amount along the input line - s = DETinv * (-dy1 * (x - x1) + dx1 * (y - y1)) - - # return the average of the two descriptions - xi = (x1 + r * dx1 + x + s * dx) / 2.0 - yi = (y1 + r * dy1 + y + s * dy) / 2.0 - return (xi, yi, 1, r, s) - - -def ray_segment_intersect(ray, segment): - """ - Check if the ray originated from (x, y) with direction theta intersect the line - segment (x1, y1) -- (x2, y2), and return the intersection point if there is one. - """ - (x, y), theta = ray - # (x1, y1), (x2, y2) = segment - pt1 = (x, y) - pt2 = (x + math.cos(theta), y + math.sin(theta)) - xo, yo, valid, r, s = line_intersect(pt1, pt2, *segment) - if valid and r >= 0 and 0 <= s <= 1: - return (xo, yo) - return None - - -def point_distance(p1, p2): - x1, y1 = p1 - x2, y2 = p2 - return ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5 + def detect_intersection( + self, old_pos: np.ndarray, new_pos: np.ndarray + ) -> Optional[np.ndarray]: + move = Line(old_pos, new_pos) + intersections = [] + for line in self.lines: + intersection = line.intersect(move) + if intersection is not None: + intersections.append(intersection) + if len(intersections) == 0: + return None + pos = intersections[0] + dist = np.linalg.norm(pos - old_pos) + for new_pos in intersections[1:]: + new_dist = np.linalg.norm(new_pos - old_pos) + if new_dist < dist: + pos, dist = new_pos, new_dist + return pos diff --git a/mujoco_maze/point.py b/mujoco_maze/point.py index 4e5b940..c5b84e8 100644 --- a/mujoco_maze/point.py +++ b/mujoco_maze/point.py @@ -20,18 +20,18 @@ class PointEnv(AgentModel): ORI_IND: int = 2 MANUAL_COLLISION: bool = True - VELOCITY_LIMITS: float = 20.0 + VELOCITY_LIMITS: float = 10.0 def __init__(self, file_path: Optional[str] = None): super().__init__(file_path, 1) high = np.inf * np.ones(6, dtype=np.float32) - high[3:] = self.VELOCITY_LIMITS + high[3:] = self.VELOCITY_LIMITS * 1.2 high[self.ORI_IND] = np.pi low = -high self.observation_space = gym.spaces.Box(low, high) def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, dict]: - qpos = np.copy(self.sim.data.qpos) + qpos = self.sim.data.qpos.copy() qpos[2] += action[1] # Clip orientation if qpos[2] < -np.pi: @@ -70,14 +70,18 @@ class PointEnv(AgentModel): return self._get_obs() def get_xy(self): - return np.copy(self.sim.data.qpos[:2]) + return self.sim.data.qpos[:2].copy() - def set_xy(self, xy): - qpos = np.copy(self.sim.data.qpos) - qpos[0] = xy[0] - qpos[1] = xy[1] + def set_xy(self, xy: np.ndarray) -> None: + qpos = self.sim.data.qpos.copy() + qpos[:2] = xy + self.set_state(qpos, self.sim.data.qvel) - qvel = self.sim.data.qvel + def set_collision(self, xy: np.ndarray, restitution_coef: float) -> None: + qpos = self.sim.data.qpos.copy() + qpos[:2] = xy + qvel = self.sim.data.qvel.copy() + qvel[:2] = -restitution_coef * qvel[:2] self.set_state(qpos, qvel) def get_ori(self): diff --git a/tests/test_envs.py b/tests/test_envs.py index 060be3a..e45448c 100644 --- a/tests/test_envs.py +++ b/tests/test_envs.py @@ -1,4 +1,5 @@ import gym +import numpy as np import pytest import mujoco_maze @@ -6,15 +7,43 @@ import mujoco_maze @pytest.mark.parametrize("maze_id", mujoco_maze.TaskRegistry.keys()) def test_ant_maze(maze_id): - env = gym.make("Ant{}-v0".format(maze_id)) - assert env.reset().shape == (30,) - s, _, _, _ = env.step(env.action_space.sample()) - assert s.shape == (30,) + for i in range(2): + env = gym.make(f"Ant{maze_id}-v{i}") + assert env.reset().shape == (30,) + s, _, _, _ = env.step(env.action_space.sample()) + assert s.shape == (30,) @pytest.mark.parametrize("maze_id", mujoco_maze.TaskRegistry.keys()) def test_point_maze(maze_id): - env = gym.make("Point{}-v0".format(maze_id)) - assert env.reset().shape == (7,) - s, _, _, _ = env.step(env.action_space.sample()) - assert s.shape == (7,) + for i in range(2): + env = gym.make(f"Point{maze_id}-v{i}") + assert env.reset().shape == (7,) + s, _, _, _ = env.step(env.action_space.sample()) + assert s.shape == (7,) + + +@pytest.mark.parametrize("maze_id", mujoco_maze.TaskRegistry.keys()) +def test_collision_lines(maze_id): + env = gym.make(f"Point{maze_id}-v0") + if maze_id == "UMaze": + assert len(env.unwrapped._collision.lines) == 16 + structure = env.unwrapped._maze_structure + scaling = env.unwrapped._maze_size_scaling + init_x = env.unwrapped._init_torso_x + init_y = env.unwrapped._init_torso_y + + def check_pos(pos): + x_orig = (pos.real + init_x) / scaling + y_orig = (pos.imag + init_y) / scaling + return structure[int(round(y_orig))][int(round(x_orig))] + + for line in env.unwrapped._collision.lines: + mid = (line.p1 + line.p2) / 2 + p2p1 = line.p2 - line.p1 + cell1 = check_pos(mid + 0.1 * p2p1 * np.complex(0.0, -1.0)) + cell2 = check_pos(mid + 0.1 * p2p1 * np.complex(0.0, 1.0)) + if cell1.is_block(): + assert not cell2.is_block() + else: + assert cell2.is_block() diff --git a/tests/test_intersect.py b/tests/test_intersect.py new file mode 100644 index 0000000..7f337ec --- /dev/null +++ b/tests/test_intersect.py @@ -0,0 +1,70 @@ +import numpy as np +import pytest + +from mujoco_maze.maze_env_utils import Line + + +@pytest.mark.parametrize( + "l1p1, l1p2, l2p1, l2p2, none", + [ + ((0.0, 0.0), (1.0, 0.0), (0.0, -1.0), (1.0, 1.0), False), + ((1.0, 1.0), (2.0, 3.0), (-1.0, 1.5), (1.5, 1.0), False), + ((1.5, 1.5), (2.0, 3.0), (-1.0, 1.5), (1.5, 1.0), True), + ((0.0, 0.0), (2.0, 0.0), (1.0, 0.0), (1.0, 3.0), False), + ], +) +def test_intersect(l1p1, l1p2, l2p1, l2p2, none): + l1 = Line(l1p1, l1p2) + l2 = Line(l2p1, l2p2) + i1 = l1.intersect(l2) + i2 = line_intersect(l1p1, l1p2, l2p1, l2p2) + if none: + assert i1 is None and i2 is None + else: + assert i1 is not None + np.testing.assert_array_almost_equal(i1, np.array(i2)) + + +def line_intersect(pt1, pt2, ptA, ptB): + """ + Taken from https://www.cs.hmc.edu/ACM/lectures/intersections.html + Returns the intersection of Line(pt1,pt2) and Line(ptA,ptB). + """ + import math + + DET_TOLERANCE = 0.00000001 + + # the first line is pt1 + r*(pt2-pt1) + # in component form: + x1, y1 = pt1 + x2, y2 = pt2 + dx1 = x2 - x1 + dy1 = y2 - y1 + + # the second line is ptA + s*(ptB-ptA) + x, y = ptA + xB, yB = ptB + dx = xB - x + dy = yB - y + + DET = -dx1 * dy + dy1 * dx + + if math.fabs(DET) < DET_TOLERANCE: + return None + + # now, the determinant should be OK + DETinv = 1.0 / DET + + # find the scalar amount along the "self" segment + r = DETinv * (-dy * (x - x1) + dx * (y - y1)) + + # find the scalar amount along the input line + s = DETinv * (-dy1 * (x - x1) + dx1 * (y - y1)) + + # return the average of the two descriptions + xi = (x1 + r * dx1 + x + s * dx) / 2.0 + yi = (y1 + r * dy1 + y + s * dy) / 2.0 + if r >= 0 and 0 <= s <= 1: + return xi, yi + else: + return None