initial commit

This commit is contained in:
Dominik Moritz Roth 2022-06-19 15:01:30 +02:00
commit 65cd0516cd
6 changed files with 505 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
__pycache__
*.pyc
*.pyo

3
README.md Normal file
View File

@ -0,0 +1,3 @@
# Project Columbus
Project Columbus is a framework for trivial 2D OpenAI Gym environments that are supposed to test an agent's ability to solve tasks that require different forms of exploration effectively and efficiently.

195
entities.py Normal file
View File

@ -0,0 +1,195 @@
import pygame
import math
class Entity(object):
    """Base class for objects simulated inside the environment.

    Positions are kept in normalised coordinates (clamped to the range
    0..1 by ``physics_step``) and are scaled by the environment's
    ``width``/``height`` when drawn. ``radius`` is used unscaled as the
    pixel radius of the drawn circle.
    """

    def __init__(self, env):
        """Create an entity at a random position drawn from ``env.random()``."""
        self.env = env
        # Two consecutive RNG draws: x first, then y.
        start_x = env.random()
        start_y = env.random()
        self.pos = (start_x, start_y)
        self.speed = (0, 0)
        self.acc = (0, 0)
        self.drag = 0
        self.radius = 10
        self.col = (255, 255, 255)
        self.shape = 'circle'

    def physics_step(self):
        """Integrate acceleration and speed for one tick and clamp to the field."""
        pos_x, pos_y = self.pos
        vel_x, vel_y = self.speed
        acc_x, acc_y = self.acc

        # Velocity is updated first so the new velocity moves the entity.
        vel_x = vel_x + acc_x * self.env.acc_fac
        vel_y = vel_y + acc_y * self.env.acc_fac
        pos_x = pos_x + vel_x * self.env.speed_fac
        pos_y = pos_y + vel_y * self.env.speed_fac

        # Hitting a border pins the coordinate and cancels that velocity
        # component.
        if pos_x < 0 or pos_x > 1:
            pos_x = min(max(pos_x, 0), 1)
            vel_x = 0
        if pos_y < 0 or pos_y > 1:
            pos_y = min(max(pos_y, 0), 1)
            vel_y = 0

        self.pos = pos_x, pos_y
        # Drag damps the velocity that is carried over to the next tick.
        damping = 1 + self.drag
        self.speed = vel_x / damping, vel_y / damping

    def controll_step(self):
        """Hook for per-tick control logic; the base entity does nothing."""
        pass

    def step(self):
        """Advance the entity by one tick: control first, then physics."""
        self.controll_step()
        self.physics_step()

    def draw(self):
        """Draw the entity as a filled circle on the environment surface."""
        norm_x, norm_y = self.pos
        centre = (norm_x * self.env.width, norm_y * self.env.height)
        pygame.draw.circle(self.env.surface, self.col, centre,
                           self.radius, width=0)

    def on_collision(self, other):
        """Hook called with the colliding entity; no reaction by default."""
        pass

    def kill(self):
        """Ask the environment to remove this entity."""
        self.env.kill_entity(self)
class Agent(Entity):
    """The controlled entity; starts in the centre and is drawn in blue."""

    def __init__(self, env):
        """Place the agent at (0.5, 0.5) and copy drag/control mode from ``env``."""
        super().__init__(env)
        self.pos = (0.5, 0.5)
        self.col = (0, 0, 255)
        self.drag = self.env.agent_drag
        self.controll_type = self.env.controll_type

    def controll_step(self):
        """Apply the current input, then let the environment test collisions."""
        self._read_input()
        self.env.check_collisions_for(self)

    def _read_input(self):
        """Translate ``env.inp`` into a speed or an acceleration.

        Each input component has 0.5 subtracted so that 0.5 means "no
        movement". Raises ``Exception`` for an unknown ``controll_type``.
        """
        mode = self.controll_type
        if mode not in ('SPEED', 'ACC'):
            raise Exception('Unsupported controll_type')
        centred = self.env.inp[0] - 0.5, self.env.inp[1] - 0.5
        if mode == 'SPEED':
            self.speed = centred
        else:
            self.acc = centred
class Enemy(Entity):
    """Red entity that reduces the environment's reward while touching the agent."""

    def __init__(self, env):
        """Initialise as a plain entity with red colour and a damage of 10."""
        super().__init__(env)
        self.col = (255, 0, 0)
        self.damage = 10

    def on_collision(self, other):
        """Subtract ``damage`` from ``env.new_reward`` when hit by an ``Agent``."""
        if not isinstance(other, Agent):
            return
        self.env.new_reward -= self.damage
class Barrier(Enemy):
    """Marker subclass of ``Enemy``; ``Reward`` checks for it on collision."""

    def __init__(self, env):
        """Initialise exactly like an ``Enemy``."""
        super().__init__(env)
class CircleBarrier(Barrier):
    """Barrier variant; adds no behaviour of its own on top of ``Barrier``."""

    def __init__(self, env):
        """Initialise exactly like a ``Barrier``."""
        super().__init__(env)
class Chaser(Enemy):
    """Enemy that computes a steering vector towards ``env.agent``."""

    def __init__(self, env):
        """Target the environment's agent; no lookahead by default."""
        super().__init__(env)
        self.target = self.env.agent
        self.arrow_fak = 100
        self.lookahead = 0

    def _get_arrow(self):
        """Return the steering vector towards the target.

        The chaser's own position is first extrapolated along its current
        speed by ``lookahead`` ticks. The offset from that point to the
        target is scaled by ``arrow_fak`` and then passed through
        ``env._limit_to_unit_circle``.
        """
        target_x, target_y = self.target.pos
        own_x, own_y = self.pos
        vel_x, vel_y = self.speed
        # Predicted own position after ``lookahead`` ticks.
        future_x = own_x + vel_x * self.lookahead * self.env.speed_fac
        future_y = own_y + vel_y * self.lookahead * self.env.speed_fac
        delta = ((target_x - future_x) * self.arrow_fak,
                 (target_y - future_y) * self.arrow_fak)
        return self.env._limit_to_unit_circle(delta)
class WalkingChaser(Chaser):
    """Chaser that sets its speed directly from the steering vector."""

    def __init__(self, env):
        """Initialise as a red chaser with a chase speed of 0.45."""
        super().__init__(env)
        self.col = (255, 0, 0)
        self.chase_speed = 0.45

    def controll_step(self):
        """Point the speed along the steering vector, scaled by ``chase_speed``."""
        dir_x, dir_y = self._get_arrow()[:2]
        self.speed = dir_x * self.chase_speed, dir_y * self.chase_speed
class FlyingChaser(Chaser):
    """Chaser that steers by acceleration and aims using a lookahead."""

    def __init__(self, env):
        """Initialise as a red chaser with a randomised lookahead of 8 to 10."""
        super().__init__(env)
        self.col = (255, 0, 0)
        self.chase_acc = 0.5
        self.arrow_fak = 5
        # One RNG draw per instance spreads the lookahead between chasers.
        self.lookahead = 8 + env.random() * 2

    def controll_step(self):
        """Point the acceleration along the steering vector, scaled by ``chase_acc``."""
        dir_x, dir_y = self._get_arrow()[:2]
        self.acc = dir_x * self.chase_acc, dir_y * self.chase_acc
class Reward(Entity):
    """Green entity that adds to ``env.new_reward`` while touching the agent."""

    def __init__(self, env):
        """Initialise as an available reward worth 1."""
        super().__init__(env)
        self.col = (0, 255, 0)
        self.avaible = True
        # When set, the reward relocates whenever it overlaps a Barrier.
        self.enforce_not_on_barrier = False
        self.reward = 1

    def on_collision(self, other):
        """Dispatch to ``on_collect`` for agents and to the barrier handler for barriers."""
        if isinstance(other, Agent):
            self.on_collect()
            return
        if isinstance(other, Barrier):
            self.on_barrier_collision()

    def on_collect(self):
        """Add this reward's value to ``env.new_reward``."""
        self.env.new_reward += self.reward

    def on_barrier_collision(self):
        """Move to a fresh random position if barrier overlap is forbidden.

        After moving, collisions are checked again, so the reward keeps
        relocating while it still lands on a barrier.
        """
        if not self.enforce_not_on_barrier:
            return
        self.pos = (self.env.random(), self.env.random())
        self.env.check_collisions_for(self)
class OnceReward(Reward):
    """Reward worth 100 that removes itself once collected."""

    def __init__(self, env):
        """Initialise like a ``Reward`` but with a value of 100."""
        super().__init__(env)
        self.reward = 100

    def on_collect(self):
        """Add the value to ``env.new_abs_reward`` and remove this entity."""
        env = self.env
        env.new_abs_reward += self.reward
        self.kill()
class TeleportingReward(OnceReward):
    """Reward that jumps to a new random position instead of disappearing."""

    def __init__(self, env):
        """Forbid barrier overlap and run an initial collision check."""
        super().__init__(env)
        self.enforce_not_on_barrier = True
        # The initial check lets the reward relocate if it spawned on a
        # barrier.
        self.env.check_collisions_for(self)

    def _relocate(self):
        """Pick a new random position and re-run the collision check."""
        self.pos = (self.env.random(), self.env.random())
        self.env.check_collisions_for(self)

    def on_collect(self):
        """Add the value to ``env.new_abs_reward`` and teleport elsewhere."""
        self.env.new_abs_reward += self.reward
        self._relocate()
class TimeoutReward(OnceReward):
    """Reward that becomes unavailable for ``timeout`` after being collected."""

    def __init__(self, env):
        """Forbid barrier overlap, set the timeout, then check collisions.

        ``timeout`` is assigned before the initial collision check. That
        check may call ``on_collect``, which reads ``timeout``.
        """
        super(TimeoutReward, self).__init__(env)
        self.enforce_not_on_barrier = True
        self.timeout = 10
        self.env.check_collisions_for(self)

    def set_avaible(self, value):
        """Set the availability flag and recolour the entity to match.

        Available rewards are bright green, unavailable ones are a dull
        green.
        """
        self.avaible = value
        if self.avaible:
            self.col = (0, 255, 0)
        else:
            self.col = (50, 100, 50)

    def on_collect(self):
        """Grant the reward if available and schedule its re-activation.

        A ``(timeout, callback, argument)`` tuple is appended to
        ``env.timers`` so that ``set_avaible(True)`` is called later.
        Collecting while unavailable does nothing.
        """
        if self.avaible:
            self.env.new_abs_reward += self.reward
            self.set_avaible(False)
            self.env.timers.append((self.timeout, self.set_avaible, True))

192
env.py Normal file
View File

@ -0,0 +1,192 @@
import gym
from gym import spaces
import numpy as np
import pygame
import random as random_dont_use
import math
import entities
import observables
class Base2DExpEnv(gym.Env):
    """Gym environment with an agent, enemies and rewards on a 2D field.

    Actions are two values in the range 0..1, where 0.5 on an axis means
    "no input". Entities are stepped once per ``step`` call, and rendering
    uses pygame.
    """

    metadata = {'render.modes': ['human']}

    def __init__(self, observable=None, fps=60, env_seed=3.1):
        """Configure the environment and perform an initial ``reset``.

        Args:
            observable: Object providing ``_set_env``,
                ``get_observation_space``, ``get_observation`` and ``draw``.
                If ``None``, a fresh ``observables.Observable`` is created
                for this environment.
            fps: Simulation ticks per second; scales physics and timers.
            env_seed: Seed applied to the environment RNG on every reset.
        """
        super(Base2DExpEnv, self).__init__()
        self.action_space = spaces.Box(
            low=0, high=1, shape=(2,), dtype=np.float32)
        # Create the default per instance so environments never share one.
        if observable is None:
            observable = observables.Observable()
        observable._set_env(self)
        self.observable = observable
        self.observation_space = self.observable.get_observation_space()
        self.title = 'Untitled'
        self.fps = fps
        self.env_seed = env_seed
        self.joystick_offset = (10, 10)
        self.surface = None
        self.screen = None
        self.width = 720
        self.height = 720
        # Physics factors are normalised so they match the 60 fps values.
        self.speed_fac = 0.01/fps*60
        self.acc_fac = 0.03/fps*60
        self.agent_drag = 0  # 0.01 is a good value
        self.controll_type = 'SPEED'  # one of SPEED, ACC
        self.limit_inp_to_unit_circle = True
        self.aux_reward_max = 0  # 0 = off
        self.aux_reward_discretize = 0  # 0 = dont discretize
        self.draw_observable = True
        self.draw_joystick = True
        self.rng = random_dont_use.Random()
        self.reset()

    def _seed(self, seed):
        """Seed the environment's private random number generator."""
        self.rng.seed(seed)

    def random(self):
        """Return the next float from the environment's private RNG."""
        return self.rng.random()

    def _ensure_surface(self):
        """Create the drawing surface and the display window on first use."""
        if self.surface is None:
            self.surface = pygame.Surface((self.width, self.height))
            self.screen = pygame.display.set_mode((self.width, self.height))
            pygame.display.set_caption(self.title)

    def _limit_to_unit_circle(self, coords):
        """Return ``coords`` scaled to length 1 if it is longer than 1."""
        length_sq = coords[0]**2 + coords[1]**2
        if length_sq > 1:
            length = math.sqrt(length_sq)
            coords = coords[0] / length, coords[1] / length
        return coords

    def _step_entities(self):
        """Advance every entity by one tick."""
        for entity in self.entities:
            entity.step()

    def _step_timers(self):
        """Count timers down by one tick and fire those that have expired.

        Timers are ``(time_left, func, arg)`` tuples with ``time_left`` in
        seconds. Expired timers call ``func(arg)`` and are dropped.
        """
        new_timers = []
        for time_left, func, arg in self.timers:
            time_left -= 1/self.fps
            if time_left < 0:
                func(arg)
            else:
                new_timers.append((time_left, func, arg))
        self.timers = new_timers

    def sq_dist(self, entity1, entity2):
        """Return the squared distance between two entities' positions."""
        return ((entity1.pos[0] - entity2.pos[0])**2
                + (entity1.pos[1] - entity2.pos[1])**2)

    def dist(self, entity1, entity2):
        """Return the distance between two entities' positions."""
        return math.sqrt(self.sq_dist(entity1, entity2))

    def _get_aux_reward(self):
        """Return a shaping reward that grows as the agent nears rewards.

        Each available ``Reward`` contributes
        ``aux_reward_max / (1 + squared distance)``. If
        ``aux_reward_discretize`` is set, each contribution is truncated to
        steps of ``1 / (2 * aux_reward_discretize)``.
        """
        aux_reward = 0
        for entity in self.entities:
            if isinstance(entity, entities.Reward) and entity.avaible:
                reward = self.aux_reward_max / \
                    (1 + self.sq_dist(entity, self.agent))
                if self.aux_reward_discretize:
                    reward = int(reward*self.aux_reward_discretize*2) / \
                        self.aux_reward_discretize / 2
                aux_reward += reward
        return aux_reward

    def step(self, action):
        """Apply ``action`` and advance the simulation by one tick.

        Args:
            action: Indexable with two values in the range 0..1.

        Returns:
            Tuple ``(observation, reward, done, score)``. ``done`` is always
            ``False``. The fourth element is the accumulated score, not a
            gym info dict.
        """
        inp = action[0], action[1]
        if self.limit_inp_to_unit_circle:
            # Map to the range -1..1, limit the length, then map back.
            inp = self._limit_to_unit_circle(((inp[0]-0.5)*2, (inp[1]-0.5)*2))
            inp = (inp[0]+1)/2, (inp[1]+1)/2
        self.inp = inp
        self._step_timers()
        self._step_entities()
        observation = self.observable.get_observation()
        # new_reward is a per-second rate; new_abs_reward is added unscaled.
        reward = self.new_reward / self.fps + self.new_abs_reward
        self.new_reward = 0
        self.new_abs_reward = 0
        self.score += reward  # aux_reward does not count towards the score
        if self.aux_reward_max:
            reward += self._get_aux_reward()
        return observation, reward, False, self.score

    def check_collisions_for(self, entity):
        """Notify ``entity`` and every entity it overlaps of the collision.

        Overlap is tested in pixel space, comparing the squared distance
        with the squared sum of both radii.
        """
        for other in self.entities:
            if other != entity:
                sq_dist = ((other.pos[0]-entity.pos[0])*self.width) ** 2 \
                    + ((other.pos[1]-entity.pos[1])*self.height)**2
                if sq_dist < (entity.radius + other.radius)**2:
                    entity.on_collision(other)
                    other.on_collision(entity)

    def kill_entity(self, target):
        """Remove ``target`` from the entity list, keeping all others.

        A new list is built, so loops over the old list are unaffected.
        """
        self.entities = [
            entity for entity in self.entities if entity is not target]

    def setup(self):
        """Populate the environment with barriers, chasers and rewards."""
        for _ in range(16):
            enemy = entities.CircleBarrier(self)
            enemy.radius = self.random()*40+50
            self.entities.append(enemy)
        for _ in range(3):
            enemy = entities.FlyingChaser(self)
            enemy.chase_acc = self.random()*0.4*0.3  # *0.6+0.5
            self.entities.append(enemy)
        for _ in range(0):
            reward = entities.TimeoutReward(self)
            self.entities.append(reward)
        for _ in range(1):
            reward = entities.TeleportingReward(self)
            self.entities.append(reward)

    def reset(self):
        """Reset all episode state and rebuild the entities.

        The RNG is seeded with ``env_seed`` before anything random is
        created, so every reset yields the same layout.

        Returns:
            The placeholder ``0``, not an observation.
            NOTE(review): an observation would need the drawing surface,
            which only ``render`` creates.
        """
        pygame.init()
        self._seed(self.env_seed)
        self.inp = (0.5, 0.5)
        # will get rescaled according to fps (=reward per second)
        self.new_reward = 0
        # will not get rescaled. should be used for one-time rewards
        self.new_abs_reward = 0
        self.score = 0
        self.entities = []
        self.timers = []
        self.agent = entities.Agent(self)
        self.setup()
        self.entities.append(self.agent)  # add it last, will be drawn on top
        return 0

    def _draw_entities(self):
        """Draw every entity in list order."""
        for entity in self.entities:
            entity.draw()

    def _draw_observable(self, forceDraw=False):
        """Draw the observable preview if enabled or forced."""
        if self.draw_observable or forceDraw:
            self.observable.draw()

    def _draw_joystick(self, forceDraw=False):
        """Draw the virtual joystick showing the current input, if enabled or forced."""
        if self.draw_joystick or forceDraw:
            x, y = self.inp
            off_x, off_y = self.joystick_offset
            # Outer ring marks the range; the filled knob marks the input.
            pygame.draw.circle(self.screen, (100, 100, 100),
                               (50 + off_x, 50 + off_y), 50, width=1)
            pygame.draw.circle(self.screen, (100, 100, 100),
                               (20 + int(60*x) + off_x,
                                20 + int(60*y) + off_y), 20, width=0)

    def render(self, mode='human'):
        """Draw the current state to the pygame window."""
        self._ensure_surface()
        pygame.draw.rect(self.surface, (0, 0, 0),
                         pygame.Rect(0, 0, self.width, self.height))
        self._draw_entities()
        self.screen.blit(self.surface, (0, 0))
        # Overlays go on the screen only, not on the entity surface.
        self._draw_observable()
        self._draw_joystick()
        pygame.display.update()

    def close(self):
        """Close the display and shut pygame down."""
        pygame.display.quit()
        pygame.quit()

43
humanPlayer.py Normal file
View File

@ -0,0 +1,43 @@
from time import sleep, time
from env import Base2DExpEnv
import numpy as np
import pygame
from observables import Observable, CnnObservable
def main():
    """Create a 60 fps environment with a CNN observable and play it.

    The environment is closed even if the play loop raises, for example
    on Ctrl+C.
    """
    env = Base2DExpEnv(fps=60, observable=CnnObservable())
    try:
        playEnv(env)
    finally:
        env.close()
def playEnv(env):
    """Let a human drive the agent with the mouse until done or window close.

    Each frame renders the environment, converts the mouse position
    (relative to the on-screen joystick) into an action clamped to 0..1
    per axis, steps the environment and prints reward and score. The loop
    sleeps to hold ``env.fps`` and warns when a frame takes too long.
    """
    env.reset()
    done = False
    while not done:
        t1 = time()
        env.render()
        # Draining the queue keeps the window responsive; a close request
        # ends the session.
        if any(event.type == pygame.QUIT for event in pygame.event.get()):
            break
        mouse_x, mouse_y = pygame.mouse.get_pos()
        off_x, off_y = env.joystick_offset
        pos = (min(max((mouse_x - off_x - 20) / 60, 0), 1),
               min(max((mouse_y - off_y - 20) / 60, 0), 1))
        obs, rew, done, info = env.step(np.array(pos, dtype=np.float32))
        print('Reward: '+str(rew))
        print('Score: '+str(info))
        t2 = time()
        dt = t2 - t1
        delay = (1/env.fps - dt)
        if delay < 0:
            print("[!] Can't keep framerate!")
        else:
            sleep(delay)
# Start the interactive session only when executed as a script.
if __name__ == "__main__":
    main()

69
observables.py Normal file
View File

@ -0,0 +1,69 @@
from gym import spaces
import numpy as np
import pygame
class Observable():
    """Dummy observable that satisfies the environment but carries no data.

    It implements the methods the environment calls on its observable:
    ``_set_env``, ``get_observation_space``, ``get_observation`` and
    ``draw``.
    """

    def __init__(self):
        """Start with no cached observation and no attached environment."""
        self.obs = None
        self.env = None

    def _set_env(self, env):
        """Remember the environment this observable belongs to."""
        self.env = env

    def get_observation_space(self):
        """Return a one-element uint8 box and warn that this is a dummy."""
        print("[!] Using dummyObservable. Env won't output anything")
        return spaces.Box(low=0, high=255,
                          shape=(1,), dtype=np.uint8)

    def get_observation(self):
        """Return a constant zero observation matching the declared space."""
        self.obs = np.zeros((1,), dtype=np.uint8)
        return self.obs

    def draw(self):
        """Draw nothing; the dummy observable has no visual preview."""
        pass
class CnnObservable(Observable):
    """Observable that crops the area around the agent and scales it down."""

    def __init__(self, in_width=256, in_height=256, out_width=32, out_height=32, draw_width=128, draw_height=128, smooth_scaling=True):
        """Store the crop, output and preview sizes and pick the scaler.

        Args:
            in_width, in_height: Pixel size of the window cropped around
                the agent.
            out_width, out_height: Size the crop is scaled to.
            draw_width, draw_height: Size of the on-screen preview.
            smooth_scaling: Use ``pygame.transform.smoothscale`` instead of
                ``pygame.transform.scale`` for the observation.
        """
        super(CnnObservable, self).__init__()
        self.in_width = in_width
        self.in_height = in_height
        self.out_width = out_width
        self.out_height = out_height
        self.draw_width = draw_width
        self.draw_height = draw_height
        if smooth_scaling:
            self.scaler = pygame.transform.smoothscale
        else:
            self.scaler = pygame.transform.scale

    def _set_env(self, env):
        """Remember the environment this observable reads from."""
        self.env = env

    def get_observation_space(self):
        """Return a uint8 box of shape ``(out_width, out_height)``."""
        return spaces.Box(low=0, high=255,
                          shape=(self.out_width, self.out_height), dtype=np.uint8)

    def get_observation(self):
        """Return the scaled crop of ``env.surface`` centred on the agent.

        Parts of the window outside the field are filled with grey. The
        result is a pygame surface and is cached in ``self.obs``.
        NOTE(review): ``env.surface`` must already exist, and the returned
        surface is not an array matching the declared observation space.
        Confirm what consumers expect.
        """
        # Top-left corner of the window; may lie outside the field.
        left = self.env.agent.pos[0]*self.env.width - self.in_width / 2
        top = self.env.agent.pos[1]*self.env.height - self.in_height / 2
        # Clip the window to the part that lies on the field.
        clip_left = _clip(left, 0, self.env.width)
        clip_top = _clip(top, 0, self.env.height)
        clip_w = _clip(self.in_width, 0, self.env.width - clip_left)
        clip_h = _clip(self.in_height, 0, self.env.height - clip_top)
        rect = pygame.Rect(clip_left, clip_top, clip_w, clip_h)
        snap = self.env.surface.subsurface(rect)
        self.snap = pygame.Surface((self.in_width, self.in_height))
        pygame.draw.rect(self.snap, (50, 50, 50),
                         pygame.Rect(0, 0, self.in_width, self.in_height))
        # Offset the crop so the agent stays centred near field borders.
        self.snap.blit(snap, (clip_left - left, clip_top - top))
        self.obs = self.scaler(
            self.snap, (self.out_width, self.out_height))
        return self.obs

    def draw(self):
        """Draw an enlarged preview in the top-right corner of the screen."""
        # Explicit None check instead of truth-testing a surface.
        if self.obs is None:
            self.get_observation()
        big = pygame.transform.scale(
            self.obs, (self.draw_width, self.draw_height))
        x, y = self.env.width - self.draw_width - 10, 10
        # A rectangle one pixel larger gives the preview a grey border.
        pygame.draw.rect(self.env.screen, (50, 50, 50),
                         pygame.Rect(x - 1, y - 1, self.draw_width + 2, self.draw_height + 2))
        self.env.screen.blit(
            big, (x, y))
def _clip(num, lower, upper):
    """Return ``num`` limited to the range ``lower``..``upper``.

    The lower bound is applied first and the upper bound second, so the
    upper bound wins if the bounds are inverted.
    """
    raised = lower if lower > num else num
    return upper if upper < raised else raised