Implement 4Rooms
This commit is contained in:
parent
d08cfe5d0e
commit
c9ebb1e2c7
@ -3,7 +3,7 @@ import gym
|
|||||||
from mujoco_maze.maze_task import TaskRegistry
|
from mujoco_maze.maze_task import TaskRegistry
|
||||||
|
|
||||||
|
|
||||||
MAZE_IDS = ["Maze", "Push", "Fall"] # TODO: Block, BlockMaze
|
MAZE_IDS = ["Maze", "Push", "Fall", "4Rooms"] # TODO: Block, BlockMaze
|
||||||
|
|
||||||
|
|
||||||
def _get_kwargs(maze_id: str) -> tuple:
|
def _get_kwargs(maze_id: str) -> tuple:
|
||||||
|
@ -40,7 +40,7 @@ class MazeEnv(gym.Env):
|
|||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
maze_task: Type[maze_task.MazeTask] = maze_task.SingleGoalSparseEMaze(),
|
maze_task: Type[maze_task.MazeTask] = maze_task.SingleGoalSparseUMaze,
|
||||||
n_bins: int = 0,
|
n_bins: int = 0,
|
||||||
sensor_range: float = 3.0,
|
sensor_range: float = 3.0,
|
||||||
sensor_span: float = 2 * np.pi,
|
sensor_span: float = 2 * np.pi,
|
||||||
@ -52,7 +52,7 @@ class MazeEnv(gym.Env):
|
|||||||
*args,
|
*args,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> None:
|
) -> None:
|
||||||
self._task = maze_task()
|
self._task = maze_task(maze_size_scaling)
|
||||||
|
|
||||||
xml_path = os.path.join(MODEL_DIR, self.MODEL_CLASS.FILE)
|
xml_path = os.path.join(MODEL_DIR, self.MODEL_CLASS.FILE)
|
||||||
tree = ET.parse(xml_path)
|
tree = ET.parse(xml_path)
|
||||||
@ -246,8 +246,23 @@ class MazeEnv(gym.Env):
|
|||||||
if "name" not in geom.attrib:
|
if "name" not in geom.attrib:
|
||||||
raise Exception("Every geom of the torso must have a name " "defined")
|
raise Exception("Every geom of the torso must have a name " "defined")
|
||||||
|
|
||||||
|
# Set goals
|
||||||
|
asset = tree.find(".//asset")
|
||||||
|
for i, goal in enumerate(self._task.goals):
|
||||||
|
ET.SubElement(asset, "material", name=f"goal{i}", rgba=goal.rbga_str())
|
||||||
|
z = goal.pos[2] if goal.dim >= 3 else 0.0
|
||||||
|
ET.SubElement(
|
||||||
|
worldbody,
|
||||||
|
"site",
|
||||||
|
name=f"goal_site{i}",
|
||||||
|
pos=f"{goal.pos[0]} {goal.pos[1]} {z}",
|
||||||
|
size=f"{maze_size_scaling * 0.1}",
|
||||||
|
material=f"goal{i}",
|
||||||
|
)
|
||||||
|
|
||||||
_, file_path = tempfile.mkstemp(text=True, suffix=".xml")
|
_, file_path = tempfile.mkstemp(text=True, suffix=".xml")
|
||||||
tree.write(file_path)
|
tree.write(file_path)
|
||||||
|
self.world_tree = tree
|
||||||
self.wrapped_env = self.MODEL_CLASS(*args, file_path=file_path, **kwargs)
|
self.wrapped_env = self.MODEL_CLASS(*args, file_path=file_path, **kwargs)
|
||||||
|
|
||||||
def get_ori(self):
|
def get_ori(self):
|
||||||
@ -458,12 +473,18 @@ class MazeEnv(gym.Env):
|
|||||||
self.t = 0
|
self.t = 0
|
||||||
self.wrapped_env.reset()
|
self.wrapped_env.reset()
|
||||||
# Sample a new goal
|
# Sample a new goal
|
||||||
self._task.sample_goals(self._maze_size_scaling)
|
if self._task.sample_goals():
|
||||||
|
self.set_marker()
|
||||||
if len(self._init_positions) > 1:
|
if len(self._init_positions) > 1:
|
||||||
xy = np.random.choice(self._init_positions)
|
xy = np.random.choice(self._init_positions)
|
||||||
self.wrapped_env.set_xy(xy)
|
self.wrapped_env.set_xy(xy)
|
||||||
return self._get_obs()
|
return self._get_obs()
|
||||||
|
|
||||||
|
def set_marker(self):
    """Move each goal marker site onto the current position of its goal.

    The marker sites are created under the name ``goal_site{i}``, whereas
    ``goal{i}`` is the name given to the matching material, so the site must
    be looked up by ``goal_site{i}``.

    Only the first ``len(goal.pos)`` coordinates of a site are overwritten,
    so a two-dimensional goal leaves the z coordinate of its site unchanged.
    """
    # NOTE(review): relies on ``self.model`` and ``self.data`` being reachable
    # from the environment object -- confirm they are exposed here (for
    # instance forwarded from ``self.wrapped_env``).
    for i, goal in enumerate(self._task.goals):
        site_id = self.model.site_name2id(f"goal_site{i}")
        self.data.site_xpos[site_id][: len(goal.pos)] = goal.pos
|
||||||
|
|
||||||
@property
def viewer(self):
    """Return the ``viewer`` attribute of the wrapped environment."""
    wrapped = self.wrapped_env
    return wrapped.viewer
|
||||||
|
@ -1,65 +1,74 @@
|
|||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from typing import Dict, List, Type
|
from typing import Dict, List, Tuple, Type
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
from mujoco_maze.maze_env_utils import MazeCell
|
from mujoco_maze.maze_env_utils import MazeCell
|
||||||
|
|
||||||
|
# An RGB color; each channel is written verbatim into the goal material.
Rgb = Tuple[float, float, float]

RED = (0.7, 0.1, 0.1)
GREEN = (0.1, 0.7, 0.1)


class MazeGoal:
    """A goal position together with its reward scale and marker color.

    Attributes are plain data:

    * ``pos`` -- goal coordinates (a 1-D array).
    * ``dim`` -- number of coordinates in ``pos``; only that many leading
      entries of an observation are compared against the goal.
    * ``reward_scale`` -- value in ``[0, 1]`` associated with this goal.
    * ``rgb`` -- color used when rendering the goal marker.
    """

    # Maximum distance at which an observation counts as "at the goal".
    THRESHOLD: float = 0.6

    def __init__(
        self, pos: np.ndarray, reward_scale: float = 1.0, rgb: Rgb = RED
    ) -> None:
        """Store the goal description.

        Args:
            pos: Goal coordinates; ``pos.shape[0]`` defines ``dim``.
            reward_scale: Must lie in ``[0.0, 1.0]``.
            rgb: Marker color, defaults to ``RED``.

        Raises:
            AssertionError: If ``reward_scale`` is outside ``[0.0, 1.0]``.
        """
        assert 0.0 <= reward_scale <= 1.0, "reward_scale must be within [0, 1]"
        self.pos = pos
        self.dim = pos.shape[0]
        self.reward_scale = reward_scale
        self.rgb = rgb

    def rgba_str(self) -> str:
        """Return the color as an ``"r g b a"`` string with alpha fixed to 1."""
        r, g, b = self.rgb
        return f"{r} {g} {b} 1"

    # Historical misspelling of ``rgba_str``; kept so existing callers work.
    rbga_str = rgba_str

    def neighbor(self, obs: np.ndarray) -> bool:
        """Return True when ``obs`` lies within ``THRESHOLD`` of the goal."""
        return bool(self.euc_dist(obs) <= self.THRESHOLD)

    def euc_dist(self, obs: np.ndarray) -> float:
        """Return the Euclidean distance from ``obs[:dim]`` to the goal."""
        return float(np.linalg.norm(obs[: self.dim] - self.pos))
|
||||||
|
|
||||||
|
|
||||||
class MazeTask(ABC):
    """Abstract description of a maze task: its goals, reward and layout.

    Subclasses must provide ``reward`` and ``create_maze``; they normally
    populate ``self.goals`` in their constructor.
    """

    REWARD_THRESHOLD: float

    def __init__(self, scale: float) -> None:
        """Remember the maze scaling factor and start with no goals.

        Args:
            scale: Scaling factor stored as ``self.scale``.
        """
        self.scale = scale
        self.goals: List["MazeGoal"] = []

    def sample_goals(self) -> bool:
        """Re-sample the goals and report whether they changed.

        The base implementation keeps the goals fixed and returns ``False``.
        """
        return False

    def termination(self, obs: np.ndarray) -> bool:
        """Return True as soon as ``obs`` is a neighbor of any goal."""
        return any(goal.neighbor(obs) for goal in self.goals)

    @abstractmethod
    def reward(self, obs: np.ndarray) -> float:
        """Return the reward for the observation ``obs``."""
        pass

    @staticmethod
    @abstractmethod
    def create_maze() -> List[List["MazeCell"]]:
        """Return the maze layout as rows of maze cells."""
        pass
|
||||||
|
|
||||||
|
|
||||||
class SingleGoalSparseEMaze(MazeTask):
|
class SingleGoalSparseUMaze(MazeTask):
|
||||||
REWARD_THRESHOLD: float = 0.9
|
REWARD_THRESHOLD: float = 0.9
|
||||||
|
|
||||||
def sample_goals(self, scale: float) -> None:
|
def __init__(self, scale: float) -> None:
|
||||||
goal = MazeGoal(np.array([0.0, 2.0 * scale]))
|
super().__init__(scale)
|
||||||
self.goals = [goal]
|
self.goals = [MazeGoal(np.array([0.0, 2.0 * scale]))]
|
||||||
|
|
||||||
def reward(self, obs: np.ndarray) -> float:
|
def reward(self, obs: np.ndarray) -> float:
|
||||||
if self.goals[0].neighbor(obs):
|
return 1.0 if self.termination(obs) else -0.0001
|
||||||
return 1.0
|
|
||||||
else:
|
|
||||||
return -0.0001
|
|
||||||
|
|
||||||
def termination(self, obs: np.ndarray) -> bool:
|
|
||||||
return self.goals[0].neighbor(obs)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create_maze() -> List[List[MazeCell]]:
|
def create_maze() -> List[List[MazeCell]]:
|
||||||
@ -73,17 +82,17 @@ class SingleGoalSparseEMaze(MazeTask):
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class SingleGoalDenseUMaze(SingleGoalSparseUMaze):
    """Dense-reward counterpart of ``SingleGoalSparseUMaze``."""

    REWARD_THRESHOLD: float = 1000.0

    def reward(self, obs: np.ndarray) -> float:
        """Return the negated Euclidean distance from ``obs`` to the first goal."""
        first_goal = self.goals[0]
        return -first_goal.euc_dist(obs)
|
||||||
|
|
||||||
|
|
||||||
class SingleGoalSparsePush(SingleGoalSparseEMaze):
|
class SingleGoalSparsePush(SingleGoalSparseUMaze):
|
||||||
def sample_goals(self, scale: float) -> None:
|
def __init__(self, scale: float) -> None:
|
||||||
goal = MazeGoal(np.array([0.0, 2.375 * scale]))
|
super().__init__(scale)
|
||||||
self.goals = [goal]
|
self.goals = [MazeGoal(np.array([0.0, 2.375 * scale]))]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create_maze() -> List[List[MazeCell]]:
|
def create_maze() -> List[List[MazeCell]]:
|
||||||
@ -104,10 +113,10 @@ class SingleGoalDensePush(SingleGoalSparsePush):
|
|||||||
return -self.goals[0].euc_dist(obs)
|
return -self.goals[0].euc_dist(obs)
|
||||||
|
|
||||||
|
|
||||||
class SingleGoalSparseFall(SingleGoalSparseEMaze):
|
class SingleGoalSparseFall(SingleGoalSparseUMaze):
|
||||||
def sample_goals(self, scale: float) -> None:
|
def __init__(self, scale: float) -> None:
|
||||||
goal = MazeGoal(np.array([0.0, 3.375 * scale, 4.5]))
|
super().__init__(scale)
|
||||||
self.goals = [goal]
|
self.goals = [MazeGoal(np.array([0.0, 3.375 * scale, 4.5]))]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create_maze() -> List[List[MazeCell]]:
|
def create_maze() -> List[List[MazeCell]]:
|
||||||
@ -129,9 +138,49 @@ class SingleGoalDenseFall(SingleGoalSparseFall):
|
|||||||
return -self.goals[0].euc_dist(obs)
|
return -self.goals[0].euc_dist(obs)
|
||||||
|
|
||||||
|
|
||||||
|
class SingleGoalSparse4Rooms(MazeTask):
    """Sparse-reward task on a 9x9 grid of four rooms with a single goal."""

    REWARD_THRESHOLD: float = 0.9

    def __init__(self, scale: float) -> None:
        """Place the only goal at ``(6 * scale, 6 * scale)``."""
        super().__init__(scale)
        coord = 6.0 * scale
        self.goals = [MazeGoal(np.array([coord, coord]))]

    def reward(self, obs: np.ndarray) -> float:
        """Return the reward scale of the first goal reached, else ``-0.0001``."""
        reached = next((g for g in self.goals if g.neighbor(obs)), None)
        if reached is None:
            return -0.0001
        return reached.reward_scale

    @staticmethod
    def create_maze() -> List[List[MazeCell]]:
        """Return the four-rooms layout.

        Four 3x3 rooms are enclosed by blocks and joined by one-cell
        openings in the dividing walls; the robot cell is in the first room.
        """
        E = MazeCell.EMPTY
        B = MazeCell.BLOCK
        R = MazeCell.ROBOT
        layout = [
            [B, B, B, B, B, B, B, B, B],
            [B, R, E, E, B, E, E, E, B],
            [B, E, E, E, E, E, E, E, B],
            [B, E, E, E, B, E, E, E, B],
            [B, B, E, B, B, B, E, B, B],
            [B, E, E, E, B, E, E, E, B],
            [B, E, E, E, E, E, E, E, B],
            [B, E, E, E, B, E, E, E, B],
            [B, B, B, B, B, B, B, B, B],
        ]
        return layout
|
||||||
|
|
||||||
|
|
||||||
|
class SubGoalSparse4Rooms(SingleGoalSparse4Rooms):
    """Four-rooms task with the main goal plus two green sub-goals."""

    def __init__(self, scale: float) -> None:
        """Replace the inherited goal list with one main goal and two sub-goals.

        The main goal keeps the default reward scale and color; each
        sub-goal has reward scale ``0.5`` and is colored ``GREEN``.
        """
        super().__init__(scale)
        near = 0.0 * scale
        far = 6.0 * scale
        main_goal = MazeGoal(np.array([far, far]))
        sub_goals = [
            MazeGoal(np.array(xy), 0.5, GREEN)
            for xy in ([near, far], [far, near])
        ]
        self.goals = [main_goal, *sub_goals]
|
||||||
|
|
||||||
|
|
||||||
class TaskRegistry:
|
class TaskRegistry:
|
||||||
REGISTRY: Dict[str, List[Type[MazeTask]]] = {
|
REGISTRY: Dict[str, List[Type[MazeTask]]] = {
|
||||||
"Maze": [SingleGoalDenseEMaze, SingleGoalSparseEMaze],
|
"Maze": [SingleGoalDenseUMaze, SingleGoalSparseUMaze],
|
||||||
"Push": [SingleGoalDensePush, SingleGoalSparsePush],
|
"Push": [SingleGoalDensePush, SingleGoalSparsePush],
|
||||||
"Fall": [SingleGoalDenseFall, SingleGoalSparseFall],
|
"Fall": [SingleGoalDenseFall, SingleGoalSparseFall],
|
||||||
|
"4Rooms": [SingleGoalSparse4Rooms, SubGoalSparse4Rooms],
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user