More precise obs space for point mazes
parent c9ebb1e2c7
commit 083d6f8dc8
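For context, a minimal sketch of how the tightened observation space surfaces to a user of the package. The environment id below is an assumption (the registered names are built from mujoco_maze.MAZE_IDS and are not shown in this diff), and it assumes importing mujoco_maze registers the environments:

import gym
import mujoco_maze  # noqa: F401  -- assumed to register the maze environments on import

env = gym.make("PointMaze-v0")  # hypothetical id: point agent in the "Maze" layout
space = env.observation_space
# x/y bounds now follow the maze extent instead of +/- inf, the orientation entry
# is bounded by pi, and velocities are capped by PointEnv.VELOCITY_LIMITS.
print(space.low[:2], space.high[:2])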
@@ -2,7 +2,6 @@ import gym
from mujoco_maze.maze_task import TaskRegistry

MAZE_IDS = ["Maze", "Push", "Fall", "4Rooms"]  # TODO: Block, BlockMaze
@@ -1,10 +1,10 @@
"""Common API definition for Ant and Point.
"""
from abc import ABC, abstractmethod

import numpy as np
from gym.envs.mujoco.mujoco_env import MujocoEnv
from gym.utils import EzPickle
from mujoco_py import MjSimState
import numpy as np


class AgentModel(ABC, MujocoEnv, EzPickle):
@@ -15,15 +15,6 @@ class AgentModel(ABC, MujocoEnv, EzPickle):
        MujocoEnv.__init__(self, file_path, frame_skip)
        EzPickle.__init__(self)

    def set_state_without_forward(self, qpos, qvel):
        assert qpos.shape == (self.model.nq,) and qvel.shape == (self.model.nv,)
        old_state = self.sim.get_state()
        new_state = MjSimState(
            old_state.time, qpos, qvel, old_state.act, old_state.udd_state
        )
        self.sim.set_state(new_state)
        self.sim.forward()

    @abstractmethod
    def _get_obs(self) -> np.ndarray:
        """Returns the observation from the model.
@@ -16,6 +16,8 @@
"""Wrapper for creating the ant environment in gym_mujoco."""

import math
from typing import Tuple

import numpy as np

from mujoco_maze.agent_model import AgentModel
@@ -37,39 +39,23 @@ class AntEnv(AgentModel):
    FILE = "ant.xml"
    ORI_IND = 3

    def __init__(
        self,
        file_path=None,
        expose_all_qpos=True,
        expose_body_coms=None,
        expose_body_comvels=None,
    ):
        self._expose_all_qpos = expose_all_qpos
        self._expose_body_coms = expose_body_coms
        self._expose_body_comvels = expose_body_comvels
        self._body_com_indices = {}
        self._body_comvel_indices = {}
    def __init__(self, file_path: Optional[str] = None) -> None:
        super().__init__(file_path, 5)

    def _step(self, a):
        return self.step(a)

    def step(self, a):
    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, dict]:
        xposbefore = self.get_body_com("torso")[0]
        self.do_simulation(a, self.frame_skip)
        self.do_simulation(action, self.frame_skip)
        xposafter = self.get_body_com("torso")[0]
        forward_reward = (xposafter - xposbefore) / self.dt
        ctrl_cost = 0.5 * np.square(a).sum()
        ctrl_cost = 0.5 * np.square(action).sum()
        survive_reward = 1.0
        reward = forward_reward - ctrl_cost + survive_reward
        _ = self.state_vector()
        done = False
        ob = self._get_obs()
        return (
            ob,
            reward,
            done,
            False,
            dict(
                reward_forward=forward_reward,
                reward_ctrl=-ctrl_cost,
@@ -79,34 +65,12 @@ class AntEnv(AgentModel):

    def _get_obs(self):
        # No cfrc observation
        if self._expose_all_qpos:
            obs = np.concatenate(
                [
                    self.sim.data.qpos.flat[:15],  # Ensures only ant obs.
                    self.sim.data.qvel.flat[:14],
                ]
            )
        else:
            obs = np.concatenate(
                [self.sim.data.qpos.flat[2:15], self.sim.data.qvel.flat[:14],]
            )

        if self._expose_body_coms is not None:
            for name in self._expose_body_coms:
                com = self.get_body_com(name)
                if name not in self._body_com_indices:
                    indices = range(len(obs), len(obs) + len(com))
                    self._body_com_indices[name] = indices
                obs = np.concatenate([obs, com])

        if self._expose_body_comvels is not None:
            for name in self._expose_body_comvels:
                comvel = self.get_body_comvel(name)
                if name not in self._body_comvel_indices:
                    indices = range(len(obs), len(obs) + len(comvel))
                    self._body_comvel_indices[name] = indices
                obs = np.concatenate([obs, comvel])
        return obs
        return np.concatenate(
            [
                self.sim.data.qpos.flat[:15],  # Ensures only ant obs.
                self.sim.data.qvel.flat[:14],
            ]
        )

    def reset_model(self):
        qpos = self.init_qpos + self.np_random.uniform(
@@ -137,7 +101,7 @@ class AntEnv(AgentModel):
        qpos[1] = xy[1]

        qvel = self.sim.data.qvel
        self.set_state_without_forwarding(qpos, qvel)
        self.set_state(qpos, qvel)

    def get_xy(self):
        return np.copy(self.sim.data.qpos[:2])
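A quick shape check mirroring the simplified AntEnv._get_obs above; the zero placeholders stand in for MuJoCo's qpos/qvel buffers and are not real simulator values:

import numpy as np

qpos = np.zeros(15)  # first 15 generalized coordinates of the ant
qvel = np.zeros(14)  # first 14 generalized velocities
obs = np.concatenate([qpos[:15], qvel[:14]])
assert obs.shape == (29,)  # the simplified ant observation is 29-dimensional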
@@ -13,8 +13,8 @@
# limitations under the License.
# ==============================================================================

from mujoco_maze.maze_env import MazeEnv
from mujoco_maze.ant import AntEnv
from mujoco_maze.maze_env import MazeEnv


class AntMazeEnv(MazeEnv):
@@ -16,17 +16,16 @@
"""Adapted from rllab maze_env.py."""

import itertools as it
import numpy as np
import gym
import os
import tempfile
import xml.etree.ElementTree as ET
from typing import Tuple, Type

from typing import Type
import gym
import numpy as np

from mujoco_maze import maze_env_utils, maze_task
from mujoco_maze.agent_model import AgentModel
from mujoco_maze import maze_env_utils
from mujoco_maze import maze_task

# Directory that contains mujoco xml files.
MODEL_DIR = os.path.dirname(os.path.abspath(__file__)) + "/assets"
@@ -34,9 +33,7 @@ MODEL_DIR = os.path.dirname(os.path.abspath(__file__)) + "/assets"

class MazeEnv(gym.Env):
    MODEL_CLASS: Type[AgentModel] = AgentModel

    MANUAL_COLLISION: bool = False
    BLOCK_EPS: float = 0.0001

    def __init__(
        self,
@@ -116,7 +113,7 @@ class MazeEnv(gym.Env):
                x = j * size_scaling - torso_x
                y = i * size_scaling - torso_y
                h = height / 2 * size_scaling
                size = 0.5 * size_scaling + self.BLOCK_EPS
                size = 0.5 * size_scaling
                ET.SubElement(
                    worldbody,
                    "geom",
@@ -135,7 +132,7 @@ class MazeEnv(gym.Env):
                x = j * size_scaling - torso_x
                y = i * size_scaling - torso_y
                h = height / 2 * size_scaling
                size = 0.5 * size_scaling + self.BLOCK_EPS
                size = 0.5 * size_scaling
                ET.SubElement(
                    worldbody,
                    "geom",
@@ -165,7 +162,7 @@ class MazeEnv(gym.Env):
                )
                y = i * size_scaling - torso_y
                h = height / 2 * size_scaling * height_shrink
                size = 0.5 * size_scaling * shrink + self.BLOCK_EPS
                size = 0.5 * size_scaling * shrink
                movable_body = ET.SubElement(
                    worldbody,
                    "body",
@@ -264,10 +261,38 @@ class MazeEnv(gym.Env):
        tree.write(file_path)
        self.world_tree = tree
        self.wrapped_env = self.MODEL_CLASS(*args, file_path=file_path, **kwargs)
        self.observation_space = self._get_obs_space()

    def get_ori(self):
    def get_ori(self) -> float:
        return self.wrapped_env.get_ori()

    def _get_obs_space(self) -> gym.spaces.Box:
        shape = self._get_obs().shape
        high = np.inf * np.ones(shape)
        low = -high
        # Set velocity limits
        wrapped_obs_space = self.wrapped_env.observation_space
        high[: wrapped_obs_space.shape[0]] = wrapped_obs_space.high
        low[: wrapped_obs_space.shape[0]] = wrapped_obs_space.low
        # Set coordinate limits
        low[0], high[0], low[1], high[1] = self._xy_limits()
        # Set orientation limits
        return gym.spaces.Box(low, high)

    def _xy_limits(self) -> Tuple[float, float, float, float]:
        xmin, ymin, xmax, ymax = 100, 100, -100, -100
        structure = self._maze_structure
        for i, j in it.product(range(len(structure)), range(len(structure[0]))):
            if structure[i][j].is_block():
                continue
            xmin, xmax = min(xmin, j), max(xmax, j)
            ymin, ymax = min(ymin, i), max(ymax, i)
        x0, y0 = self._init_torso_x, self._init_torso_y
        scaling = self._maze_size_scaling
        xmin, xmax = (xmin - 0.5) * scaling - x0, (xmax + 0.5) * scaling - x0
        ymin, ymax = (ymin - 0.5) * scaling - y0, (ymax + 0.5) * scaling - y0
        return xmin, xmax, ymin, ymax

    def get_top_down_view(self):
        self._view = np.zeros_like(self._view)
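As a sanity check on _xy_limits, a small worked example; the toy structure, the scaling of 4.0, and the torso offset are assumptions chosen only to illustrate the arithmetic:

# Suppose only the cell at row 1, column 1 is free, the maze scaling is 4.0,
# and the torso starts at that cell, i.e. x0 = y0 = 1 * 4.0 = 4.0.
scaling, x0, y0 = 4.0, 4.0, 4.0
xmin = xmax = ymin = ymax = 1  # grid indices of the free cells
xmin, xmax = (xmin - 0.5) * scaling - x0, (xmax + 0.5) * scaling - x0  # -2.0, 2.0
ymin, ymax = (ymin - 0.5) * scaling - y0, (ymax + 0.5) * scaling - y0  # -2.0, 2.0
# The agent's x/y observations are bounded by the footprint of the free cells
# (half a cell on each side of their centers) rather than by +/- inf.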
@@ -492,13 +517,6 @@ class MazeEnv(gym.Env):
    def render(self, *args, **kwargs):
        return self.wrapped_env.render(*args, **kwargs)

    @property
    def observation_space(self):
        shape = self._get_obs().shape
        high = np.inf * np.ones(shape)
        low = -high
        return gym.spaces.Box(low, high)

    @property
    def action_space(self):
        return self.wrapped_env.action_space
@@ -531,6 +549,7 @@ class MazeEnv(gym.Env):
        else:
            inner_next_obs, inner_reward, _, info = self.wrapped_env.step(action)
        next_obs = self._get_obs()
        inner_reward = self._task.scale_inner_reward(inner_reward)
        outer_reward = self._task.reward(next_obs)
        done = self._task.termination(next_obs)
        return next_obs, inner_reward + outer_reward, done, info
@@ -14,9 +14,10 @@
# ==============================================================================

"""Adapted from rllab maze_env_utils.py."""
from enum import Enum
import itertools as it
import math
from enum import Enum

import numpy as np


@@ -112,7 +113,7 @@ class Collision:
        max_x = x_base + size_scaling * offset(pos, 3)
        self.objects.append((min_y, max_y, min_x, max_x))

    def is_in(self, old_pos, new_pos) -> bool:
    def is_in(self, old_pos: np.ndarray, new_pos: np.ndarray) -> bool:
        # Heuristics to prevent the agent from going through the wall
        for x, y in ((old_pos + new_pos) / 2, new_pos):
            for min_y, max_y, min_x, max_x in self.objects:
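The is_in heuristic above tests both the midpoint of the move and its endpoint; a toy illustration of why the midpoint matters, with a made-up wall and positions (none of these numbers come from the diff):

import numpy as np

old_pos = np.array([0.0, 0.0])
new_pos = np.array([2.0, 0.0])
wall = (-0.5, 0.5, 0.9, 1.1)  # (min_y, max_y, min_x, max_x)

def hits(rect, p):
    min_y, max_y, min_x, max_x = rect
    return min_x <= p[0] <= max_x and min_y <= p[1] <= max_y

assert not hits(wall, new_pos)              # the endpoint alone misses the wall
assert hits(wall, (old_pos + new_pos) / 2)  # the midpoint catches the crossing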
@@ -9,6 +9,7 @@ Rgb = Tuple[float, float, float]

RED = (0.7, 0.1, 0.1)
GREEN = (0.1, 0.7, 0.1)
BLUE = (0.1, 0.1, 0.7)


class MazeGoal:
@@ -50,6 +51,9 @@ class MazeTask(ABC):
            return True
        return False

    def scale_inner_reward(self, inner_reward: float) -> float:
        return inner_reward

    @abstractmethod
    def reward(self, obs: np.ndarray) -> float:
        pass
@@ -16,54 +16,53 @@
"""Wrapper for creating the ant environment in gym_mujoco."""

import math
from typing import Optional, Tuple

import gym
import numpy as np

from mujoco_maze.agent_model import AgentModel


class PointEnv(AgentModel):
    VELOCITY_LIMITS: float = 100.0
    FILE = "point.xml"
    ORI_IND = 2

    def __init__(self, file_path=None, expose_all_qpos=True):
        self._expose_all_qpos = expose_all_qpos
    def __init__(self, file_path: Optional[str] = None):
        super().__init__(file_path, 1)
        high = np.inf * np.ones(6)
        high[3:] = self.VELOCITY_LIMITS
        high[self.ORI_IND] = np.pi
        low = -high
        self.observation_space = gym.spaces.Box(low, high)

    def _step(self, a):
        return self.step(a)

    def step(self, action):
    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, dict]:
        qpos = np.copy(self.sim.data.qpos)
        qpos[2] += action[1]
        # Clip orientation
        if qpos[2] < -np.pi:
            qpos[2] += np.pi * 2
        elif np.pi < qpos[2]:
            qpos[2] -= np.pi * 2
        ori = qpos[2]
        # compute increment in each direction
        dx = math.cos(ori) * action[0]
        dy = math.sin(ori) * action[0]
        # ensure that the robot is within reasonable range
        qpos[0] = np.clip(qpos[0] + dx, -100, 100)
        qpos[1] = np.clip(qpos[1] + dy, -100, 100)
        qvel = self.sim.data.qvel
        # Compute increment in each direction
        qpos[0] += math.cos(ori) * action[0]
        qpos[1] += math.sin(ori) * action[0]
        qvel = np.clip(self.sim.data.qvel, -self.VELOCITY_LIMITS, self.VELOCITY_LIMITS)
        self.set_state(qpos, qvel)
        for _ in range(0, self.frame_skip):
            self.sim.step()
        next_obs = self._get_obs()
        reward = 0
        done = False
        info = {}
        return next_obs, reward, done, info
        return next_obs, 0.0, False, {}

    def _get_obs(self):
        if self._expose_all_qpos:
            return np.concatenate(
                [
                    self.sim.data.qpos.flat[:3],  # Only point-relevant coords.
                    self.sim.data.qvel.flat[:3],
                ]
            )
        else:
            return np.concatenate(
                [self.sim.data.qpos.flat[2:3], self.sim.data.qvel.flat[:3]]
            )
        return np.concatenate(
            [
                self.sim.data.qpos.flat[:3],  # Only point-relevant coords.
                self.sim.data.qvel.flat[:3],
            ]
        )

    def reset_model(self):
        qpos = self.init_qpos + self.np_random.uniform(
@@ -86,7 +85,7 @@ class PointEnv(AgentModel):
        qpos[1] = xy[1]

        qvel = self.sim.data.qvel
        self.set_state_without_forward(qpos, qvel)
        self.set_state(qpos, qvel)

    def get_ori(self):
        return self.sim.data.qpos[self.ORI_IND]
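The step method above keeps the point agent's orientation qpos[2] inside [-pi, pi] after adding the rotational action; the same rule as a standalone sketch (wrap_angle is a hypothetical name, not part of the diff, and it assumes the per-step rotation never exceeds one full turn):

import numpy as np

def wrap_angle(theta: float) -> float:
    if theta < -np.pi:
        theta += 2 * np.pi
    elif theta > np.pi:
        theta -= 2 * np.pi
    return theta

assert abs(wrap_angle(np.pi + 0.1) - (-np.pi + 0.1)) < 1e-12
assert wrap_angle(0.5) == 0.5  # angles already in range are untouched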
@@ -1,7 +1,8 @@
import gym
import mujoco_maze
import pytest

import mujoco_maze


@pytest.mark.parametrize("maze_id", mujoco_maze.MAZE_IDS)
def test_ant_maze(maze_id):
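A hypothetical companion test (not part of this commit) that would exercise the new bounds; it assumes the point environments are registered under ids of the form f"Point{maze_id}-v0", which this diff does not show:

import gym
import pytest

import mujoco_maze


@pytest.mark.parametrize("maze_id", mujoco_maze.MAZE_IDS)
def test_point_obs_space(maze_id):
    env = gym.make(f"Point{maze_id}-v0")  # assumed id pattern
    obs = env.reset()
    # Every emitted observation should lie inside the tightened Box.
    assert env.observation_space.contains(obs)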