From 65cd0516cd4c012b7dfb3c5544b82717a0af8690 Mon Sep 17 00:00:00 2001 From: Dominik Roth Date: Sun, 19 Jun 2022 15:01:30 +0200 Subject: [PATCH] initial commit --- .gitignore | 3 + README.md | 3 + entities.py | 195 +++++++++++++++++++++++++++++++++++++++++++++++++ env.py | 192 ++++++++++++++++++++++++++++++++++++++++++++++++ humanPlayer.py | 43 +++++++++++ observables.py | 69 +++++++++++++++++ 6 files changed, 505 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 entities.py create mode 100644 env.py create mode 100644 humanPlayer.py create mode 100644 observables.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3d46b6b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +*.pyc +*.pyo diff --git a/README.md b/README.md new file mode 100644 index 0000000..a977964 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# Project Columbus + +Project Columbus is a framework for trivial 2D OpenAI Gym environments that are supposed to test an agent's ability to solve tasks that require different forms of exploration effectively and efficiently. 
# ===========================================================================
# entities.py
# ===========================================================================
import pygame
import math


class Entity(object):
    """Base class for every object that lives inside the environment.

    Positions, speeds and accelerations are expressed in the unit square
    (``0..1`` on both axes); ``radius`` is in pixels because it is handed
    to pygame unscaled and compared against pixel distances in
    ``env.check_collisions_for``.
    """

    def __init__(self, env):
        self.env = env
        # Random start position drawn from the environment's own RNG.
        self.pos = (env.random(), env.random())
        self.speed = (0, 0)
        self.acc = (0, 0)
        self.drag = 0
        self.radius = 10
        self.col = (255, 255, 255)
        self.shape = 'circle'

    def physics_step(self):
        """Integrate acceleration -> speed -> position for one frame.

        The position is clamped to the unit square; the speed component
        that ran into a border is zeroed. Drag is applied afterwards by
        dividing the speed by ``1 + drag``.
        """
        x, y = self.pos
        vx, vy = self.speed
        ax, ay = self.acc
        vx, vy = vx+ax*self.env.acc_fac, vy+ay*self.env.acc_fac
        x, y = x+vx*self.env.speed_fac, y+vy*self.env.speed_fac
        if x > 1 or x < 0:
            x = min(max(x, 0), 1)
            vx = 0
        if y > 1 or y < 0:
            y = min(max(y, 0), 1)
            vy = 0
        self.speed = vx/(1+self.drag), vy/(1+self.drag)
        self.pos = x, y

    def controll_step(self):
        """Hook for subclasses: decide on speed/acc before physics runs."""
        pass

    def step(self):
        """Advance the entity by one frame (control first, then physics)."""
        self.controll_step()
        self.physics_step()

    def draw(self):
        """Draw the entity as a filled circle on ``env.surface``."""
        x, y = self.pos
        pygame.draw.circle(self.env.surface, self.col,
                           (x*self.env.width, y*self.env.height),
                           self.radius, width=0)

    def on_collision(self, other):
        """Hook called by ``env.check_collisions_for`` for touching pairs."""
        pass

    def kill(self):
        """Remove this entity from the environment."""
        self.env.kill_entity(self)


class Agent(Entity):
    """The controlled entity; starts in the centre and is drawn in blue."""

    def __init__(self, env):
        super(Agent, self).__init__(env)
        self.pos = (0.5, 0.5)
        self.col = (0, 0, 255)
        self.drag = self.env.agent_drag
        self.controll_type = self.env.controll_type

    def controll_step(self):
        """Apply the current input, then resolve collisions for the agent."""
        self._read_input()
        self.env.check_collisions_for(self)

    def _read_input(self):
        """Map ``env.inp`` (each axis in 0..1) onto speed or acceleration.

        Raises:
            Exception: if ``controll_type`` is neither 'SPEED' nor 'ACC'.
        """
        if self.controll_type == 'SPEED':
            self.speed = self.env.inp[0] - 0.5, self.env.inp[1] - 0.5
        elif self.controll_type == 'ACC':
            self.acc = self.env.inp[0] - 0.5, self.env.inp[1] - 0.5
        else:
            raise Exception('Unsupported controll_type')


class Enemy(Entity):
    """Red entity that subtracts ``damage`` from the reward while touching
    the agent (``env.new_reward`` is divided by the fps in ``env.step``)."""

    def __init__(self, env):
        super(Enemy, self).__init__(env)
        self.col = (255, 0, 0)
        self.damage = 10

    def on_collision(self, other):
        if isinstance(other, Agent):
            self.env.new_reward -= self.damage


class Barrier(Enemy):
    """Marker type for static enemies; rewards test for it on collision."""

    def __init__(self, env):
        super(Barrier, self).__init__(env)


class CircleBarrier(Barrier):
    """Circular barrier; adds nothing over ``Barrier`` yet."""

    def __init__(self, env):
        super(CircleBarrier, self).__init__(env)


class Chaser(Enemy):
    """Enemy that steers towards ``env.agent``."""

    def __init__(self, env):
        super(Chaser, self).__init__(env)
        self.target = self.env.agent
        self.arrow_fak = 100
        self.lookahead = 0

    def _get_arrow(self):
        """Return the steering vector towards the target.

        The vector points from the chaser's own position, extrapolated
        ``lookahead`` frames along its current speed, to the target. It is
        scaled by ``arrow_fak`` and then limited to the unit circle.
        """
        tx, ty = self.target.pos
        x, y = self.pos
        fx = x + self.speed[0]*self.lookahead*self.env.speed_fac
        fy = y + self.speed[1]*self.lookahead*self.env.speed_fac
        dx, dy = (tx-fx)*self.arrow_fak, (ty-fy)*self.arrow_fak
        return self.env._limit_to_unit_circle((dx, dy))


class WalkingChaser(Chaser):
    """Chaser that sets its speed directly (no inertia)."""

    def __init__(self, env):
        super(WalkingChaser, self).__init__(env)
        self.col = (255, 0, 0)
        self.chase_speed = 0.45

    def controll_step(self):
        arrow = self._get_arrow()
        self.speed = arrow[0] * self.chase_speed, arrow[1] * self.chase_speed


class FlyingChaser(Chaser):
    """Chaser that sets its acceleration and therefore has inertia."""

    def __init__(self, env):
        super(FlyingChaser, self).__init__(env)
        self.col = (255, 0, 0)
        self.chase_acc = 0.5
        self.arrow_fak = 5
        self.lookahead = 8 + env.random()*2

    def controll_step(self):
        arrow = self._get_arrow()
        self.acc = arrow[0] * self.chase_acc, arrow[1] * self.chase_acc


class Reward(Entity):
    """Green entity that grants ``reward`` while the agent touches it.

    ``avaible`` (sic) is read by ``env._get_aux_reward``; the spelling is
    kept because it is part of the attribute interface.
    """

    def __init__(self, env):
        super(Reward, self).__init__(env)
        self.col = (0, 255, 0)
        self.avaible = True
        self.enforce_not_on_barrier = False
        self.reward = 1

    def on_collision(self, other):
        if isinstance(other, Agent):
            self.on_collect()
        elif isinstance(other, Barrier):
            self.on_barrier_collision()

    def on_collect(self):
        """Add to the per-second reward accumulator."""
        self.env.new_reward += self.reward

    def on_barrier_collision(self):
        """Re-roll the position if the reward may not sit on a barrier."""
        if self.enforce_not_on_barrier:
            self.pos = (self.env.random(), self.env.random())
            # The new spot may again overlap a barrier, so check once more.
            self.env.check_collisions_for(self)


class OnceReward(Reward):
    """One-time reward: grants an absolute reward and removes itself."""

    def __init__(self, env):
        super(OnceReward, self).__init__(env)
        self.reward = 100

    def on_collect(self):
        self.env.new_abs_reward += self.reward
        self.kill()


class TeleportingReward(OnceReward):
    """One-time reward that jumps to a new random spot when collected."""

    def __init__(self, env):
        super(TeleportingReward, self).__init__(env)
        self.enforce_not_on_barrier = True
        self.env.check_collisions_for(self)

    def on_collect(self):
        self.env.new_abs_reward += self.reward
        self.pos = (self.env.random(), self.env.random())
        self.env.check_collisions_for(self)


class TimeoutReward(OnceReward):
    """Reward that becomes unavailable for ``timeout`` seconds once taken."""

    def __init__(self, env):
        super(TimeoutReward, self).__init__(env)
        self.enforce_not_on_barrier = True
        self.env.check_collisions_for(self)
        self.timeout = 10

    def set_avaible(self, value):
        """Switch availability and the colour that signals it."""
        self.avaible = value
        if self.avaible:
            self.col = (0, 255, 0)
        else:
            self.col = (50, 100, 50)

    def on_collect(self):
        if self.avaible:
            self.env.new_abs_reward += self.reward
            self.set_avaible(False)
            # env._step_timers calls set_avaible(True) once the time is up.
            self.env.timers.append((self.timeout, self.set_avaible, True))


# ===========================================================================
# env.py
# ===========================================================================
import gym
from gym import spaces
import numpy as np
import pygame
import random as random_dont_use
import math
import entities
import observables


class Base2DExpEnv(gym.Env):
    """Simple 2D exploration environment rendered with pygame.

    Args:
        observable: object producing observations; it must offer
            ``_set_env``, ``get_observation_space``, ``get_observation``
            and ``draw``. ``None`` (the default) creates a fresh dummy
            ``observables.Observable``.
        fps: simulated frames per second; scales the physics factors,
            the timers and the per-second reward.
        env_seed: seed handed to the environment's private RNG on reset.
    """

    metadata = {'render.modes': ['human']}

    def __init__(self, observable=None, fps=60, env_seed=3.1):
        super(Base2DExpEnv, self).__init__()
        # A default instance in the signature would be created once at
        # import time and shared (and re-bound) by every environment.
        if observable is None:
            observable = observables.Observable()
        self.action_space = spaces.Box(
            low=0, high=1, shape=(2,), dtype=np.float32)
        observable._set_env(self)
        self.observable = observable
        self.observation_space = self.observable.get_observation_space()
        self.title = 'Untitled'
        self.fps = fps
        self.env_seed = env_seed
        self.joystick_offset = (10, 10)
        self.surface = None
        self.screen = None
        self.width = 720
        self.height = 720
        self.speed_fac = 0.01/fps*60
        self.acc_fac = 0.03/fps*60
        self.agent_drag = 0  # 0.01 is a good value
        self.controll_type = 'SPEED'  # one of SPEED, ACC
        self.limit_inp_to_unit_circle = True
        self.aux_reward_max = 0  # 0 = off
        self.aux_reward_discretize = 0  # 0 = dont discretize
        self.draw_observable = True
        self.draw_joystick = True

        self.rng = random_dont_use.Random()
        self.reset()

    def _seed(self, seed):
        """Seed the environment's private RNG."""
        self.rng.seed(seed)

    def random(self):
        """Return the next float in ``[0, 1)`` from the private RNG."""
        return self.rng.random()

    def _ensure_surface(self):
        """Lazily create the draw surface and the display window."""
        if self.surface is None:
            self.surface = pygame.Surface((self.width, self.height))
            self.screen = pygame.display.set_mode((self.width, self.height))
            pygame.display.set_caption(self.title)

    def _limit_to_unit_circle(self, coords):
        """Scale ``coords`` down to length 1 if it is longer than that."""
        l_sq = coords[0]**2 + coords[1]**2
        if l_sq > 1:
            length = math.sqrt(l_sq)
            coords = coords[0] / length, coords[1] / length
        return coords

    def _step_entities(self):
        for entity in self.entities:
            entity.step()

    def _step_timers(self):
        """Count all timers down by one frame and fire the expired ones.

        A timer is a ``(seconds_left, func, arg)`` tuple; ``func(arg)`` is
        called once ``seconds_left`` drops below zero.
        """
        new_timers = []
        for time_left, func, arg in self.timers:
            time_left -= 1/self.fps
            if time_left < 0:
                func(arg)
            else:
                new_timers.append((time_left, func, arg))
        self.timers = new_timers

    def sq_dist(self, entity1, entity2):
        """Squared distance between two entities in unit-square coords."""
        return (entity1.pos[0] - entity2.pos[0])**2 \
            + (entity1.pos[1] - entity2.pos[1])**2

    def dist(self, entity1, entity2):
        """Distance between two entities in unit-square coordinates."""
        # Fixed: this used to call the non-existent ``self._sq_dist``.
        return math.sqrt(self.sq_dist(entity1, entity2))

    def _get_aux_reward(self):
        """Sum a distance-based shaping reward over all available rewards.

        Each available ``Reward`` contributes
        ``aux_reward_max / (1 + squared distance to the agent)``,
        optionally truncated to steps of ``1 / (2*aux_reward_discretize)``.
        """
        aux_reward = 0
        for entity in self.entities:
            if isinstance(entity, entities.Reward):
                if entity.avaible:
                    reward = self.aux_reward_max / \
                        (1 + self.sq_dist(entity, self.agent))

                    if self.aux_reward_discretize:
                        reward = int(reward*self.aux_reward_discretize*2) / \
                            self.aux_reward_discretize / 2

                    aux_reward += reward
        return aux_reward

    def step(self, action):
        """Advance the simulation by one frame.

        Args:
            action: indexable with two entries, each expected in 0..1
                (0.5 = no input on that axis).

        Returns:
            ``(observation, reward, 0, score)``. The episode never ends,
            and the running score is returned in the ``info`` slot.
        """
        # NOTE(review): Gym convention would be a bool ``done`` and a dict
        # ``info``; kept as-is because humanPlayer.py prints ``info``.
        inp = action[0], action[1]
        if self.limit_inp_to_unit_circle:
            # Map 0..1 to -1..1, clamp to the unit circle, map back.
            inp = self._limit_to_unit_circle(((inp[0]-0.5)*2, (inp[1]-0.5)*2))
            inp = (inp[0]+1)/2, (inp[1]+1)/2
        self.inp = inp
        self._step_timers()
        self._step_entities()
        observation = self.observable.get_observation()
        # new_reward is "per second", new_abs_reward is granted in full.
        reward = self.new_reward / self.fps + self.new_abs_reward
        self.new_reward, self.new_abs_reward = 0, 0
        self.score += reward  # aux_reward does not count towards the score
        if self.aux_reward_max:
            reward += self._get_aux_reward()
        # Removed an unreachable second return that referenced the
        # undefined names ``done`` and ``info``.
        return observation, reward, 0, self.score

    def check_collisions_for(self, entity):
        """Notify ``entity`` and every entity it overlaps (pixel space)."""
        for other in self.entities:
            if other != entity:
                sq_dist = ((other.pos[0]-entity.pos[0])*self.width) ** 2 \
                    + ((other.pos[1]-entity.pos[1])*self.height)**2
                if sq_dist < (entity.radius + other.radius)**2:
                    entity.on_collision(other)
                    other.on_collision(entity)

    def kill_entity(self, target):
        """Remove ``target`` from the entity list.

        Fixed: the previous loop stopped copying at the target, which also
        dropped every entity stored after it (including the agent, which
        is appended last). A new list is still built rather than mutated
        in place, so a caller iterating the old list is unaffected.
        """
        self.entities = [
            entity for entity in self.entities if entity is not target]

    def setup(self):
        """Populate the world with barriers, chasers and rewards."""
        for i in range(16):
            enemy = entities.CircleBarrier(self)
            enemy.radius = self.random()*40+50
            self.entities.append(enemy)
        for i in range(3):
            enemy = entities.FlyingChaser(self)
            enemy.chase_acc = self.random()*0.4*0.3  # *0.6+0.5
            self.entities.append(enemy)
        for i in range(0):  # TimeoutRewards are currently disabled
            reward = entities.TimeoutReward(self)
            self.entities.append(reward)
        for i in range(1):
            reward = entities.TeleportingReward(self)
            self.entities.append(reward)

    def reset(self):
        """Rebuild the world and return ``0`` as a placeholder observation.

        Fixed: the RNG is now seeded before the entities are placed.
        Seeding afterwards meant ``env_seed`` had no effect on the layout
        (the first layout came from an unseeded RNG).
        """
        pygame.init()
        self._seed(self.env_seed)
        self.inp = (0.5, 0.5)
        # will get rescaled acording to fps (=reward per second)
        self.new_reward = 0
        # will not get rescaled. should be used for one-time rewards
        self.new_abs_reward = 0
        self.score = 0
        self.entities = []
        self.timers = []
        self.agent = entities.Agent(self)
        self.setup()
        self.entities.append(self.agent)  # add it last, will be drawn on top
        # NOTE(review): a real observation is not returned here because
        # CnnObservable needs ``self.surface``, which only exists after
        # the first render(). The dead ``return observation`` was removed.
        return 0

    def _draw_entities(self):
        for entity in self.entities:
            entity.draw()

    def _draw_observable(self, forceDraw=False):
        if self.draw_observable or forceDraw:
            self.observable.draw()

    def _draw_joystick(self, forceDraw=False):
        """Draw the virtual joystick (outline plus knob) on the screen."""
        # Fixed: ``forceDraw`` was accepted but ignored.
        if self.draw_joystick or forceDraw:
            x, y = self.inp
            off_x, off_y = self.joystick_offset
            pygame.draw.circle(self.screen, (100, 100, 100),
                               (50 + off_x, 50 + off_y), 50, width=1)
            pygame.draw.circle(self.screen, (100, 100, 100),
                               (20 + int(60*x) + off_x,
                                20 + int(60*y) + off_y), 20, width=0)

    def render(self, mode='human'):
        """Draw the current frame to the pygame window."""
        self._ensure_surface()
        pygame.draw.rect(self.surface, (0, 0, 0),
                         pygame.Rect(0, 0, self.width, self.height))
        self._draw_entities()
        self.screen.blit(self.surface, (0, 0))
        self._draw_observable()
        self._draw_joystick()
        pygame.display.update()

    def close(self):
        pygame.display.quit()
        pygame.quit()


# ===========================================================================
# humanPlayer.py
# ===========================================================================
from time import sleep, time
from env import Base2DExpEnv
import numpy as np
import pygame

from observables import Observable, CnnObservable


def main():
    """Let a human play the environment with the mouse as joystick."""
    env = Base2DExpEnv(fps=60, observable=CnnObservable())
    playEnv(env)
    env.close()


def playEnv(env):
    """Run the render/step loop at ``env.fps`` until ``done`` is truthy."""
    env.reset()
    done = False
    while not done:
        t1 = time()
        env.render()
        # Drain the event queue; the events themselves are not used.
        pygame.event.get()
        pos = pygame.mouse.get_pos()
        # Map the mouse position inside the on-screen joystick to 0..1.
        pos = (min(max((pos[0]-env.joystick_offset[0]-20)/60, 0), 1),
               min(max((pos[1]-env.joystick_offset[1]-20)/60, 0), 1))
        obs, rew, done, info = env.step(np.array(pos, dtype=np.float32))
        print('Reward: '+str(rew))
        print('Score: '+str(info))
        t2 = time()
        dt = t2 - t1
        delay = (1/env.fps - dt)
        if delay < 0:
            print("[!] Can't keep framerate!")
        else:
            sleep(delay)


if __name__ == '__main__':
    main()


# ===========================================================================
# observables.py
# ===========================================================================
from gym import spaces
import numpy as np
import pygame


class Observable():
    """Dummy observable; also the interface the environment relies on.

    The environment calls ``_set_env``, ``get_observation_space``,
    ``get_observation`` and ``draw``. The base class now implements all
    four, so an environment built with the default observable no longer
    fails with AttributeError / TypeError.
    """

    def __init__(self):
        self.obs = None

    def _set_env(self, env):
        """Remember the environment this observable belongs to."""
        self.env = env

    def get_observation_space(self):
        # Fixed: ``self`` was missing, so calling this on an instance
        # raised TypeError.
        print("[!] Using dummyObservable. Env won't output anything")
        return spaces.Box(low=0, high=255,
                          shape=(1,), dtype=np.uint8)

    def get_observation(self):
        """Return the last observation (always ``None`` for the dummy)."""
        return self.obs

    def draw(self):
        """Nothing to draw for the dummy observable."""
        pass


class CnnObservable(Observable):
    """Observation = image crop centred on the agent, scaled down.

    Args:
        in_width, in_height: size in pixels of the crop taken around the
            agent from ``env.surface``.
        out_width, out_height: size the crop is scaled to.
        draw_width, draw_height: size of the preview drawn on screen.
        smooth_scaling: use ``pygame.transform.smoothscale`` instead of
            ``pygame.transform.scale`` for the down-scaling.
    """

    def __init__(self, in_width=256, in_height=256, out_width=32,
                 out_height=32, draw_width=128, draw_height=128,
                 smooth_scaling=True):
        super(CnnObservable, self).__init__()
        self.in_width = in_width
        self.in_height = in_height
        self.out_width = out_width
        self.out_height = out_height
        self.draw_width = draw_width
        self.draw_height = draw_height
        if smooth_scaling:
            self.scaler = pygame.transform.smoothscale
        else:
            self.scaler = pygame.transform.scale

    def get_observation_space(self):
        # NOTE(review): get_observation returns a pygame Surface, not an
        # array of this shape - confirm what consumers expect.
        return spaces.Box(low=0, high=255,
                          shape=(self.out_width, self.out_height),
                          dtype=np.uint8)

    def get_observation(self):
        """Return the scaled crop around the agent as a pygame Surface.

        Reads ``env.surface``, which the environment only creates in
        ``render()``; NOTE(review): stepping before the first render
        will fail here.
        """
        # Top-left corner of the crop window in pixels (may be off-screen).
        x = self.env.agent.pos[0]*self.env.width - self.in_width/2
        y = self.env.agent.pos[1]*self.env.height - self.in_height/2
        w, h = self.in_width, self.in_height
        # Clip the window to the part that lies on the surface.
        cx = _clip(x, 0, self.env.width)
        cy = _clip(y, 0, self.env.height)
        cw = _clip(w, 0, self.env.width - cx)
        ch = _clip(h, 0, self.env.height - cy)
        rect = pygame.Rect(cx, cy, cw, ch)
        snap = self.env.surface.subsurface(rect)
        # Grey canvas so the area outside the world is visibly different.
        self.snap = pygame.Surface((self.in_width, self.in_height))
        pygame.draw.rect(self.snap, (50, 50, 50),
                         pygame.Rect(0, 0, self.in_width, self.in_height))
        self.snap.blit(snap, (cx - x, cy - y))
        self.obs = self.scaler(
            self.snap, (self.out_width, self.out_height))
        return self.obs

    def draw(self):
        """Draw an enlarged preview in the top-right corner of the screen."""
        if self.obs is None:
            self.get_observation()
        big = pygame.transform.scale(
            self.obs, (self.draw_width, self.draw_height))
        x, y = self.env.width - self.draw_width - 10, 10
        # One-pixel grey frame around the preview.
        pygame.draw.rect(self.env.screen, (50, 50, 50),
                         pygame.Rect(x - 1, y - 1,
                                     self.draw_width + 2,
                                     self.draw_height + 2))
        self.env.screen.blit(
            big, (x, y))


def _clip(num, lower, upper):
    """Clamp ``num`` into ``[lower, upper]``."""
    return min(max(num, lower), upper)