From 95250af31cc24b7daddf5a85abff31bf517fa76d Mon Sep 17 00:00:00 2001 From: Maximilian Huettenrauch Date: Fri, 12 Feb 2021 17:12:40 +0100 Subject: [PATCH] added viapoint reacher --- alr_envs/classic_control/viapoint_reacher.py | 313 +++++++++++++++++++ 1 file changed, 313 insertions(+) create mode 100644 alr_envs/classic_control/viapoint_reacher.py diff --git a/alr_envs/classic_control/viapoint_reacher.py b/alr_envs/classic_control/viapoint_reacher.py new file mode 100644 index 0000000..eeba84d --- /dev/null +++ b/alr_envs/classic_control/viapoint_reacher.py @@ -0,0 +1,313 @@ +import gym +import numpy as np +import matplotlib.pyplot as plt +from matplotlib import patches + + +def ccw(A, B, C): + return (C[1]-A[1]) * (B[0]-A[0]) - (B[1]-A[1]) * (C[0]-A[0]) > 1e-12 + + +# Return true if line segments AB and CD intersect +def intersect(A, B, C, D): + return ccw(A, C, D) != ccw(B, C, D) and ccw(A, B, C) != ccw(A, B, D) + + +class HoleReacher(gym.Env): + + def __init__(self, num_links, hole_x, hole_width, hole_depth, allow_self_collision=False, + allow_wall_collision=False, collision_penalty=1000): + self.hole_x = hole_x # x-position of center of hole + self.hole_width = hole_width # width of hole + self.hole_depth = hole_depth # depth of hole + self.num_links = num_links + self.link_lengths = np.ones((num_links, 1)) + self.bottom_center_of_hole = np.hstack([hole_x, -hole_depth]) + self.top_center_of_hole = np.hstack([hole_x, 0]) + self.left_wall_edge = np.hstack([hole_x - self.hole_width/2, 0]) + self.right_wall_edge = np.hstack([hole_x + self.hole_width / 2, 0]) + self.allow_self_collision = allow_self_collision + self.allow_wall_collision = allow_wall_collision + self.collision_penalty = collision_penalty + + self._joints = None + self._joint_angles = None + self._angle_velocity = None + self.start_pos = np.hstack([[np.pi/2], np.zeros(self.num_links - 1)]) + self.start_vel = np.zeros(self.num_links) + self.weight_matrix_scale = 50 # for the holereacher, the dmp weights become quite large compared to the values of the goal attractor. this scaling is to ensure they are on similar scale for the optimizer + + self.dt = 0.01 + self.time_limit = 2 + + action_bound = np.pi * np.ones((self.num_links,)) + state_bound = np.hstack([ + [np.pi] * self.num_links, # cos + [np.pi] * self.num_links, # sin + [np.inf] * self.num_links, # velocity + [np.inf] * 2, # x-y coordinates of target distance + [np.inf] # env steps, because reward start after n steps TODO: Maybe + ]) + self.action_space = gym.spaces.Box(low=-action_bound, high=action_bound, shape=action_bound.shape) + self.observation_space = gym.spaces.Box(low=-state_bound, high=state_bound, shape=state_bound.shape) + + self.fig = None + rect_1 = patches.Rectangle((-self.num_links, -1), + self.num_links + self.hole_x - self.hole_width / 2, 1, + fill=True, edgecolor='k', facecolor='k') + rect_2 = patches.Rectangle((self.hole_x + self.hole_width / 2, -1), + self.num_links - self.hole_x + self.hole_width / 2, 1, + fill=True, edgecolor='k', facecolor='k') + rect_3 = patches.Rectangle((self.hole_x - self.hole_width / 2, -1), self.hole_width, + 1 - self.hole_depth, + fill=True, edgecolor='k', facecolor='k') + + self.patches = [rect_1, rect_2, rect_3] + + @property + def end_effector(self): + return self._joints[self.num_links].T + + def reset(self): + self._joint_angles = self.start_pos + self._angle_velocity = self.start_vel + self._joints = np.zeros((self.num_links + 1, 2)) + self._update_joints() + self._steps = 0 + + return self._get_obs().copy() + + def step(self, action): + """ + a single step with an action in joint velocity space + """ + vel = action + acc = (vel - self._angle_velocity) / self.dt + self._angle_velocity = vel + self._joint_angles = self._joint_angles + self.dt * self._angle_velocity + + self._update_joints() + + # rew = self._reward() + + # compute reward directly in step function + + dist_reward = 0 + if not self._is_collided: + if self._steps == 180: + dist_reward = np.linalg.norm(self.end_effector - self.bottom_center_of_hole) + else: + dist_reward = np.linalg.norm(self.end_effector - self.bottom_center_of_hole) + + reward = - dist_reward ** 2 + + reward -= 1e-6 * np.sum(acc**2) + + if self._steps == 180: + reward -= 0.1 * np.sum(vel**2) ** 2 + + if self._is_collided: + reward -= self.collision_penalty + + info = {"is_collided": self._is_collided} + + self._steps += 1 + + done = self._steps * self.dt > self.time_limit or self._is_collided + + return self._get_obs().copy(), reward, done, info + + def _update_joints(self): + """ + update _joints to get new end effector position. The other links are only required for rendering. + Returns: + + """ + line_points_in_taskspace = self.get_forward_kinematics(num_points_per_link=20) + + self._joints[1:, 0] = self._joints[0, 0] + line_points_in_taskspace[:, -1, 0] + self._joints[1:, 1] = self._joints[0, 1] + line_points_in_taskspace[:, -1, 1] + + self_collision = False + wall_collision = False + + if not self.allow_self_collision: + self_collision = self.check_self_collision(line_points_in_taskspace) + if np.any(np.abs(self._joint_angles) > np.pi) and not self.allow_self_collision: + self_collision = True + + if not self.allow_wall_collision: + wall_collision = self.check_wall_collision(line_points_in_taskspace) + + self._is_collided = self_collision or wall_collision + + def _get_obs(self): + theta = self._joint_angles + return np.hstack([ + np.cos(theta), + np.sin(theta), + self._angle_velocity, + self.end_effector - self.bottom_center_of_hole, + self._steps + ]) + + # def _reward(self): + # dist_reward = 0 + # if not self._is_collided: + # if self._steps == 180: + # dist_reward = np.linalg.norm(self.end_effector - self.bottom_center_of_hole) + # else: + # dist_reward = np.linalg.norm(self.end_effector - self.bottom_center_of_hole) + # + # out = - dist_reward ** 2 + # + # return out + + def get_forward_kinematics(self, num_points_per_link=1): + theta = self._joint_angles[:, None] + + if num_points_per_link > 1: + intermediate_points = np.linspace(0, 1, num_points_per_link) + else: + intermediate_points = 1 + + accumulated_theta = np.cumsum(theta, axis=0) + + endeffector = np.zeros(shape=(self.num_links, num_points_per_link, 2)) + + x = np.cos(accumulated_theta) * self.link_lengths * intermediate_points + y = np.sin(accumulated_theta) * self.link_lengths * intermediate_points + + endeffector[0, :, 0] = x[0, :] + endeffector[0, :, 1] = y[0, :] + + for i in range(1, self.num_links): + endeffector[i, :, 0] = x[i, :] + endeffector[i - 1, -1, 0] + endeffector[i, :, 1] = y[i, :] + endeffector[i - 1, -1, 1] + + return np.squeeze(endeffector + self._joints[0, :]) + + def check_self_collision(self, line_points): + for i, line1 in enumerate(line_points): + for line2 in line_points[i+2:, :, :]: + # if line1 != line2: + if intersect(line1[0], line1[-1], line2[0], line2[-1]): + return True + return False + + def check_wall_collision(self, line_points): + + # all points that are before the hole in x + r, c = np.where(line_points[:, :, 0] < (self.hole_x - self.hole_width / 2)) + + # check if any of those points are below surface + nr_line_points_below_surface_before_hole = np.sum(line_points[r, c, 1] < 0) + + if nr_line_points_below_surface_before_hole > 0: + return True + + # all points that are after the hole in x + r, c = np.where(line_points[:, :, 0] > (self.hole_x + self.hole_width / 2)) + + # check if any of those points are below surface + nr_line_points_below_surface_after_hole = np.sum(line_points[r, c, 1] < 0) + + if nr_line_points_below_surface_after_hole > 0: + return True + + # all points that are above the hole + r, c = np.where((line_points[:, :, 0] > (self.hole_x - self.hole_width / 2)) & ( + line_points[:, :, 0] < (self.hole_x + self.hole_width / 2))) + + # check if any of those points are below surface + nr_line_points_below_surface_in_hole = np.sum(line_points[r, c, 1] < -self.hole_depth) + + if nr_line_points_below_surface_in_hole > 0: + return True + + return False + + def render(self, mode='human'): + if self.fig is None: + self.fig = plt.figure() + # plt.ion() + # plt.pause(0.01) + else: + plt.figure(self.fig.number) + + if mode == "human": + plt.cla() + plt.title(f"Iteration: {self._steps}, distance: {self.end_effector - self.bottom_center_of_hole}") + + # Arm + plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k') + + # Add the patch to the Axes + [plt.gca().add_patch(rect) for rect in self.patches] + + lim = np.sum(self.link_lengths) + 0.5 + plt.xlim([-lim, lim]) + plt.ylim([-1.1, lim]) + # plt.draw() + plt.pause(1e-4) # pushes window to foreground, which is annoying. + # self.fig.canvas.flush_events() + + elif mode == "partial": + if self._steps == 1: + # fig, ax = plt.subplots() + # Add the patch to the Axes + [plt.gca().add_patch(rect) for rect in self.patches] + # plt.pause(0.01) + + if self._steps % 20 == 0 or self._steps in [1, 199] or self._is_collided: + # Arm + plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k', alpha=self._steps / 200) + # ax.plot(line_points_in_taskspace[:, 0, 0], + # line_points_in_taskspace[:, 0, 1], + # line_points_in_taskspace[:, -1, 0], + # line_points_in_taskspace[:, -1, 1], marker='o', color='k', alpha=t / 200) + + lim = np.sum(self.link_lengths) + 0.5 + plt.xlim([-lim, lim]) + plt.ylim([-1.1, lim]) + plt.pause(0.01) + + elif mode == "final": + if self._steps == 199 or self._is_collided: + # fig, ax = plt.subplots() + + # Add the patch to the Axes + [plt.gca().add_patch(rect) for rect in self.patches] + + plt.xlim(-self.num_links, self.num_links), plt.ylim(-1, self.num_links) + # Arm + plt.plot(self._joints[:, 0], self._joints[:, 1], 'ro-', markerfacecolor='k') + + plt.pause(0.01) + + def close(self): + if self.fig is not None: + plt.close(self.fig) + + +if __name__ == '__main__': + nl = 5 + render_mode = "human" # "human" or "partial" or "final" + env = HoleReacher(num_links=nl, allow_self_collision=False, allow_wall_collision=False, hole_width=0.15, hole_depth=1, hole_x=1) + env.reset() + # env.render(mode=render_mode) + + for i in range(200): + # objective.load_result("/tmp/cma") + # test with random actions + ac = 2 * env.action_space.sample() + # ac[0] += np.pi/2 + obs, rew, d, info = env.step(ac) + env.render(mode=render_mode) + + print(rew) + + if d: + break + + env.close()