wip
This commit is contained in:
		
							parent
							
								
									36bf9b5b6a
								
							
						
					
					
						commit
						b4ad3e6ddd
					
				@ -129,6 +129,25 @@ register(
 | 
			
		||||
    }
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
register(
 | 
			
		||||
    id='EpisodicSimpleReacher-v0',
 | 
			
		||||
    entry_point='alr_envs.classic_control:EpisodicSimpleReacherEnv',
 | 
			
		||||
    max_episode_steps=200,
 | 
			
		||||
    kwargs={
 | 
			
		||||
        "n_links": 2,
 | 
			
		||||
    }
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
register(
 | 
			
		||||
    id='EpisodicSimpleReacher-v1',
 | 
			
		||||
    entry_point='alr_envs.classic_control:EpisodicSimpleReacherEnv',
 | 
			
		||||
    max_episode_steps=200,
 | 
			
		||||
    kwargs={
 | 
			
		||||
        "n_links": 2,
 | 
			
		||||
        "random_start": False
 | 
			
		||||
    }
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
register(
 | 
			
		||||
    id='LongSimpleReacher-v0',
 | 
			
		||||
    entry_point='alr_envs.classic_control:SimpleReacherEnv',
 | 
			
		||||
@ -186,7 +205,7 @@ register(
 | 
			
		||||
    entry_point='alr_envs.utils.make_env_helpers:make_dmp_env',
 | 
			
		||||
    # max_episode_steps=1,
 | 
			
		||||
    kwargs={
 | 
			
		||||
        "name": "alr_envs:SimpleReacher-v0",
 | 
			
		||||
        "name": "alr_envs:EpisodicSimpleReacher-v0",
 | 
			
		||||
        "num_dof": 2,
 | 
			
		||||
        "num_basis": 5,
 | 
			
		||||
        "duration": 2,
 | 
			
		||||
@ -202,7 +221,7 @@ register(
 | 
			
		||||
    entry_point='alr_envs.utils.make_env_helpers:make_dmp_env',
 | 
			
		||||
    # max_episode_steps=1,
 | 
			
		||||
    kwargs={
 | 
			
		||||
        "name": "alr_envs:SimpleReacher-v1",
 | 
			
		||||
        "name": "alr_envs:EpisodicSimpleReacher-v1",
 | 
			
		||||
        "num_dof": 2,
 | 
			
		||||
        "num_basis": 5,
 | 
			
		||||
        "duration": 2,
 | 
			
		||||
 | 
			
		||||
@ -1,3 +1,4 @@
 | 
			
		||||
from alr_envs.classic_control.simple_reacher import SimpleReacherEnv
 | 
			
		||||
from alr_envs.classic_control.episodic_simple_reacher import EpisodicSimpleReacherEnv
 | 
			
		||||
from alr_envs.classic_control.viapoint_reacher import ViaPointReacher
 | 
			
		||||
from alr_envs.classic_control.hole_reacher import HoleReacher
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										46
									
								
								alr_envs/classic_control/episodic_simple_reacher.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										46
									
								
								alr_envs/classic_control/episodic_simple_reacher.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,46 @@
 | 
			
		||||
from alr_envs.classic_control.simple_reacher import SimpleReacherEnv
 | 
			
		||||
from gym import spaces
 | 
			
		||||
import numpy as np
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class EpisodicSimpleReacherEnv(SimpleReacherEnv):
 | 
			
		||||
    def __init__(self, n_links, random_start=True):
 | 
			
		||||
        super(EpisodicSimpleReacherEnv, self).__init__(n_links, random_start)
 | 
			
		||||
 | 
			
		||||
        # self._goal_pos = None
 | 
			
		||||
 | 
			
		||||
        if random_start:
 | 
			
		||||
            state_bound = np.hstack([
 | 
			
		||||
                [np.pi] * self.n_links,  # cos
 | 
			
		||||
                [np.pi] * self.n_links,  # sin
 | 
			
		||||
                [np.inf] * self.n_links,  # velocity
 | 
			
		||||
            ])
 | 
			
		||||
        else:
 | 
			
		||||
            state_bound = np.empty(0, )
 | 
			
		||||
 | 
			
		||||
        state_bound = np.hstack([
 | 
			
		||||
            state_bound,
 | 
			
		||||
            [np.inf] * 2,  # x-y coordinates of goal
 | 
			
		||||
        ])
 | 
			
		||||
 | 
			
		||||
        self.observation_space = spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def start_pos(self):
 | 
			
		||||
        return self._start_pos
 | 
			
		||||
 | 
			
		||||
    # @property
 | 
			
		||||
    # def goal_pos(self):
 | 
			
		||||
    #     return self._goal_pos
 | 
			
		||||
 | 
			
		||||
    def _get_obs(self):
 | 
			
		||||
        if self.random_start:
 | 
			
		||||
            theta = self._joint_angle
 | 
			
		||||
            return np.hstack([
 | 
			
		||||
                np.cos(theta),
 | 
			
		||||
                np.sin(theta),
 | 
			
		||||
                self._angle_velocity,
 | 
			
		||||
                self._goal,
 | 
			
		||||
            ])
 | 
			
		||||
        else:
 | 
			
		||||
            return self._goal
 | 
			
		||||
@ -26,7 +26,7 @@ class SimpleReacherEnv(gym.Env):
 | 
			
		||||
 | 
			
		||||
        self.random_start = random_start
 | 
			
		||||
 | 
			
		||||
        self._goal_pos = None
 | 
			
		||||
        self._goal = None
 | 
			
		||||
 | 
			
		||||
        self._joints = None
 | 
			
		||||
        self._joint_angle = None
 | 
			
		||||
@ -53,10 +53,6 @@ class SimpleReacherEnv(gym.Env):
 | 
			
		||||
        self._steps = 0
 | 
			
		||||
        self.seed()
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def start_pos(self):
 | 
			
		||||
        return self._start_pos
 | 
			
		||||
 | 
			
		||||
    def step(self, action: np.ndarray):
 | 
			
		||||
 | 
			
		||||
        # action = self._add_action_noise(action)
 | 
			
		||||
@ -91,8 +87,7 @@ class SimpleReacherEnv(gym.Env):
 | 
			
		||||
            np.cos(theta),
 | 
			
		||||
            np.sin(theta),
 | 
			
		||||
            self._angle_velocity,
 | 
			
		||||
            self.end_effector - self._goal_pos,
 | 
			
		||||
            self._goal_pos,
 | 
			
		||||
            self.end_effector - self._goal,
 | 
			
		||||
            self._steps
 | 
			
		||||
        ])
 | 
			
		||||
 | 
			
		||||
@ -107,7 +102,7 @@ class SimpleReacherEnv(gym.Env):
 | 
			
		||||
        self._joints[1:] = self._joints[0] + np.cumsum(x.T, axis=0)
 | 
			
		||||
 | 
			
		||||
    def _get_reward(self, action: np.ndarray):
 | 
			
		||||
        diff = self.end_effector - self._goal_pos
 | 
			
		||||
        diff = self.end_effector - self._goal
 | 
			
		||||
        reward_dist = 0
 | 
			
		||||
 | 
			
		||||
        # TODO: Is this the best option
 | 
			
		||||
@ -135,7 +130,7 @@ class SimpleReacherEnv(gym.Env):
 | 
			
		||||
        self._update_joints()
 | 
			
		||||
        self._steps = 0
 | 
			
		||||
 | 
			
		||||
        self._goal_pos = self._get_random_goal()
 | 
			
		||||
        self._goal = self._get_random_goal()
 | 
			
		||||
        return self._get_obs().copy()
 | 
			
		||||
 | 
			
		||||
    def _get_random_goal(self):
 | 
			
		||||
@ -160,13 +155,13 @@ class SimpleReacherEnv(gym.Env):
 | 
			
		||||
            plt.figure(self.fig.number)
 | 
			
		||||
 | 
			
		||||
        plt.cla()
 | 
			
		||||
        plt.title(f"Iteration: {self._steps}, distance: {self.end_effector - self._goal_pos}")
 | 
			
		||||
        plt.title(f"Iteration: {self._steps}, distance: {self.end_effector - self._goal}")
 | 
			
		||||
 | 
			
		||||
        # Arm
 | 
			
		||||
        plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k')
 | 
			
		||||
 | 
			
		||||
        # goal
 | 
			
		||||
        goal_pos = self._goal_pos.T
 | 
			
		||||
        goal_pos = self._goal.T
 | 
			
		||||
        plt.plot(goal_pos[0], goal_pos[1], 'gx')
 | 
			
		||||
        # distance between end effector and goal
 | 
			
		||||
        plt.plot([self.end_effector[0], goal_pos[0]], [self.end_effector[1], goal_pos[1]], 'g--')
 | 
			
		||||
 | 
			
		||||
@ -81,12 +81,11 @@ class AlrContextualMpEnvSampler:
 | 
			
		||||
        repeat = int(np.ceil(n_samples / self.env.num_envs))
 | 
			
		||||
        vals = defaultdict(list)
 | 
			
		||||
        for i in range(repeat):
 | 
			
		||||
            obs = self.env.reset()
 | 
			
		||||
            new_contexts = self.env.reset()
 | 
			
		||||
 | 
			
		||||
            new_contexts = obs[-2]
 | 
			
		||||
            new_samples = dist.sample(new_contexts)
 | 
			
		||||
 | 
			
		||||
            obs, reward, done, info = self.env.step(p)
 | 
			
		||||
            obs, reward, done, info = self.env.step(new_samples)
 | 
			
		||||
            vals['obs'].append(obs)
 | 
			
		||||
            vals['reward'].append(reward)
 | 
			
		||||
            vals['done'].append(done)
 | 
			
		||||
 | 
			
		||||
@ -9,8 +9,10 @@ from alr_envs.utils.wrapper.mp_wrapper import MPWrapper
 | 
			
		||||
 | 
			
		||||
class DmpWrapper(MPWrapper):
 | 
			
		||||
 | 
			
		||||
    def __init__(self, env: gym.Env, num_dof: int, num_basis: int, start_pos: np.ndarray = None,
 | 
			
		||||
                 final_pos: np.ndarray = None, duration: int = 1, alpha_phase: float = 2., dt: float = None,
 | 
			
		||||
    def __init__(self, env: gym.Env, num_dof: int, num_basis: int,
 | 
			
		||||
                 # start_pos: np.ndarray = None,
 | 
			
		||||
                 # final_pos: np.ndarray = None,
 | 
			
		||||
                 duration: int = 1, alpha_phase: float = 2., dt: float = None,
 | 
			
		||||
                 learn_goal: bool = False, return_to_start: bool = False, post_traj_time: float = 0.,
 | 
			
		||||
                 weights_scale: float = 1., goal_scale: float = 1., bandwidth_factor: float = 3.,
 | 
			
		||||
                 policy_type: str = None, render_mode: str = None):
 | 
			
		||||
@ -35,26 +37,30 @@ class DmpWrapper(MPWrapper):
 | 
			
		||||
        self.learn_goal = learn_goal
 | 
			
		||||
        dt = env.dt if hasattr(env, "dt") else dt
 | 
			
		||||
        assert dt is not None
 | 
			
		||||
        start_pos = start_pos if start_pos is not None else env.start_pos if hasattr(env, "start_pos") else None
 | 
			
		||||
        # start_pos = start_pos if start_pos is not None else env.start_pos if hasattr(env, "start_pos") else None
 | 
			
		||||
        # TODO: assert start_pos is not None  # start_pos will be set in initialize, do we need this here?
 | 
			
		||||
        if learn_goal:
 | 
			
		||||
        # if learn_goal:
 | 
			
		||||
            # final_pos = np.zeros_like(start_pos)  # arbitrary, will be learned
 | 
			
		||||
            final_pos = np.zeros((1, num_dof))  # arbitrary, will be learned
 | 
			
		||||
        else:
 | 
			
		||||
            final_pos = final_pos if final_pos is not None else start_pos if return_to_start else None
 | 
			
		||||
        assert final_pos is not None
 | 
			
		||||
            # final_pos = np.zeros((1, num_dof))  # arbitrary, will be learned
 | 
			
		||||
        # else:
 | 
			
		||||
        #     final_pos = final_pos if final_pos is not None else start_pos if return_to_start else None
 | 
			
		||||
        # assert final_pos is not None
 | 
			
		||||
        self.t = np.linspace(0, duration, int(duration / dt))
 | 
			
		||||
        self.goal_scale = goal_scale
 | 
			
		||||
 | 
			
		||||
        super().__init__(env, num_dof, duration, dt, post_traj_time, policy_type, weights_scale, render_mode,
 | 
			
		||||
                         num_basis=num_basis, start_pos=start_pos, final_pos=final_pos, alpha_phase=alpha_phase,
 | 
			
		||||
                         num_basis=num_basis,
 | 
			
		||||
                         # start_pos=start_pos, final_pos=final_pos,
 | 
			
		||||
                         alpha_phase=alpha_phase,
 | 
			
		||||
                         bandwidth_factor=bandwidth_factor)
 | 
			
		||||
 | 
			
		||||
        action_bounds = np.inf * np.ones((np.prod(self.mp.dmp_weights.shape) + (num_dof if learn_goal else 0)))
 | 
			
		||||
        self.action_space = gym.spaces.Box(low=-action_bounds, high=action_bounds, dtype=np.float32)
 | 
			
		||||
 | 
			
		||||
    def initialize_mp(self, num_dof: int, duration: int, dt: float, num_basis: int = 5, start_pos: np.ndarray = None,
 | 
			
		||||
                      final_pos: np.ndarray = None, alpha_phase: float = 2., bandwidth_factor: float = 3.):
 | 
			
		||||
    def initialize_mp(self, num_dof: int, duration: int, dt: float, num_basis: int = 5,
 | 
			
		||||
                      # start_pos: np.ndarray = None,
 | 
			
		||||
                      # final_pos: np.ndarray = None,
 | 
			
		||||
                      alpha_phase: float = 2., bandwidth_factor: float = 3.):
 | 
			
		||||
 | 
			
		||||
        phase_generator = ExpDecayPhaseGenerator(alpha_phase=alpha_phase, duration=duration)
 | 
			
		||||
        basis_generator = DMPBasisGenerator(phase_generator, duration=duration, num_basis=num_basis,
 | 
			
		||||
@ -66,12 +72,12 @@ class DmpWrapper(MPWrapper):
 | 
			
		||||
        # dmp.dmp_start_pos = start_pos.reshape((1, num_dof))
 | 
			
		||||
        # in a contextual environment, the start_pos may be not fixed, set in mp_rollout?
 | 
			
		||||
        # TODO: Should we set start_pos in init at all? It's only used after calling rollout anyway...
 | 
			
		||||
        dmp.dmp_start_pos = start_pos.reshape((1, num_dof)) if start_pos is not None else np.zeros((1, num_dof))
 | 
			
		||||
        # dmp.dmp_start_pos = start_pos.reshape((1, num_dof)) if start_pos is not None else np.zeros((1, num_dof))
 | 
			
		||||
 | 
			
		||||
        weights = np.zeros((num_basis, num_dof))
 | 
			
		||||
        goal_pos = np.zeros(num_dof) if self.learn_goal else final_pos
 | 
			
		||||
        # weights = np.zeros((num_basis, num_dof))
 | 
			
		||||
        # goal_pos = np.zeros(num_dof) if self.learn_goal else final_pos
 | 
			
		||||
 | 
			
		||||
        dmp.set_weights(weights, goal_pos)
 | 
			
		||||
        # dmp.set_weights(weights, goal_pos)
 | 
			
		||||
        return dmp
 | 
			
		||||
 | 
			
		||||
    def goal_and_weights(self, params):
 | 
			
		||||
@ -83,7 +89,7 @@ class DmpWrapper(MPWrapper):
 | 
			
		||||
            params = params[:, :-self.mp.num_dimensions]  # [1,num_dof]
 | 
			
		||||
            # weight_matrix = np.reshape(params[:, :-self.num_dof], [self.num_basis, self.num_dof])
 | 
			
		||||
        else:
 | 
			
		||||
            goal_pos = self.mp.dmp_goal_pos.flatten()
 | 
			
		||||
            goal_pos = self.env.goal_pos  # self.mp.dmp_goal_pos.flatten()
 | 
			
		||||
            assert goal_pos is not None
 | 
			
		||||
            # weight_matrix = np.reshape(params, [self.num_basis, self.num_dof])
 | 
			
		||||
 | 
			
		||||
@ -91,8 +97,8 @@ class DmpWrapper(MPWrapper):
 | 
			
		||||
        return goal_pos * self.goal_scale, weight_matrix * self.weights_scale
 | 
			
		||||
 | 
			
		||||
    def mp_rollout(self, action):
 | 
			
		||||
        if self.mp.start_pos is None:
 | 
			
		||||
            self.mp.start_pos = self.env.start_pos
 | 
			
		||||
        # if self.mp.start_pos is None:
 | 
			
		||||
        self.mp.dmp_start_pos = self.env.init_qpos  # start_pos
 | 
			
		||||
        goal_pos, weight_matrix = self.goal_and_weights(action)
 | 
			
		||||
        self.mp.set_weights(weight_matrix, goal_pos)
 | 
			
		||||
        return self.mp.reference_trajectory(self.t)
 | 
			
		||||
 | 
			
		||||
@ -62,7 +62,8 @@ class MPWrapper(gym.Wrapper, ABC):
 | 
			
		||||
        self.env.configure(context)
 | 
			
		||||
 | 
			
		||||
    def reset(self):
 | 
			
		||||
        return self.env.reset()
 | 
			
		||||
        obs = self.env.reset()
 | 
			
		||||
        return obs
 | 
			
		||||
 | 
			
		||||
    def step(self, action: np.ndarray):
 | 
			
		||||
        """ This function generates a trajectory based on a DMP and then does the usual loop over reset and step"""
 | 
			
		||||
 | 
			
		||||
@ -83,5 +83,6 @@ if __name__ == '__main__':
 | 
			
		||||
    # example_mujoco()
 | 
			
		||||
    # example_dmp()
 | 
			
		||||
    # example_async()
 | 
			
		||||
    env = gym.make("alr_envs:HoleReacherDMP-v0", context=0.1)
 | 
			
		||||
    # env = gym.make("alr_envs:HoleReacherDMP-v0", context=0.1)
 | 
			
		||||
    env = gym.make("alr_envs:SimpleReacherDMP-v1")
 | 
			
		||||
    print()
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user