2021-01-11 16:08:42 +01:00
import gym
import numpy as np
2021-01-12 10:52:08 +01:00
import matplotlib
matplotlib . use ( ' TkAgg ' )
2021-01-11 16:08:42 +01:00
import matplotlib . pyplot as plt
from matplotlib import patches
def ccw ( A , B , C ) :
return ( C [ 1 ] - A [ 1 ] ) * ( B [ 0 ] - A [ 0 ] ) - ( B [ 1 ] - A [ 1 ] ) * ( C [ 0 ] - A [ 0 ] ) > 1e-12
# Return true if line segments AB and CD intersect
def intersect ( A , B , C , D ) :
return ccw ( A , C , D ) != ccw ( B , C , D ) and ccw ( A , B , C ) != ccw ( A , B , D )
class HoleReacher ( gym . Env ) :
def __init__ ( self , num_links , hole_x , hole_width , hole_depth , allow_self_collision = False ,
allow_wall_collision = False , collision_penalty = 1000 ) :
self . hole_x = hole_x # x-position of center of hole
self . hole_width = hole_width # width of hole
self . hole_depth = hole_depth # depth of hole
self . num_links = num_links
self . link_lengths = np . ones ( ( num_links , 1 ) )
self . bottom_center_of_hole = np . hstack ( [ hole_x , - hole_depth ] )
self . top_center_of_hole = np . hstack ( [ hole_x , 0 ] )
self . left_wall_edge = np . hstack ( [ hole_x - self . hole_width / 2 , 0 ] )
self . right_wall_edge = np . hstack ( [ hole_x + self . hole_width / 2 , 0 ] )
self . allow_self_collision = allow_self_collision
self . allow_wall_collision = allow_wall_collision
self . collision_penalty = collision_penalty
self . _joints = None
self . _joint_angles = None
self . _angle_velocity = None
self . start_pos = np . hstack ( [ [ np . pi / 2 ] , np . zeros ( self . num_links - 1 ) ] )
self . start_vel = np . zeros ( self . num_links )
2021-01-12 10:52:08 +01:00
self . weight_matrix_scale = 50 # for the holereacher, the dmp weights become quite large compared to the values of the goal attractor. this scaling is to ensure they are on similar scale for the optimizer
2021-01-11 16:08:42 +01:00
self . _dt = 0.01
action_bound = np . pi * np . ones ( ( self . num_links , ) )
state_bound = np . hstack ( [
[ np . pi ] * self . num_links , # cos
[ np . pi ] * self . num_links , # sin
[ np . inf ] * self . num_links , # velocity
[ np . inf ] * 2 , # x-y coordinates of target distance
[ np . inf ] # env steps, because reward start after n steps TODO: Maybe
] )
self . action_space = gym . spaces . Box ( low = - action_bound , high = action_bound , shape = action_bound . shape )
self . observation_space = gym . spaces . Box ( low = - state_bound , high = state_bound , shape = state_bound . shape )
self . fig = None
rect_1 = patches . Rectangle ( ( - self . num_links , - 1 ) ,
self . num_links + self . hole_x - self . hole_width / 2 , 1 ,
fill = True , edgecolor = ' k ' , facecolor = ' k ' )
rect_2 = patches . Rectangle ( ( self . hole_x + self . hole_width / 2 , - 1 ) ,
self . num_links - self . hole_x + self . hole_width / 2 , 1 ,
fill = True , edgecolor = ' k ' , facecolor = ' k ' )
rect_3 = patches . Rectangle ( ( self . hole_x - self . hole_width / 2 , - 1 ) , self . hole_width ,
1 - self . hole_depth ,
fill = True , edgecolor = ' k ' , facecolor = ' k ' )
self . patches = [ rect_1 , rect_2 , rect_3 ]
@property
def end_effector ( self ) :
return self . _joints [ self . num_links ] . T
2021-01-12 10:52:08 +01:00
def reset ( self ) :
self . _joint_angles = self . start_pos
self . _angle_velocity = self . start_vel
self . _joints = np . zeros ( ( self . num_links + 1 , 2 ) )
self . _update_joints ( )
self . _steps = 0
return self . _get_obs ( ) . copy ( )
2021-01-11 16:08:42 +01:00
2021-01-12 10:52:08 +01:00
def step ( self , action ) :
"""
a single step with an action in joint velocity space
"""
2021-01-11 16:08:42 +01:00
vel = action
acc = ( vel - self . _angle_velocity ) / self . _dt
self . _angle_velocity = vel
self . _joint_angles = self . _joint_angles + self . _dt * self . _angle_velocity
self . _update_joints ( )
2021-01-12 10:52:08 +01:00
# rew = self._reward()
# compute reward directly in step function
dist_reward = 0
if not self . _is_collided :
if self . _steps == 180 :
dist_reward = np . linalg . norm ( self . end_effector - self . bottom_center_of_hole )
else :
dist_reward = np . linalg . norm ( self . end_effector - self . bottom_center_of_hole )
reward = - dist_reward * * 2
2021-01-11 16:08:42 +01:00
2021-01-12 10:52:08 +01:00
reward - = 1e-6 * np . sum ( acc * * 2 )
2021-01-11 16:08:42 +01:00
if self . _steps == 180 :
2021-01-12 10:52:08 +01:00
reward - = ( 0.1 * np . sum ( vel * * 2 ) * * 2
+ 1e-3 * np . sum ( action * * 2 )
)
2021-01-11 16:08:42 +01:00
if self . _is_collided :
2021-01-12 10:52:08 +01:00
reward - = self . collision_penalty
2021-01-11 16:08:42 +01:00
info = { }
self . _steps + = 1
2021-01-12 10:52:08 +01:00
return self . _get_obs ( ) . copy ( ) , reward , self . _is_collided , info
2021-01-11 16:08:42 +01:00
def _update_joints ( self ) :
"""
update _joints to get new end effector position . The other links are only required for rendering .
Returns :
"""
line_points_in_taskspace = self . get_forward_kinematics ( num_points_per_link = 20 )
self . _joints [ 1 : , 0 ] = self . _joints [ 0 , 0 ] + line_points_in_taskspace [ : , - 1 , 0 ]
self . _joints [ 1 : , 1 ] = self . _joints [ 0 , 1 ] + line_points_in_taskspace [ : , - 1 , 1 ]
self_collision = False
wall_collision = False
if not self . allow_self_collision :
self_collision = self . check_self_collision ( line_points_in_taskspace )
if np . any ( np . abs ( self . _joint_angles ) > np . pi ) and not self . allow_self_collision :
self_collision = True
if not self . allow_wall_collision :
wall_collision = self . check_wall_collision ( line_points_in_taskspace )
self . _is_collided = self_collision or wall_collision
def _get_obs ( self ) :
theta = self . _joint_angles
return np . hstack ( [
np . cos ( theta ) ,
np . sin ( theta ) ,
self . _angle_velocity ,
self . end_effector - self . bottom_center_of_hole ,
self . _steps
] )
2021-01-12 10:52:08 +01:00
# def _reward(self):
# dist_reward = 0
# if not self._is_collided:
# if self._steps == 180:
# dist_reward = np.linalg.norm(self.end_effector - self.bottom_center_of_hole)
# else:
# dist_reward = np.linalg.norm(self.end_effector - self.bottom_center_of_hole)
#
# out = - dist_reward ** 2
#
# return out
2021-01-11 16:08:42 +01:00
def get_forward_kinematics ( self , num_points_per_link = 1 ) :
theta = self . _joint_angles [ : , None ]
if num_points_per_link > 1 :
intermediate_points = np . linspace ( 0 , 1 , num_points_per_link )
else :
intermediate_points = 1
accumulated_theta = np . cumsum ( theta , axis = 0 )
endeffector = np . zeros ( shape = ( self . num_links , num_points_per_link , 2 ) )
x = np . cos ( accumulated_theta ) * self . link_lengths * intermediate_points
y = np . sin ( accumulated_theta ) * self . link_lengths * intermediate_points
endeffector [ 0 , : , 0 ] = x [ 0 , : ]
endeffector [ 0 , : , 1 ] = y [ 0 , : ]
for i in range ( 1 , self . num_links ) :
endeffector [ i , : , 0 ] = x [ i , : ] + endeffector [ i - 1 , - 1 , 0 ]
endeffector [ i , : , 1 ] = y [ i , : ] + endeffector [ i - 1 , - 1 , 1 ]
return np . squeeze ( endeffector + self . _joints [ 0 , : ] )
def check_self_collision ( self , line_points ) :
for i , line1 in enumerate ( line_points ) :
for line2 in line_points [ i + 2 : , : , : ] :
# if line1 != line2:
if intersect ( line1 [ 0 ] , line1 [ - 1 ] , line2 [ 0 ] , line2 [ - 1 ] ) :
return True
return False
def check_wall_collision ( self , line_points ) :
# all points that are before the hole in x
r , c = np . where ( line_points [ : , : , 0 ] < ( self . hole_x - self . hole_width / 2 ) )
# check if any of those points are below surface
nr_line_points_below_surface_before_hole = np . sum ( line_points [ r , c , 1 ] < 0 )
if nr_line_points_below_surface_before_hole > 0 :
return True
# all points that are after the hole in x
r , c = np . where ( line_points [ : , : , 0 ] > ( self . hole_x + self . hole_width / 2 ) )
# check if any of those points are below surface
nr_line_points_below_surface_after_hole = np . sum ( line_points [ r , c , 1 ] < 0 )
if nr_line_points_below_surface_after_hole > 0 :
return True
# all points that are above the hole
r , c = np . where ( ( line_points [ : , : , 0 ] > ( self . hole_x - self . hole_width / 2 ) ) & (
line_points [ : , : , 0 ] < ( self . hole_x + self . hole_width / 2 ) ) )
# check if any of those points are below surface
nr_line_points_below_surface_in_hole = np . sum ( line_points [ r , c , 1 ] < - self . hole_depth )
if nr_line_points_below_surface_in_hole > 0 :
return True
return False
def render ( self , mode = ' human ' ) :
if self . fig is None :
self . fig = plt . figure ( )
2021-01-12 10:52:08 +01:00
# plt.ion()
# plt.pause(0.01)
2021-01-11 16:08:42 +01:00
else :
plt . figure ( self . fig . number )
if mode == " human " :
plt . cla ( )
plt . title ( f " Iteration: { self . _steps } , distance: { self . end_effector - self . bottom_center_of_hole } " )
# Arm
plt . plot ( self . _joints [ : , 0 ] , self . _joints [ : , 1 ] , ' ro- ' , markerfacecolor = ' k ' )
# Add the patch to the Axes
[ plt . gca ( ) . add_patch ( rect ) for rect in self . patches ]
lim = np . sum ( self . link_lengths ) + 0.5
plt . xlim ( [ - lim , lim ] )
plt . ylim ( [ - 1.1 , lim ] )
# plt.draw()
plt . pause ( 1e-4 ) # pushes window to foreground, which is annoying.
# self.fig.canvas.flush_events()
2021-01-12 10:52:08 +01:00
elif mode == " partial " :
if self . _steps == 1 :
# fig, ax = plt.subplots()
2021-01-11 16:08:42 +01:00
# Add the patch to the Axes
[ plt . gca ( ) . add_patch ( rect ) for rect in self . patches ]
2021-01-12 10:52:08 +01:00
# plt.pause(0.01)
2021-01-11 16:08:42 +01:00
2021-01-12 10:52:08 +01:00
if self . _steps % 20 == 0 or self . _steps in [ 1 , 199 ] or self . _is_collided :
# Arm
plt . plot ( self . _joints [ : , 0 ] , self . _joints [ : , 1 ] , ' ro- ' , markerfacecolor = ' k ' , alpha = self . _steps / 200 )
# ax.plot(line_points_in_taskspace[:, 0, 0],
# line_points_in_taskspace[:, 0, 1],
# line_points_in_taskspace[:, -1, 0],
# line_points_in_taskspace[:, -1, 1], marker='o', color='k', alpha=t / 200)
lim = np . sum ( self . link_lengths ) + 0.5
plt . xlim ( [ - lim , lim ] )
plt . ylim ( [ - 1.1 , lim ] )
2021-01-11 16:08:42 +01:00
plt . pause ( 0.01 )
2021-01-12 10:52:08 +01:00
elif mode == " final " :
if self . _steps == 199 or self . _is_collided :
# fig, ax = plt.subplots()
2021-01-11 16:08:42 +01:00
# Add the patch to the Axes
[ plt . gca ( ) . add_patch ( rect ) for rect in self . patches ]
plt . xlim ( - self . num_links , self . num_links ) , plt . ylim ( - 1 , self . num_links )
# Arm
plt . plot ( self . _joints [ : , 0 ] , self . _joints [ : , 1 ] , ' ro- ' , markerfacecolor = ' k ' )
plt . pause ( 0.01 )
if __name__ == ' __main__ ' :
nl = 5
2021-01-12 10:52:08 +01:00
render_mode = " human " # "human" or "partial" or "final"
2021-01-11 16:08:42 +01:00
env = HoleReacher ( num_links = nl , allow_self_collision = False , allow_wall_collision = False , hole_width = 0.15 , hole_depth = 1 , hole_x = 1 )
env . reset ( )
2021-01-12 10:52:08 +01:00
# env.render(mode=render_mode)
2021-01-11 16:08:42 +01:00
for i in range ( 200 ) :
# objective.load_result("/tmp/cma")
2021-01-12 10:52:08 +01:00
# test with random actions
ac = 2 * env . action_space . sample ( )
2021-01-11 16:08:42 +01:00
# ac[0] += np.pi/2
obs , rew , done , info = env . step ( ac )
2021-01-12 10:52:08 +01:00
env . render ( mode = render_mode )
2021-01-11 16:08:42 +01:00
print ( rew )
if done :
break