Merge commit 'fd6edb02f716fa7d40468101797de231adc20c00' as 'subtrees/columbus'

This commit is contained in:
Dominik Moritz Roth 2022-06-19 15:46:03 +02:00
commit 1a21bb9ad4
7 changed files with 528 additions and 0 deletions

3
subtrees/columbus/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
__pycache__
*.pyc
*.pyo

View File

@ -0,0 +1,17 @@
# Project Columbus
Project Columbus is a framework for trivial 2D OpenAI Gym environments that are supposed to test an agent's ability to solve tasks that require different forms of exploration effectively and efficiently.
![Screenshot](./img_README.png)
### env.py
Contains the ColumbusEnv. New envs are implemented by subclassing ColumbusEnv, extending `__init__` and overriding `setup`.
### entities.py
Contains all implemented entities (e.g. the Agent, Rewards and Enemies)
### observables.py
Contains all 'observables'. These are attached to envs to define what kind of output is given to the agent. This way environments can be designed independently from the observation mechanism that is used by the agent to play them.
### humanPlayer.py
Allows environments to be played by a human using mouse input.

View File

@ -0,0 +1,195 @@
import pygame
import math
class Entity(object):
    """Base class for every object that lives inside an environment.

    Positions are kept in normalized coordinates: ``physics_step`` clamps
    them to the range [0, 1] and ``draw`` scales them by the env's pixel
    width/height.
    """

    def __init__(self, env):
        """Attach the entity to ``env`` and place it at a random position."""
        self.env = env
        # Two draws from the env's RNG: first x, then y.
        start_x = env.random()
        start_y = env.random()
        self.pos = (start_x, start_y)
        self.speed = (0, 0)
        self.acc = (0, 0)
        self.drag = 0
        self.radius = 10
        self.col = (255, 255, 255)
        self.shape = 'circle'

    @staticmethod
    def _bounce_axis(coord, vel):
        """Clamp one coordinate to [0, 1]; zero the velocity if it left it."""
        if coord < 0 or coord > 1:
            return min(max(coord, 0), 1), 0
        return coord, vel

    def physics_step(self):
        """Integrate acceleration and speed once and apply wall clamping."""
        old_x, old_y = self.pos
        old_vx, old_vy = self.speed
        acc_x, acc_y = self.acc
        # Acceleration changes the speed first, the new speed then moves us.
        new_vx = old_vx + acc_x * self.env.acc_fac
        new_vy = old_vy + acc_y * self.env.acc_fac
        new_x = old_x + new_vx * self.env.speed_fac
        new_y = old_y + new_vy * self.env.speed_fac
        new_x, new_vx = self._bounce_axis(new_x, new_vx)
        new_y, new_vy = self._bounce_axis(new_y, new_vy)
        # Drag damps the speed that is carried over into the next step.
        self.speed = new_vx / (1 + self.drag), new_vy / (1 + self.drag)
        self.pos = new_x, new_y

    def controll_step(self):
        """Hook for subclasses to set speed/acc before physics; no-op here."""
        pass

    def step(self):
        """Advance the entity by one tick: control first, then physics."""
        self.controll_step()
        self.physics_step()

    def draw(self):
        """Draw the entity as a filled circle onto the env's surface."""
        norm_x, norm_y = self.pos
        center = (norm_x * self.env.width, norm_y * self.env.height)
        pygame.draw.circle(self.env.surface, self.col,
                           center, self.radius, width=0)

    def on_collision(self, other):
        """Collision callback; the base entity ignores collisions."""
        pass

    def kill(self):
        """Ask the environment to remove this entity."""
        self.env.kill_entity(self)
class Agent(Entity):
    """The controllable entity; it is driven by the env's current input."""

    def __init__(self, env):
        """Create the agent in the center of the field, colored blue."""
        super().__init__(env)
        self.pos = (0.5, 0.5)
        self.col = (0, 0, 255)
        # Drag and control mode are copied from the env at creation time.
        self.drag = self.env.agent_drag
        self.controll_type = self.env.controll_type

    def controll_step(self):
        """Apply the current input, then let the env check our collisions."""
        self._read_input()
        self.env.check_collisions_for(self)

    def _read_input(self):
        """Translate ``env.inp`` into a speed or an acceleration.

        Raises:
            Exception: if ``controll_type`` is neither 'SPEED' nor 'ACC'.
        """
        mode = self.controll_type
        if mode != 'SPEED' and mode != 'ACC':
            raise Exception('Unsupported controll_type')
        # Shift the input by 0.5 so that 0.5 on an axis means "no input".
        centered = self.env.inp[0] - 0.5, self.env.inp[1] - 0.5
        if mode == 'SPEED':
            self.speed = centered
        else:
            self.acc = centered
class Enemy(Entity):
    """Red entity that lowers the env's pending reward on agent contact."""

    def __init__(self, env):
        """Create the enemy with its default damage."""
        super().__init__(env)
        self.col = (255, 0, 0)
        self.damage = 10

    def on_collision(self, other):
        """Subtract ``damage`` from ``env.new_reward`` if ``other`` is the agent."""
        if not isinstance(other, Agent):
            return
        self.env.new_reward -= self.damage
class Barrier(Enemy):
    """Enemy subtype used as a marker: rewards test for it on collision."""

    def __init__(self, env):
        """Initialize exactly like a plain ``Enemy``."""
        super().__init__(env)
class CircleBarrier(Barrier):
    """Barrier with the default circular shape; adds no behavior of its own."""

    def __init__(self, env):
        """Initialize exactly like a plain ``Barrier``."""
        super().__init__(env)
class Chaser(Enemy):
    """Enemy that knows in which direction its target (the agent) lies.

    Subclasses turn the direction returned by ``_get_arrow`` into movement.
    """

    def __init__(self, env):
        """Create the chaser and lock onto the env's current agent."""
        super().__init__(env)
        self.target = self.env.agent
        self.arrow_fak = 100
        self.lookahead = 0

    def _get_arrow(self):
        """Return the direction towards the target, limited to the unit circle.

        The direction starts at the chaser's own position extrapolated by
        ``lookahead`` steps of its current speed, and is scaled by
        ``arrow_fak`` before the env limits its length.
        """
        target_x, target_y = self.target.pos
        own_x, own_y = self.pos
        future_x = own_x + self.speed[0] * self.lookahead * self.env.speed_fac
        future_y = own_y + self.speed[1] * self.lookahead * self.env.speed_fac
        scaled = ((target_x - future_x) * self.arrow_fak,
                  (target_y - future_y) * self.arrow_fak)
        return self.env._limit_to_unit_circle(scaled)
class WalkingChaser(Chaser):
    """Chaser that sets its speed directly towards the target each step."""

    def __init__(self, env):
        """Create the chaser with its default chase speed."""
        super().__init__(env)
        self.col = (255, 0, 0)
        self.chase_speed = 0.45

    def controll_step(self):
        """Point the speed along the arrow, scaled by ``chase_speed``."""
        dir_x, dir_y = self._get_arrow()
        self.speed = dir_x * self.chase_speed, dir_y * self.chase_speed
class FlyingChaser(Chaser):
    """Chaser that accelerates towards the target instead of setting speed."""

    def __init__(self, env):
        """Create the chaser with a randomized lookahead between 8 and 10."""
        super().__init__(env)
        self.col = (255, 0, 0)
        self.chase_acc = 0.5
        self.arrow_fak = 5
        # One extra RNG draw (after the two position draws of the base class).
        jitter = env.random() * 2
        self.lookahead = 8 + jitter

    def controll_step(self):
        """Point the acceleration along the arrow, scaled by ``chase_acc``."""
        dir_x, dir_y = self._get_arrow()
        self.acc = dir_x * self.chase_acc, dir_y * self.chase_acc
class Reward(Entity):
    """Green entity that adds to the env's pending reward on agent contact."""

    def __init__(self, env):
        """Create an available reward worth 1."""
        super().__init__(env)
        self.col = (0, 255, 0)
        self.avaible = True
        self.enforce_not_on_barrier = False
        self.reward = 1

    def on_collision(self, other):
        """Dispatch a collision to the agent or the barrier handler."""
        if isinstance(other, Agent):
            self.on_collect()
            return
        if isinstance(other, Barrier):
            self.on_barrier_collision()

    def on_collect(self):
        """Add ``reward`` to ``env.new_reward``."""
        self.env.new_reward += self.reward

    def on_barrier_collision(self):
        """Move to a new random spot if overlapping barriers is not allowed."""
        if not self.enforce_not_on_barrier:
            return
        new_x = self.env.random()
        new_y = self.env.random()
        self.pos = (new_x, new_y)
        # Check the new position as well; it may overlap a barrier again.
        self.env.check_collisions_for(self)
class OnceReward(Reward):
    """Reward that is removed from the env once it has been collected."""

    def __init__(self, env):
        """Create the reward with a value of 100."""
        super().__init__(env)
        self.reward = 100

    def on_collect(self):
        """Add ``reward`` to ``env.new_abs_reward`` and remove this entity."""
        self.env.new_abs_reward += self.reward
        self.kill()
class TeleportingReward(OnceReward):
    """Reward that jumps to a new random position when it is collected."""

    def __init__(self, env):
        """Create the reward and check its start position for collisions."""
        super().__init__(env)
        self.enforce_not_on_barrier = True
        self.env.check_collisions_for(self)

    def on_collect(self):
        """Add ``reward`` to ``env.new_abs_reward`` and teleport away."""
        self.env.new_abs_reward += self.reward
        new_x = self.env.random()
        new_y = self.env.random()
        self.pos = (new_x, new_y)
        self.env.check_collisions_for(self)
class TimeoutReward(OnceReward):
    """Reward that becomes unavailable for ``timeout`` after being collected."""

    def __init__(self, env):
        """Create the reward and check its start position for collisions."""
        super().__init__(env)
        self.enforce_not_on_barrier = True
        self.env.check_collisions_for(self)
        self.timeout = 10

    def set_avaible(self, value):
        """Set the availability flag and recolor the entity accordingly."""
        self.avaible = value
        # Bright green while collectable, dimmed green while on cooldown.
        self.col = (0, 255, 0) if self.avaible else (50, 100, 50)

    def on_collect(self):
        """Pay out once, then go on cooldown until an env timer re-enables us."""
        if not self.avaible:
            return
        self.env.new_abs_reward += self.reward
        self.set_avaible(False)
        reactivate = (self.timeout, self.set_avaible, True)
        self.env.timers.append(reactivate)

201
subtrees/columbus/env.py Normal file
View File

@ -0,0 +1,201 @@
import gym
from gym import spaces
import numpy as np
import pygame
import random as random_dont_use
import math
import entities
import observables
class ColumbusEnv(gym.Env):
    """2D gym environment in which an agent moves among other entities.

    Actions are 2-vectors with components in [0, 1]; 0.5 on an axis means
    "no input" on that axis. Entity positions are normalized to [0, 1] and
    scaled to ``width`` x ``height`` pixels for drawing and collision checks.
    New environments subclass this class and override ``setup``.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self, observable=None, fps=60, env_seed=3.1):
        """Configure the environment and perform an initial ``reset``.

        Args:
            observable: Object that produces observations for this env. If
                omitted, a fresh ``observables.Observable`` is created.
            fps: Steps per second. Scales the speed/acceleration factors,
                the timers and the per-second reward.
            env_seed: Seed handed to the env's private RNG on every reset.
        """
        super(ColumbusEnv, self).__init__()
        # Build the default here rather than in the signature, so that two
        # environments never share (and re-bind) the same observable.
        if observable is None:
            observable = observables.Observable()
        self.action_space = spaces.Box(
            low=0, high=1, shape=(2,), dtype=np.float32)
        observable._set_env(self)
        self.observable = observable
        self.observation_space = self.observable.get_observation_space()
        self.title = 'Untitled'
        self.fps = fps
        self.env_seed = env_seed
        self.joystick_offset = (10, 10)
        self.surface = None
        self.screen = None
        self.width = 720
        self.height = 720
        self.speed_fac = 0.01/fps*60
        self.acc_fac = 0.03/fps*60
        self.agent_drag = 0  # 0.01 is a good value
        self.controll_type = 'SPEED'  # one of SPEED, ACC
        self.limit_inp_to_unit_circle = True
        self.aux_reward_max = 0  # 0 = off
        self.aux_reward_discretize = 0  # 0 = dont discretize
        self.draw_observable = True
        self.draw_joystick = True
        self.rng = random_dont_use.Random()
        self.reset()

    def _seed(self, seed):
        """Seed the env's private random number generator."""
        self.rng.seed(seed)

    def random(self):
        """Return the next float from the env's private RNG."""
        return self.rng.random()

    def _ensure_surface(self):
        """Create the drawing surface and the display window on first use."""
        if not self.surface:
            self.surface = pygame.Surface((self.width, self.height))
            self.screen = pygame.display.set_mode((self.width, self.height))
            pygame.display.set_caption(self.title)

    def _limit_to_unit_circle(self, coords):
        """Scale ``coords`` down to length 1 if it is longer than that."""
        l_sq = coords[0]**2 + coords[1]**2
        if l_sq > 1:
            length = math.sqrt(l_sq)
            coords = coords[0] / length, coords[1] / length
        return coords

    def _step_entities(self):
        """Advance every entity by one tick."""
        for entity in self.entities:
            entity.step()

    def _step_timers(self):
        """Count all timers down by one frame and fire the expired ones.

        A timer is a ``(time_left, func, arg)`` tuple; once ``time_left``
        drops below zero ``func(arg)`` is called and the timer is dropped.
        """
        new_timers = []
        for time_left, func, arg in self.timers:
            time_left -= 1/self.fps
            if time_left < 0:
                func(arg)
            else:
                new_timers.append((time_left, func, arg))
        self.timers = new_timers

    def sq_dist(self, entity1, entity2):
        """Return the squared distance of two entities in normalized coords."""
        return (entity1.pos[0] - entity2.pos[0])**2 + (entity1.pos[1] - entity2.pos[1])**2

    def dist(self, entity1, entity2):
        """Return the distance of two entities in normalized coordinates."""
        # Fixed: this used to call a non-existent ``self._sq_dist``.
        return math.sqrt(self.sq_dist(entity1, entity2))

    def _get_aux_reward(self):
        """Return a shaping reward that grows as the agent nears rewards.

        Every available ``Reward`` contributes
        ``aux_reward_max / (1 + squared distance to the agent)``. If
        ``aux_reward_discretize`` is set, each contribution is truncated to
        a multiple of ``1 / (2 * aux_reward_discretize)``.
        """
        aux_reward = 0
        for entity in self.entities:
            if isinstance(entity, entities.Reward) and entity.avaible:
                reward = self.aux_reward_max / \
                    (1 + self.sq_dist(entity, self.agent))
                if self.aux_reward_discretize:
                    reward = int(reward*self.aux_reward_discretize*2) / \
                        self.aux_reward_discretize / 2
                aux_reward += reward
        return aux_reward

    def step(self, action):
        """Advance the simulation by one tick.

        Args:
            action: Indexable with two components in [0, 1].

        Returns:
            Tuple ``(observation, reward, done, info)``. ``done`` is always
            ``0`` and ``info`` is the accumulated score (a number, not a
            dict).
        """
        inp = action[0], action[1]
        if self.limit_inp_to_unit_circle:
            # Map [0, 1] to [-1, 1], limit the length, then map back.
            inp = self._limit_to_unit_circle(((inp[0]-0.5)*2, (inp[1]-0.5)*2))
            inp = (inp[0]+1)/2, (inp[1]+1)/2
        self.inp = inp
        self._step_timers()
        self._step_entities()
        observation = self.observable.get_observation()
        # new_reward is a per-second rate, new_abs_reward is paid in full.
        reward = self.new_reward / self.fps + self.new_abs_reward
        self.new_reward = 0
        self.new_abs_reward = 0
        self.score += reward  # aux_reward does not count towards the score
        if self.aux_reward_max:
            reward += self._get_aux_reward()
        return observation, reward, 0, self.score

    def check_collisions_for(self, entity):
        """Notify ``entity`` and every entity it overlaps about the collision."""
        for other in self.entities:
            if other != entity:
                if self._check_collision_between(entity, other):
                    entity.on_collision(other)
                    other.on_collision(entity)

    def _check_collision_between(self, e1, e2):
        """Return whether two entities overlap (in pixel space).

        Raises:
            Exception: if the pair of shapes is not circle/circle.
        """
        shapes = [e1.shape, e2.shape]
        shapes.sort()
        if shapes == ['circle', 'circle']:
            sq_dist = ((e1.pos[0]-e2.pos[0])*self.width) ** 2 \
                + ((e1.pos[1]-e2.pos[1])*self.height)**2
            return sq_dist < (e1.radius + e2.radius)**2
        else:
            raise Exception(
                'Checking for collision between unsupported shapes: '+str(shapes))

    def kill_entity(self, target):
        """Remove ``target`` from the list of entities.

        Fixed: the previous loop stopped copying at ``target`` and thereby
        also dropped every entity stored after it (including the agent).
        The list is rebound instead of mutated in place because this method
        can be reached while ``self.entities`` is being iterated.
        """
        self.entities = [
            entity for entity in self.entities if entity is not target]

    def setup(self):
        """Populate ``self.entities``; override this to define a new env."""
        for i in range(18):
            enemy = entities.CircleBarrier(self)
            enemy.radius = self.random()*40+50
            self.entities.append(enemy)
        for i in range(3):
            enemy = entities.FlyingChaser(self)
            enemy.chase_acc = self.random()*0.4*0.3  # *0.6+0.5
            self.entities.append(enemy)
        for i in range(0):
            reward = entities.TimeoutReward(self)
            self.entities.append(reward)
        for i in range(1):
            reward = entities.TeleportingReward(self)
            self.entities.append(reward)

    def reset(self):
        """Reset score, timers and entities and rebuild the scene.

        Returns:
            Always ``0``; no observation is produced here.
        """
        pygame.init()
        self.inp = (0.5, 0.5)
        # will get rescaled according to fps (=reward per second)
        self.new_reward = 0
        self.new_abs_reward = 0  # will not get rescaled. should be used for one-time rewards
        self.score = 0
        self.entities = []
        self.timers = []
        self.agent = entities.Agent(self)
        self.setup()
        self.entities.append(self.agent)  # add it last, will be drawn on top
        # NOTE(review): the RNG is seeded only after setup() has already
        # drawn the entity positions, so env_seed does not fix the layout.
        # Kept as is to preserve behavior; confirm whether this is intended.
        self._seed(self.env_seed)
        return 0

    def _draw_entities(self):
        """Draw all entities in list order (later ones end up on top)."""
        for entity in self.entities:
            entity.draw()

    def _draw_observable(self, forceDraw=False):
        """Draw the observable's preview if enabled or forced."""
        if self.draw_observable or forceDraw:
            self.observable.draw()

    def _draw_joystick(self, forceDraw=False):
        """Draw the on-screen joystick showing the current input.

        Fixed: ``forceDraw`` is now honored, like in ``_draw_observable``.
        """
        if self.draw_joystick or forceDraw:
            x, y = self.inp
            pygame.draw.circle(self.screen, (100, 100, 100), (50 +
                               self.joystick_offset[0], 50+self.joystick_offset[1]), 50, width=1)
            pygame.draw.circle(self.screen, (100, 100, 100), (20+int(60*x) +
                               self.joystick_offset[0], 20+int(60*y)+self.joystick_offset[1]), 20, width=0)

    def render(self, mode='human'):
        """Draw the current state into the window."""
        self._ensure_surface()
        # Clear the off-screen surface before the entities are drawn on it.
        pygame.draw.rect(self.surface, (0, 0, 0),
                         pygame.Rect(0, 0, self.width, self.height))
        self._draw_entities()
        self.screen.blit(self.surface, (0, 0))
        # Overlays go directly onto the screen, not onto the game surface.
        self._draw_observable()
        self._draw_joystick()
        pygame.display.update()

    def close(self):
        """Close the window and shut pygame down."""
        pygame.display.quit()
        pygame.quit()

View File

@ -0,0 +1,43 @@
from time import sleep, time
from env import ColumbusEnv
import numpy as np
import pygame
from observables import Observable, CnnObservable
def main():
    """Create a ColumbusEnv with a CnnObservable and let a human play it."""
    env = ColumbusEnv(fps=60, observable=CnnObservable())
    try:
        playEnv(env)
    finally:
        # Shut the window and pygame down even if the play loop raises.
        env.close()
def playEnv(env):
    """Let a human steer ``env`` with the mouse until the episode is over.

    The loop renders, reads the mouse, steps the env and then sleeps so
    that one iteration takes ``1/env.fps`` seconds. It ends when the env
    reports ``done`` or when the window is closed.

    Args:
        env: Environment providing ``reset``, ``render``, ``step``, ``fps``
            and ``joystick_offset``.
    """
    env.reset()
    done = False
    while not done:
        frame_start = time()
        env.render()
        # Drain the event queue every frame; leave the loop on window close.
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                return
        mouse_x, mouse_y = pygame.mouse.get_pos()
        # Map pixel position to [0, 1] per axis: 60 px span, starting 20 px
        # behind the joystick offset.
        pos = (min(max((mouse_x-env.joystick_offset[0]-20)/60, 0), 1),
               min(max((mouse_y-env.joystick_offset[1]-20)/60, 0), 1))
        obs, rew, done, info = env.step(np.array(pos, dtype=np.float32))
        print('Reward: '+str(rew))
        print('Score: '+str(info))
        delay = 1/env.fps - (time() - frame_start)
        if delay < 0:
            print("[!] Can't keep framerate!")
        else:
            sleep(delay)
if __name__ == '__main__':
    # Start the interactive session only when run as a script.
    main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

View File

@ -0,0 +1,69 @@
from gym import spaces
import numpy as np
import pygame
class Observable():
    """Dummy observable: usable as a default, but yields no observation.

    It provides the methods an environment calls on its observable
    (``_set_env``, ``get_observation_space``, ``get_observation``,
    ``draw``) so that subclasses only override what they need.
    """

    def __init__(self):
        self.obs = None  # most recent observation, None until one exists
        self.env = None  # set via _set_env once attached to an environment

    def _set_env(self, env):
        """Remember the environment this observable is attached to."""
        self.env = env

    def get_observation_space(self):
        """Return a placeholder one-element uint8 Box and print a warning."""
        # Fixed: ``self`` was missing, so calling this on an instance failed.
        print("[!] Using dummyObservable. Env won't output anything")
        return spaces.Box(low=0, high=255,
                          shape=(1,), dtype=np.uint8)

    def get_observation(self):
        """Return the stored observation, which is always ``None`` here."""
        return self.obs

    def draw(self):
        """Draw nothing; the dummy observable has nothing to display."""
        pass
class CnnObservable(Observable):
    """Observable that yields a scaled-down image of the agent's surroundings.

    A window of ``in_width`` x ``in_height`` pixels centered on the agent is
    cut from the env's surface and scaled to ``out_width`` x ``out_height``.
    """

    def __init__(self, in_width=256, in_height=256, out_width=32, out_height=32, draw_width=128, draw_height=128, smooth_scaling=True):
        """Store the window, output and preview sizes and pick the scaler.

        Args:
            in_width, in_height: Size of the window cut from the env surface.
            out_width, out_height: Size of the produced observation.
            draw_width, draw_height: Size of the on-screen preview.
            smooth_scaling: Use ``pygame.transform.smoothscale`` instead of
                ``pygame.transform.scale`` for the observation.
        """
        super().__init__()
        self.in_width = in_width
        self.in_height = in_height
        self.out_width = out_width
        self.out_height = out_height
        self.draw_width = draw_width
        self.draw_height = draw_height
        self.scaler = (pygame.transform.smoothscale if smooth_scaling
                       else pygame.transform.scale)

    def _set_env(self, env):
        """Remember the environment this observable is attached to."""
        self.env = env

    def get_observation_space(self):
        """Return a uint8 Box of shape ``(out_width, out_height)``."""
        out_shape = (self.out_width, self.out_height)
        return spaces.Box(low=0, high=255, shape=out_shape, dtype=np.uint8)

    def get_observation(self):
        """Cut the window around the agent, scale it, store and return it.

        The result is whatever the chosen pygame scaler returns; it is also
        kept in ``self.obs``, the unscaled window in ``self.snap``.
        """
        env = self.env
        # Top-left corner of the window in pixels; may lie outside the field.
        left = env.agent.pos[0]*env.width - self.in_width / 2
        top = env.agent.pos[1]*env.height - self.in_height/2
        # Part of the window that is actually covered by the env surface.
        clip_left = _clip(left, 0, env.width)
        clip_top = _clip(top, 0, env.height)
        clip_w = _clip(self.in_width, 0, env.width - clip_left)
        clip_h = _clip(self.in_height, 0, env.height - clip_top)
        visible = env.surface.subsurface(
            pygame.Rect(clip_left, clip_top, clip_w, clip_h))
        # Grey background marks the area that lies outside of the field.
        self.snap = pygame.Surface((self.in_width, self.in_height))
        whole = pygame.Rect(0, 0, self.in_width, self.in_height)
        pygame.draw.rect(self.snap, (50, 50, 50), whole)
        self.snap.blit(visible, (clip_left - left, clip_top - top))
        self.obs = self.scaler(self.snap, (self.out_width, self.out_height))
        return self.obs

    def draw(self):
        """Draw an enlarged preview of the observation in the top-right corner."""
        if not self.obs:
            self.get_observation()
        preview = pygame.transform.scale(
            self.obs, (self.draw_width, self.draw_height))
        left, top = self.env.width - self.draw_width - 10, 10
        # A slightly larger grey rectangle acts as a one pixel frame.
        frame = pygame.Rect(left - 1, top - 1,
                            self.draw_width + 2, self.draw_height + 2)
        pygame.draw.rect(self.env.screen, (50, 50, 50), frame)
        self.env.screen.blit(preview, (left, top))
def _clip(num, lower, upper):
return min(max(num, lower), upper)