Prevent the point from going through the wall

This commit is contained in:
kngwyu 2020-05-30 18:24:11 +09:00
parent 741746a400
commit 348e975e60
8 changed files with 113 additions and 94 deletions

View File

@ -3,8 +3,8 @@
Some maze environments for reinforcement learning(RL) using [mujoco-py] and
[openai gym][gym].
Thankfully, this project is based on the code from [tensorflow/models][models], [rllab]
and [deep-skill-chaining][dsc].
Thankfully, this project is based on the code from [rllab],
[tensorflow/models][models], and [deep-skill-chaining][dsc].
## License
This project is licensed under Apache License, Version 2.0

View File

@ -5,6 +5,7 @@ MAZE_IDS = ["Maze", "Push", "Fall"] # TODO: Block, BlockMaze
def _get_kwargs(maze_id: str) -> tuple:
return {
"maze_id": maze_id,
"observe_blocks": maze_id in ["Block", "BlockMaze"],
"put_spin_near_agent": maze_id in ["Block", "BlockMaze"],
}
@ -14,7 +15,7 @@ for maze_id in MAZE_IDS:
gym.envs.register(
id="Ant{}-v0".format(maze_id),
entry_point="mujoco_maze.ant_maze_env:AntMazeEnv",
kwargs=dict(maze_id=maze_id, maze_size_scaling=8, **_get_kwargs(maze_id)),
kwargs=dict(maze_size_scaling=8.0, **_get_kwargs(maze_id)),
max_episode_steps=1000,
reward_threshold=-1000,
)
@ -23,12 +24,7 @@ for maze_id in MAZE_IDS:
gym.envs.register(
id="Point{}-v0".format(maze_id),
entry_point="mujoco_maze.point_maze_env:PointMazeEnv",
kwargs=dict(
maze_id=maze_id,
maze_size_scaling=4,
manual_collision=True,
**_get_kwargs(maze_id),
),
kwargs=_get_kwargs(maze_id),
max_episode_steps=1000,
reward_threshold=-1000,
)

View File

@ -16,8 +16,8 @@
<light directional="true" cutoff="100" exponent="1" diffuse="1 1 1" specular=".1 .1 .1" pos="0 0 1.3" dir="-0 0 -1.3" />
<geom name='floor' material="MatPlane" pos='0 0 0' size='40 40 40' type='plane' conaffinity='1' rgba='0.8 0.9 0.8 1' condim='3' />
<body name="torso" pos="0 0 0">
<geom name="pointbody" type="sphere" size="0.5" pos="0 0 0.5" />
<geom name="pointarrow" type="box" size="0.5 0.1 0.1" pos="0.6 0 0.5" />
<geom name="pointbody" type="sphere" size="0.5" pos="0 0 0.5" solimp="0.98 0.99 0.001" />
<geom name="pointarrow" type="box" size="0.5 0.1 0.1" pos="0.6 0 0.5" solimp="0.98 0.99 0.001" />
<joint name='ballx' type='slide' axis='1 0 0' pos='0 0 0' />
<joint name='bally' type='slide' axis='0 1 0' pos='0 0 0' />
<joint name='rot' type='hinge' axis='0 0 1' pos='0 0 0' limited="false" />

View File

@ -15,12 +15,13 @@
"""Adapted from rllab maze_env.py."""
import os
import tempfile
import xml.etree.ElementTree as ET
import itertools as it
import math
import numpy as np
import gym
import os
import tempfile
import xml.etree.ElementTree as ET
from typing import Callable, Type, Union
@ -34,22 +35,20 @@ MODEL_DIR = os.path.dirname(os.path.abspath(__file__)) + "/assets"
class MazeEnv(gym.Env):
MODEL_CLASS: Type[AgentModel] = AgentModel
MAZE_HEIGHT = None
MAZE_SIZE_SCALING = None
MANUAL_COLLISION: bool = False
def __init__(
self,
maze_id=None,
maze_height=0.5,
maze_size_scaling=8,
n_bins=0,
sensor_range=3.0,
sensor_span=2 * math.pi,
observe_blocks=False,
put_spin_near_agent=False,
top_down_view=False,
manual_collision=False,
dense_reward=True,
maze_height: float = 0.5,
maze_size_scaling: float = 4.0,
goal_sampler: Union[str, np.ndarray, Callable[[], np.ndarray]] = "default",
*args,
**kwargs,
@ -60,8 +59,8 @@ class MazeEnv(gym.Env):
tree = ET.parse(xml_path)
worldbody = tree.find(".//worldbody")
self.MAZE_HEIGHT = height = maze_height
self.MAZE_SIZE_SCALING = size_scaling = maze_size_scaling
self._maze_height = height = maze_height
self._maze_size_scaling = size_scaling = maze_size_scaling
self.t = 0 # time steps
self._n_bins = n_bins
self._sensor_range = sensor_range * size_scaling
@ -69,17 +68,16 @@ class MazeEnv(gym.Env):
self._observe_blocks = observe_blocks
self._put_spin_near_agent = put_spin_near_agent
self._top_down_view = top_down_view
self._manual_collision = manual_collision
self.MAZE_STRUCTURE = structure = maze_env_utils.construct_maze(
self._maze_structure = structure = maze_env_utils.construct_maze(
maze_id=self._maze_id
)
self.elevated = any(
-1 in row for row in structure
) # Elevate the maze to allow for falling.
# Elevate the maze to allow for falling.
self.elevated = any(-1 in row for row in structure)
# Are there any movable blocks?
self.blocks = any(
any(maze_env_utils.can_move(r) for r in row) for row in structure
) # Are there any movable blocks?
)
torso_x, torso_y = self._find_robot()
self._init_torso_x = torso_x
@ -88,13 +86,16 @@ class MazeEnv(gym.Env):
(x - torso_x, y - torso_y) for x, y in self._find_all_robots()
]
self._collision = maze_env_utils.Collision(
structure, size_scaling, torso_x, torso_y,
)
self._xy_to_rowcol = lambda x, y: (
2 + (y + size_scaling / 2) / size_scaling,
2 + (x + size_scaling / 2) / size_scaling,
)
self._view = np.zeros(
[5, 5, 3]
) # walls (immovable), chasms (fall), movable blocks
# walls (immovable), chasms (fall), movable blocks
self._view = np.zeros([5, 5, 3])
height_offset = 0.0
if self.elevated:
@ -275,7 +276,7 @@ class MazeEnv(gym.Env):
if goal_sampler == "random":
self._goal_sampler = lambda: np.random.uniform((-4, -4), (20, 20))
elif goal_sampler == "default":
default_goal = _default_goal(maze_id)
default_goal = _default_goal(maze_id, size_scaling)
self._goal_sampler = lambda: default_goal
else:
raise NotImplementedError(f"Unknown goal_sampler: {goal_sampler}")
@ -357,8 +358,8 @@ class MazeEnv(gym.Env):
self._robot_y = robot_y
self._robot_ori = self.get_ori()
structure = self.MAZE_STRUCTURE
size_scaling = self.MAZE_SIZE_SCALING
structure = self._maze_structure
size_scaling = self._maze_size_scaling
# Draw immovable blocks and chasms.
for i in range(len(structure)):
@ -388,9 +389,9 @@ class MazeEnv(gym.Env):
robot_x, robot_y, robot_z = self.wrapped_env.get_body_com("torso")[:3]
ori = self.get_ori()
structure = self.MAZE_STRUCTURE
size_scaling = self.MAZE_SIZE_SCALING
height = self.MAZE_HEIGHT
structure = self._maze_structure
size_scaling = self._maze_size_scaling
height = self._maze_height
segments = []
# Get line segments (corresponding to outer boundary) of each immovable
@ -523,49 +524,28 @@ class MazeEnv(gym.Env):
return self.wrapped_env.action_space
def _find_robot(self):
structure = self.MAZE_STRUCTURE
size_scaling = self.MAZE_SIZE_SCALING
for i in range(len(structure)):
for j in range(len(structure[0])):
if structure[i][j] == "r":
return j * size_scaling, i * size_scaling
assert False, "No robot in maze specification."
structure = self._maze_structure
size_scaling = self._maze_size_scaling
for i, j in it.product(range(len(structure)), range(len(structure[0]))):
if structure[i][j] == "r":
return j * size_scaling, i * size_scaling
raise ValueError("No robot in maze specification.")
def _find_all_robots(self):
structure = self.MAZE_STRUCTURE
size_scaling = self.MAZE_SIZE_SCALING
structure = self._maze_structure
size_scaling = self._maze_size_scaling
coords = []
for i in range(len(structure)):
for j in range(len(structure[0])):
if structure[i][j] == "r":
coords.append((j * size_scaling, i * size_scaling))
for i, j in it.product(range(len(structure)), range(len(structure[0]))):
if structure[i][j] == "r":
coords.append((j * size_scaling, i * size_scaling))
return coords
def _is_in_collision(self, pos):
x, y = pos
structure = self.MAZE_STRUCTURE
size_scaling = self.MAZE_SIZE_SCALING
for i in range(len(structure)):
for j in range(len(structure[0])):
if structure[i][j] == 1:
minx = j * size_scaling - size_scaling * 0.5 - self._init_torso_x
maxx = j * size_scaling + size_scaling * 0.5 - self._init_torso_x
miny = i * size_scaling - size_scaling * 0.5 - self._init_torso_y
maxy = i * size_scaling + size_scaling * 0.5 - self._init_torso_y
if minx <= x <= maxx and miny <= y <= maxy:
return True
return False
def _is_in_goal(self, pos):
(np.linalg.norm(obs[:3] - goal) <= 0.6)
def step(self, action):
self.t += 1
if self._manual_collision:
if self.MANUAL_COLLISION:
old_pos = self.wrapped_env.get_xy()
inner_next_obs, inner_reward, _, info = self.wrapped_env.step(action)
new_pos = self.wrapped_env.get_xy()
if self._is_in_collision(new_pos):
if self._collision.is_in(self.wrapped_env.get_xy()):
self.wrapped_env.set_xy(old_pos)
else:
inner_next_obs, inner_reward, _, info = self.wrapped_env.step(action)
@ -601,12 +581,12 @@ def _reward_fn(maze_id: str, dense: str) -> callable:
raise NotImplementedError(f"Unknown maze id: {maze_id}")
def _default_goal(maze_id: str) -> np.ndarray:
def _default_goal(maze_id: str, scale: float) -> np.ndarray:
if maze_id == "Maze" or maze_id == "BlockMaze":
return np.array([0.0, 16.0])
return np.array([0.0, 2.0 * scale])
elif maze_id == "Push":
return np.array([0.0, 19.0])
return np.array([0.0, 2.375 * scale])
elif maze_id == "Fall":
return np.array([0.0, 27.0, 4.5])
return np.array([0.0, 3.375 * scale, 4.5])
else:
raise NotImplementedError(f"Unknown maze id: {maze_id}")

View File

@ -14,10 +14,12 @@
# ==============================================================================
"""Adapted from rllab maze_env_utils.py."""
import itertools as it
import math
import numpy as np
class Move(object):
class Move:
X = 11
Y = 12
Z = 13
@ -49,10 +51,11 @@ def can_move(movable):
def construct_maze(maze_id="Maze"):
R = "r"
if maze_id == "Maze":
structure = [
[1, 1, 1, 1, 1],
[1, "r", 0, 0, 1],
[1, R, 0, 0, 1],
[1, 1, 1, 0, 1],
[1, 0, 0, 0, 1],
[1, 1, 1, 1, 1],
@ -60,7 +63,7 @@ def construct_maze(maze_id="Maze"):
elif maze_id == "Push":
structure = [
[1, 1, 1, 1, 1],
[1, 0, "r", 1, 1],
[1, 0, R, 1, 1],
[1, 0, Move.XY, 0, 1],
[1, 1, 0, 1, 1],
[1, 1, 1, 1, 1],
@ -68,26 +71,24 @@ def construct_maze(maze_id="Maze"):
elif maze_id == "Fall":
structure = [
[1, 1, 1, 1],
[1, "r", 0, 1],
[1, R, 0, 1],
[1, 0, Move.YZ, 1],
[1, -1, -1, 1],
[1, 0, 0, 1],
[1, 1, 1, 1],
]
elif maze_id == "Block":
O = "r"
structure = [
[1, 1, 1, 1, 1],
[1, O, 0, 0, 1],
[1, R, 0, 0, 1],
[1, 0, 0, 0, 1],
[1, 0, 0, 0, 1],
[1, 1, 1, 1, 1],
]
elif maze_id == "BlockMaze":
O = "r"
structure = [
[1, 1, 1, 1],
[1, O, 0, 1],
[1, R, 0, 1],
[1, 1, 0, 1],
[1, 0, 0, 1],
[1, 1, 1, 1],
@ -98,12 +99,54 @@ def construct_maze(maze_id="Maze"):
return structure
class Collision:
"""For manual collision detection.
"""
ARROUND = np.array([[-1, 0], [1, 0], [0, -1], [0, 1]])
OFFSET = {False: 0.45, True: 0.5}
def __init__(
self, structure: list, size_scaling: float, torso_x: float, torso_y: float,
) -> None:
h, w = len(structure), len(structure[0])
self.objects = []
def is_block(pos) -> bool:
i, j = pos
if 0 <= i < h and 0 <= j < w:
return structure[i][j] == 1
else:
return False
def offset(pos, index) -> float:
return self.OFFSET[is_block(pos + self.ARROUND[index])]
for i, j in it.product(range(len(structure)), range(len(structure[0]))):
if structure[i][j] != 1:
continue
pos = np.array([i, j])
y_base = i * size_scaling - torso_y
min_y = y_base - size_scaling * offset(pos, 0)
max_y = y_base + size_scaling * offset(pos, 1)
x_base = j * size_scaling - torso_x
min_x = x_base - size_scaling * offset(pos, 2)
max_x = x_base + size_scaling * offset(pos, 3)
self.objects.append((min_y, max_y, min_x, max_x))
def is_in(self, pos) -> bool:
x, y = pos
for min_y, max_y, min_x, max_x in self.objects:
if min_x <= x <= max_x and min_y <= y <= max_y:
return True
return False
def line_intersect(pt1, pt2, ptA, ptB):
"""
Taken from https://www.cs.hmc.edu/ACM/lectures/intersections.html
this returns the intersection of Line(pt1,pt2) and Line(ptA,ptB)
"""
Taken from https://www.cs.hmc.edu/ACM/lectures/intersections.html
Returns the intersection of Line(pt1,pt2) and Line(ptA,ptB).
"""
DET_TOLERANCE = 0.00000001
@ -142,14 +185,13 @@ def line_intersect(pt1, pt2, ptA, ptB):
def ray_segment_intersect(ray, segment):
"""
Check if the ray originated from (x, y) with direction theta intersects the line segment (x1, y1) -- (x2, y2),
and return the intersection point if there is one
"""
Check if the ray originated from (x, y) with direction theta intersect the line
segment (x1, y1) -- (x2, y2), and return the intersection point if there is one.
"""
(x, y), theta = ray
# (x1, y1), (x2, y2) = segment
pt1 = (x, y)
len = 1
pt2 = (x + len * math.cos(theta), y + len * math.sin(theta))
pt2 = (x + math.cos(theta), y + math.sin(theta))
xo, yo, valid, r, s = line_intersect(pt1, pt2, *segment)
if valid and r >= 0 and 0 <= s <= 1:
return (xo, yo)

View File

@ -79,7 +79,7 @@ class PointEnv(AgentModel):
def get_xy(self):
qpos = self.sim.data.qpos
return qpos[0], qpos[0]
return qpos[0], qpos[1]
def set_xy(self, xy):
qpos = np.copy(self.sim.data.qpos)

View File

@ -19,3 +19,4 @@ from mujoco_maze.point import PointEnv
class PointMazeEnv(MazeEnv):
MODEL_CLASS = PointEnv
MANUAL_COLLISION = True

View File

@ -5,11 +5,11 @@ import pytest
@pytest.mark.parametrize("maze_id", mujoco_maze.MAZE_IDS)
def test_ant_maze(maze_id):
env = gym.make("AntMaze{}-v0".format(maze_id))
env = gym.make("Ant{}-v0".format(maze_id))
assert env.reset().shape == (30,)
@pytest.mark.parametrize("maze_id", mujoco_maze.MAZE_IDS)
def test_point_maze(maze_id):
env = gym.make("PointMaze{}-v0".format(maze_id))
env = gym.make("Point{}-v0".format(maze_id))
assert env.reset().shape == (7,)