From c9ebb1e2c7f4b96a9bfde30d5961533d0289140b Mon Sep 17 00:00:00 2001 From: kngwyu Date: Tue, 23 Jun 2020 01:13:05 +0900 Subject: [PATCH] Implement 4Rooms --- mujoco_maze/__init__.py | 2 +- mujoco_maze/maze_env.py | 27 ++++++++- mujoco_maze/maze_task.py | 119 +++++++++++++++++++++++++++------------ 3 files changed, 109 insertions(+), 39 deletions(-) diff --git a/mujoco_maze/__init__.py b/mujoco_maze/__init__.py index b4b6713..2275a06 100644 --- a/mujoco_maze/__init__.py +++ b/mujoco_maze/__init__.py @@ -3,7 +3,7 @@ import gym from mujoco_maze.maze_task import TaskRegistry -MAZE_IDS = ["Maze", "Push", "Fall"] # TODO: Block, BlockMaze +MAZE_IDS = ["Maze", "Push", "Fall", "4Rooms"] # TODO: Block, BlockMaze def _get_kwargs(maze_id: str) -> tuple: diff --git a/mujoco_maze/maze_env.py b/mujoco_maze/maze_env.py index 4fa5dab..df3a00f 100644 --- a/mujoco_maze/maze_env.py +++ b/mujoco_maze/maze_env.py @@ -40,7 +40,7 @@ class MazeEnv(gym.Env): def __init__( self, - maze_task: Type[maze_task.MazeTask] = maze_task.SingleGoalSparseEMaze(), + maze_task: Type[maze_task.MazeTask] = maze_task.SingleGoalSparseUMaze, n_bins: int = 0, sensor_range: float = 3.0, sensor_span: float = 2 * np.pi, @@ -52,7 +52,7 @@ class MazeEnv(gym.Env): *args, **kwargs, ) -> None: - self._task = maze_task() + self._task = maze_task(maze_size_scaling) xml_path = os.path.join(MODEL_DIR, self.MODEL_CLASS.FILE) tree = ET.parse(xml_path) @@ -246,8 +246,23 @@ class MazeEnv(gym.Env): if "name" not in geom.attrib: raise Exception("Every geom of the torso must have a name " "defined") + # Set goals + asset = tree.find(".//asset") + for i, goal in enumerate(self._task.goals): + ET.SubElement(asset, "material", name=f"goal{i}", rgba=goal.rbga_str()) + z = goal.pos[2] if goal.dim >= 3 else 0.0 + ET.SubElement( + worldbody, + "site", + name=f"goal_site{i}", + pos=f"{goal.pos[0]} {goal.pos[1]} {z}", + size=f"{maze_size_scaling * 0.1}", + material=f"goal{i}", + ) + _, file_path = tempfile.mkstemp(text=True, suffix=".xml") tree.write(file_path) + self.world_tree = tree self.wrapped_env = self.MODEL_CLASS(*args, file_path=file_path, **kwargs) def get_ori(self): @@ -458,12 +473,18 @@ class MazeEnv(gym.Env): self.t = 0 self.wrapped_env.reset() # Sample a new goal - self._task.sample_goals(self._maze_size_scaling) + if self._task.sample_goals(): + self.set_marker() if len(self._init_positions) > 1: xy = np.random.choice(self._init_positions) self.wrapped_env.set_xy(xy) return self._get_obs() + def set_marker(self): + for i, goal in enumerate(self._task.goals): + idx = self.model.site_name2id(f"goal{i}") + self.data.site_xpos[idx][: len(goal.pos)] = goal.pos + @property def viewer(self): return self.wrapped_env.viewer diff --git a/mujoco_maze/maze_task.py b/mujoco_maze/maze_task.py index de859a1..cd1dc22 100644 --- a/mujoco_maze/maze_task.py +++ b/mujoco_maze/maze_task.py @@ -1,65 +1,74 @@ from abc import ABC, abstractmethod -from typing import Dict, List, Type +from typing import Dict, List, Tuple, Type import numpy as np from mujoco_maze.maze_env_utils import MazeCell +Rgb = Tuple[float, float, float] + +RED = (0.7, 0.1, 0.1) +GREEN = (0.1, 0.7, 0.1) + class MazeGoal: THRESHOLD: float = 0.6 - def __init__(self, goal: np.ndarray, reward_scale: float = 1.0) -> None: - self.goal = goal - self.goal_dim = goal.shape[0] + def __init__( + self, pos: np.ndarray, reward_scale: float = 1.0, rgb: Rgb = RED + ) -> None: + assert 0.0 <= reward_scale <= 1.0 + self.pos = pos + self.dim = pos.shape[0] self.reward_scale = reward_scale + self.rgb = rgb + + def rbga_str(self) -> str: + r, g, b = self.rgb + return f"{r} {g} {b} 1" def neighbor(self, obs: np.ndarray) -> float: - return np.linalg.norm(obs[: self.goal_dim] - self.goal) <= self.THRESHOLD + return np.linalg.norm(obs[: self.dim] - self.pos) <= self.THRESHOLD def euc_dist(self, obs: np.ndarray) -> float: - return np.sum(np.square(obs[: self.goal_dim] - self.goal)) ** 0.5 + return np.sum(np.square(obs[: self.dim] - self.pos)) ** 0.5 class MazeTask(ABC): REWARD_THRESHOLD: float - def __init__(self) -> None: + def __init__(self, scale: float) -> None: + self.scale = scale self.goals = [] - @abstractmethod - def sample_goals(self, scale: float) -> None: - pass + def sample_goals(self) -> bool: + return False + + def termination(self, obs: np.ndarray) -> bool: + for goal in self.goals: + if goal.neighbor(obs): + return True + return False @abstractmethod def reward(self, obs: np.ndarray) -> float: pass - @abstractmethod - def termination(self, obs: np.ndarray) -> bool: - pass - @staticmethod @abstractmethod def create_maze() -> List[List[MazeCell]]: pass -class SingleGoalSparseEMaze(MazeTask): +class SingleGoalSparseUMaze(MazeTask): REWARD_THRESHOLD: float = 0.9 - def sample_goals(self, scale: float) -> None: - goal = MazeGoal(np.array([0.0, 2.0 * scale])) - self.goals = [goal] + def __init__(self, scale: float) -> None: + super().__init__(scale) + self.goals = [MazeGoal(np.array([0.0, 2.0 * scale]))] def reward(self, obs: np.ndarray) -> float: - if self.goals[0].neighbor(obs): - return 1.0 - else: - return -0.0001 - - def termination(self, obs: np.ndarray) -> bool: - return self.goals[0].neighbor(obs) + return 1.0 if self.termination(obs) else -0.0001 @staticmethod def create_maze() -> List[List[MazeCell]]: @@ -73,17 +82,17 @@ class SingleGoalSparseEMaze(MazeTask): ] -class SingleGoalDenseEMaze(SingleGoalSparseEMaze): +class SingleGoalDenseUMaze(SingleGoalSparseUMaze): REWARD_THRESHOLD: float = 1000.0 def reward(self, obs: np.ndarray) -> float: return -self.goals[0].euc_dist(obs) -class SingleGoalSparsePush(SingleGoalSparseEMaze): - def sample_goals(self, scale: float) -> None: - goal = MazeGoal(np.array([0.0, 2.375 * scale])) - self.goals = [goal] +class SingleGoalSparsePush(SingleGoalSparseUMaze): + def __init__(self, scale: float) -> None: + super().__init__(scale) + self.goals = [MazeGoal(np.array([0.0, 2.375 * scale]))] @staticmethod def create_maze() -> List[List[MazeCell]]: @@ -104,10 +113,10 @@ class SingleGoalDensePush(SingleGoalSparsePush): return -self.goals[0].euc_dist(obs) -class SingleGoalSparseFall(SingleGoalSparseEMaze): - def sample_goals(self, scale: float) -> None: - goal = MazeGoal(np.array([0.0, 3.375 * scale, 4.5])) - self.goals = [goal] +class SingleGoalSparseFall(SingleGoalSparseUMaze): + def __init__(self, scale: float) -> None: + super().__init__(scale) + self.goals = [MazeGoal(np.array([0.0, 3.375 * scale, 4.5]))] @staticmethod def create_maze() -> List[List[MazeCell]]: @@ -129,9 +138,49 @@ class SingleGoalDenseFall(SingleGoalSparseFall): return -self.goals[0].euc_dist(obs) +class SingleGoalSparse4Rooms(MazeTask): + REWARD_THRESHOLD: float = 0.9 + + def __init__(self, scale: float) -> None: + super().__init__(scale) + self.goals = [MazeGoal(np.array([6.0 * scale, 6.0 * scale]))] + + def reward(self, obs: np.ndarray) -> float: + for goal in self.goals: + if goal.neighbor(obs): + return goal.reward_scale + return -0.0001 + + @staticmethod + def create_maze() -> List[List[MazeCell]]: + E, B, R = MazeCell.EMPTY, MazeCell.BLOCK, MazeCell.ROBOT + return [ + [B, B, B, B, B, B, B, B, B], + [B, R, E, E, B, E, E, E, B], + [B, E, E, E, E, E, E, E, B], + [B, E, E, E, B, E, E, E, B], + [B, B, E, B, B, B, E, B, B], + [B, E, E, E, B, E, E, E, B], + [B, E, E, E, E, E, E, E, B], + [B, E, E, E, B, E, E, E, B], + [B, B, B, B, B, B, B, B, B], + ] + + +class SubGoalSparse4Rooms(SingleGoalSparse4Rooms): + def __init__(self, scale: float) -> None: + super().__init__(scale) + self.goals = [ + MazeGoal(np.array([6.0 * scale, 6.0 * scale])), + MazeGoal(np.array([0.0 * scale, 6.0 * scale]), 0.5, GREEN), + MazeGoal(np.array([6.0 * scale, 0.0 * scale]), 0.5, GREEN), + ] + + class TaskRegistry: REGISTRY: Dict[str, List[Type[MazeTask]]] = { - "Maze": [SingleGoalDenseEMaze, SingleGoalSparseEMaze], + "Maze": [SingleGoalDenseUMaze, SingleGoalSparseUMaze], "Push": [SingleGoalDensePush, SingleGoalSparsePush], "Fall": [SingleGoalDenseFall, SingleGoalSparseFall], + "4Rooms": [SingleGoalSparse4Rooms, SubGoalSparse4Rooms], }