Block Maze

kngwyu 2020-09-26 18:37:20 +09:00
parent 1c4152654b
commit 720f535682
6 changed files with 135 additions and 45 deletions

README.md

@@ -32,6 +32,10 @@ Thankfully, this project is based on the code from [rllab] and [tensorflow/models].
 - PointFall-v0/AntFall-v0 (Distance-based Reward)
 - PointFall-v1/AntFall-v1 (Goal-based Reward)
+
+## Caveats
+This project has some other features (e.g., block maze and other
+robots), but they are still work in progress.
 ## License
 This project is licensed under Apache License, Version 2.0
 ([LICENSE-APACHE](LICENSE) or http://www.apache.org/licenses/LICENSE-2.0).

mujoco_maze/__init__.py

@@ -11,6 +11,7 @@ import gym
 from mujoco_maze.ant import AntEnv
 from mujoco_maze.maze_task import TaskRegistry
 from mujoco_maze.point import PointEnv
+from mujoco_maze.reacher import ReacherEnv
 from mujoco_maze.swimmer import SwimmerEnv

 for maze_id in TaskRegistry.keys():
@@ -41,10 +42,28 @@ for maze_id in TaskRegistry.keys():
             max_episode_steps=1000,
             reward_threshold=task_cls.REWARD_THRESHOLD,
         )
-        if "Push" in maze_id or "Fall" in maze_id:
+        skip_swimmer = False
+        for inhibited in ["Fall", "Push", "Block"]:
+            if inhibited in maze_id:
+                skip_swimmer = True
+        if skip_swimmer:
             continue
+        # Reacher
+        gym.envs.register(
+            id=f"Reacher{maze_id}-v{i}",
+            entry_point="mujoco_maze.maze_env:MazeEnv",
+            kwargs=dict(
+                model_cls=ReacherEnv,
+                maze_task=task_cls,
+                maze_size_scaling=task_cls.MAZE_SIZE_SCALING.swimmer,
+                inner_reward_scaling=task_cls.INNER_REWARD_SCALING,
+            ),
+            max_episode_steps=1000,
+            reward_threshold=task_cls.REWARD_THRESHOLD,
+        )
         # Swimmer
         gym.envs.register(
             id=f"Swimmer{maze_id}-v{i}",

mujoco_maze/assets/swimmer.xml

@@ -13,7 +13,7 @@
     <material name='geom' texture="texgeom" texuniform="true" />
   </asset>
   <worldbody>
     <light cutoff="100" diffuse="1 1 1" dir="-0 0 -1.3" directional="true" exponent="1" pos="0 0 1.3" specular=".1 .1 .1" />
     <geom conaffinity="1" condim="3" material="MatPlane" name="floor" pos="0 0 -0.1" rgba="0.8 0.9 0.8 1" size="40 40 0.1" type="plane" />
     <!-- ================= SWIMMER ================= /-->
     <body name="torso" pos="0 0 0">

mujoco_maze/maze_env.py

@@ -145,11 +145,8 @@ class MazeEnv(gym.Env):
                     spinning = struct.can_spin()
                     shrink = 0.1 if spinning else 0.99 if falling else 1.0
                     height_shrink = 0.1 if spinning else 1.0
-                    x = (
-                        j * size_scaling - torso_x + 0.25 * size_scaling
-                        if spinning
-                        else 0.0
-                    )
+                    x_offset = 0.25 * size_scaling if spinning else 0.0
+                    x = j * size_scaling - torso_x + x_offset
                     y = i * size_scaling - torso_y
                     h = height / 2 * size_scaling * height_shrink
                     size = 0.5 * size_scaling * shrink
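This rewrite is a behavior fix, not just a style change: in the removed lines the conditional expression wrapped the whole sum, so x collapsed to 0.0 for every non-spinning block, while the new code only makes the 0.25 * size_scaling offset conditional. A standalone sketch with made-up values:

size_scaling, torso_x, j = 4.0, 4.0, 2

for spinning in (True, False):
    # removed form: the ternary covers the entire expression
    x_old = j * size_scaling - torso_x + 0.25 * size_scaling if spinning else 0.0
    # new form: only the offset depends on `spinning`
    x_offset = 0.25 * size_scaling if spinning else 0.0
    x_new = j * size_scaling - torso_x + x_offset
    print(spinning, x_old, x_new)
# True  5.0 5.0  -- identical when spinning
# False 0.0 4.0  -- the removed form lost the grid position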
@@ -462,5 +459,5 @@ class MazeEnv(gym.Env):
             info["position"] = self.wrapped_env.get_xy()
         return next_obs, inner_reward + outer_reward, done, info

-    def close(self):
+    def close(self) -> None:
         self.wrapped_env.close()

mujoco_maze/maze_task.py

@@ -2,7 +2,7 @@
 """
 from abc import ABC, abstractmethod
-from typing import Dict, List, NamedTuple, Tuple, Type
+from typing import Dict, List, NamedTuple, Optional, Tuple, Type

 import numpy as np
@@ -51,6 +51,7 @@ class Scaling(NamedTuple):
 class MazeTask(ABC):
     REWARD_THRESHOLD: float
+    PENALTY: Optional[float] = None
     MAZE_SIZE_SCALING: Scaling = Scaling(8.0, 4.0, 4.0)
     INNER_REWARD_SCALING: float = 0.01
     TOP_DOWN_VIEW: bool = False
@@ -89,41 +90,16 @@ class DistRewardMixIn:
         return -self.goals[0].euc_dist(obs) / self.scale


-class GoalRewardSimpleRoom(MazeTask):
-    """ Very easy task. For testing.
-    """
-
-    REWARD_THRESHOLD: float = 0.9
-
-    def __init__(self, scale: float) -> None:
-        super().__init__(scale)
-        self.goals = [MazeGoal(np.array([2.0 * scale, 0.0]))]
-
-    def reward(self, obs: np.ndarray) -> float:
-        return 1.0 if self.termination(obs) else -0.0001
-
-    @staticmethod
-    def create_maze() -> List[List[MazeCell]]:
-        E, B, R = MazeCell.EMPTY, MazeCell.BLOCK, MazeCell.ROBOT
-        return [
-            [B, B, B, B, B],
-            [B, R, E, E, B],
-            [B, B, B, B, B],
-        ]
-
-
-class DistRewardSimpleRoom(GoalRewardSimpleRoom, DistRewardMixIn):
-    pass
-
-
 class GoalRewardUMaze(MazeTask):
     REWARD_THRESHOLD: float = 0.9
+    PENALTY: float = -0.0001

     def __init__(self, scale: float) -> None:
         super().__init__(scale)
         self.goals = [MazeGoal(np.array([0.0, 2.0 * scale]))]

     def reward(self, obs: np.ndarray) -> float:
-        return 1.0 if self.termination(obs) else -0.0001
+        return 1.0 if self.termination(obs) else self.PENALTY

     @staticmethod
     def create_maze() -> List[List[MazeCell]]:
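With PENALTY hoisted into MazeTask, each goal-based task now declares its per-step penalty once and reward() reads it back, so downstream code (e.g., the tests below) can inspect it on the task instance. A hypothetical subclass, purely to illustrate the hook (the class name and value are invented):

class GoalRewardMySlowMaze(GoalRewardUMaze):
    PENALTY: float = -0.001  # a stronger step penalty than the -0.0001 above

    def reward(self, obs: np.ndarray) -> float:
        # same sparse scheme as GoalRewardUMaze: 1.0 at the goal, PENALTY elsewhere
        return 1.0 if self.termination(obs) else self.PENALTY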
@@ -141,6 +117,25 @@ class DistRewardUMaze(GoalRewardUMaze, DistRewardMixIn):
     pass


+class GoalRewardSimpleRoom(GoalRewardUMaze):
+    def __init__(self, scale: float) -> None:
+        super().__init__(scale)
+        self.goals = [MazeGoal(np.array([2.0 * scale, 0.0]))]
+
+    @staticmethod
+    def create_maze() -> List[List[MazeCell]]:
+        E, B, R = MazeCell.EMPTY, MazeCell.BLOCK, MazeCell.ROBOT
+        return [
+            [B, B, B, B, B],
+            [B, R, E, E, B],
+            [B, B, B, B, B],
+        ]
+
+
+class DistRewardSimpleRoom(GoalRewardSimpleRoom, DistRewardMixIn):
+    pass
+
+
 class GoalRewardPush(GoalRewardUMaze):
     TOP_DOWN_VIEW = True
@@ -188,8 +183,29 @@ class DistRewardFall(GoalRewardFall, DistRewardMixIn):
     pass


+class GoalRewardFall(GoalRewardUMaze):
+    TOP_DOWN_VIEW = True
+
+    def __init__(self, scale: float) -> None:
+        super().__init__(scale)
+        self.goals = [MazeGoal(np.array([0.0, 3.375 * scale, 4.5]))]
+
+    @staticmethod
+    def create_maze() -> List[List[MazeCell]]:
+        E, B, C, R = MazeCell.EMPTY, MazeCell.BLOCK, MazeCell.CHASM, MazeCell.ROBOT
+        return [
+            [B, B, B, B],
+            [B, R, E, B],
+            [B, E, MazeCell.YZ, B],
+            [B, C, C, B],
+            [B, E, E, B],
+            [B, B, B, B],
+        ]
+
+
 class GoalReward2Rooms(MazeTask):
     REWARD_THRESHOLD: float = 0.9
+    PENALTY: float = -0.0001
     MAZE_SIZE_SCALING: Scaling = Scaling(4.0, 4.0, 4.0)

     def __init__(self, scale: float) -> None:
@@ -200,7 +216,7 @@ class GoalReward2Rooms(MazeTask):
         for goal in self.goals:
             if goal.neighbor(obs):
                 return goal.reward_scale
-        return -0.0001
+        return self.PENALTY

     @staticmethod
     def create_maze() -> List[List[MazeCell]]:
@@ -228,6 +244,7 @@ class SubGoal2Rooms(GoalReward2Rooms):
 class GoalReward4Rooms(MazeTask):
     REWARD_THRESHOLD: float = 0.9
+    PENALTY: float = -0.0001
     MAZE_SIZE_SCALING: Scaling = Scaling(4.0, 4.0, 4.0)

     def __init__(self, scale: float) -> None:
@@ -238,7 +255,7 @@ class GoalReward4Rooms(MazeTask):
         for goal in self.goals:
             if goal.neighbor(obs):
                 return goal.reward_scale
-        return -0.0001
+        return self.PENALTY

     @staticmethod
     def create_maze() -> List[List[MazeCell]]:
@@ -271,6 +288,7 @@ class SubGoal4Rooms(GoalReward4Rooms):
 class GoalRewardTRoom(MazeTask):
     REWARD_THRESHOLD: float = 0.9
+    PENALTY: float = -0.0001
     MAZE_SIZE_SCALING: Scaling = Scaling(4.0, 4.0, 4.0)

     def __init__(
@@ -285,7 +303,7 @@ class GoalRewardTRoom(MazeTask):
         for goal in self.goals:
             if goal.neighbor(obs):
                 return goal.reward_scale
-        return -0.0001
+        return self.PENALTY

     @staticmethod
     def create_maze() -> List[List[MazeCell]]:
@@ -304,6 +322,30 @@ class DistRewardTRoom(GoalRewardTRoom, DistRewardMixIn):
     pass


+class GoalRewardBlockMaze(GoalRewardUMaze):
+    OBSERVE_BLOCKS: bool = True
+
+    def __init__(self, scale: float) -> None:
+        super().__init__(scale)
+        self.goals = [MazeGoal(np.array([0.0, 3.0 * scale]))]
+
+    @staticmethod
+    def create_maze() -> List[List[MazeCell]]:
+        E, B, R, M = MazeCell.EMPTY, MazeCell.BLOCK, MazeCell.ROBOT, MazeCell.XY
+        return [
+            [B, B, B, B, B],
+            [B, R, E, E, B],
+            [B, B, B, M, B],
+            [B, E, E, E, B],
+            [B, E, E, E, B],
+            [B, B, B, B, B],
+        ]
+
+
+class DistRewardBlockMaze(GoalRewardBlockMaze, DistRewardMixIn):
+    pass
+
+
 class TaskRegistry:
     REGISTRY: Dict[str, List[Type[MazeTask]]] = {
         "SimpleRoom": [DistRewardSimpleRoom, GoalRewardSimpleRoom],
@@ -313,6 +355,7 @@ class TaskRegistry:
         "2Rooms": [DistReward2Rooms, GoalReward2Rooms, SubGoal2Rooms],
         "4Rooms": [DistReward4Rooms, GoalReward4Rooms, SubGoal4Rooms],
         "TRoom": [DistRewardTRoom, GoalRewardTRoom],
+        "BlockMaze": [DistRewardBlockMaze, GoalRewardBlockMaze],
     }

     @staticmethod
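For orientation, the new BlockMaze layout can be printed straight from create_maze(); M is the movable MazeCell.XY block that gives the task its name. A sketch, assuming MazeCell is importable from mujoco_maze.maze_task as the diff suggests:

from mujoco_maze.maze_task import GoalRewardBlockMaze, MazeCell

CHARS = {MazeCell.BLOCK: "#", MazeCell.EMPTY: ".", MazeCell.ROBOT: "R", MazeCell.XY: "M"}
for row in GoalRewardBlockMaze.create_maze():
    print("".join(CHARS[cell] for cell in row))
# #####
# #R..#
# ###M#  <- M: the movable block between the robot and the goal
# #...#
# #...#
# #####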

tests/test_envs.py

@@ -10,7 +10,7 @@ def test_ant_maze(maze_id):
         env = gym.make(f"Ant{maze_id}-v{i}")
         s0 = env.reset()
         s, _, _, _ = env.step(env.action_space.sample())
-        if not env.unwrapped._top_down_view:
+        if not env.unwrapped._top_down_view and not env.unwrapped._observe_blocks:
             assert s0.shape == (30,)
             assert s.shape == (30,)
@@ -20,21 +20,41 @@ def test_point_maze(maze_id):
     for i in range(2):
         env = gym.make(f"Point{maze_id}-v{i}")
         s0 = env.reset()
-        s, _, _, _ = env.step(env.action_space.sample())
-        if not env.unwrapped._top_down_view:
+        s, r, _, _ = env.step(env.action_space.sample())
+        if not env.unwrapped._top_down_view and not env.unwrapped._observe_blocks:
             assert s0.shape == (7,)
             assert s.shape == (7,)
+        if i == 0:
+            assert r != 0.0
+        else:
+            assert r == env.unwrapped._task.PENALTY
+            assert r < 0.0
+
+
+@pytest.mark.parametrize("maze_id", mujoco_maze.TaskRegistry.keys())
+def test_reacher_maze(maze_id):
+    for inhibited in ["Fall", "Push", "Block"]:
+        if inhibited in maze_id:
+            return
+    for i in range(2):
+        env = gym.make(f"Reacher{maze_id}-v{i}")
+        s0 = env.reset()
+        s, _, _, _ = env.step(env.action_space.sample())
+        if not env.unwrapped._top_down_view and not env.unwrapped._observe_blocks:
+            assert s0.shape == (9,)
+            assert s.shape == (9,)


 @pytest.mark.parametrize("maze_id", mujoco_maze.TaskRegistry.keys())
 def test_swimmer_maze(maze_id):
-    if "Fall" in maze_id or "Push" in maze_id:
-        return
+    for inhibited in ["Fall", "Push", "Block"]:
+        if inhibited in maze_id:
+            return
     for i in range(2):
         env = gym.make(f"Swimmer{maze_id}-v{i}")
         s0 = env.reset()
         s, _, _, _ = env.step(env.action_space.sample())
-        if not env.unwrapped._top_down_view:
+        if not env.unwrapped._top_down_view and not env.unwrapped._observe_blocks:
             assert s0.shape == (11,)
             assert s.shape == (11,)
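The new reward assertions lean on the registry ordering: i == 0 resolves to the DistReward variant, whose dense distance-based reward is nonzero after a random step, while i == 1 resolves to the GoalReward variant, which returns the constant PENALTY on every non-goal step. A sketch of the v1 case in isolation (not part of the test file):

import gym
import mujoco_maze  # noqa: F401

env = gym.make("PointUMaze-v1")  # goal-based variant
env.reset()
_, r, _, _ = env.step(env.action_space.sample())
# a single random step cannot reach the goal, so the sparse reward is the penalty
assert r == env.unwrapped._task.PENALTY == -0.0001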
@@ -45,3 +65,10 @@ def test_maze_args(v):
     assert env.reset().shape == (7,)
     s, _, _, _ = env.step(env.action_space.sample())
     assert s.shape == (7,)
+
+
+def test_getting_movable():
+    env = gym.make("PointBlockMaze-v1")
+    assert env.reset().shape == (7,)
+    s, _, _, _ = env.step(env.action_space.sample())
+    assert s.shape == (7,)