Columbus/env.py

193 lines
6.7 KiB
Python

import gym
from gym import spaces
import numpy as np
import pygame
import random as random_dont_use
import math
import entities
import observables
class ColumbusEnv(gym.Env):
metadata = {'render.modes': ['human']}
def __init__(self, observable=observables.Observable(), fps=60, env_seed=3.1):
super(Base2DExpEnv, self).__init__()
self.action_space = spaces.Box(
low=0, high=1, shape=(2,), dtype=np.float32)
observable._set_env(self)
self.observable = observable
self.observation_space = self.observable.get_observation_space()
self.title = 'Untitled'
self.fps = fps
self.env_seed = env_seed
self.joystick_offset = (10, 10)
self.surface = None
self.screen = None
self.width = 720
self.height = 720
self.speed_fac = 0.01/fps*60
self.acc_fac = 0.03/fps*60
self.agent_drag = 0 # 0.01 is a good value
self.controll_type = 'SPEED' # one of SPEED, ACC
self.limit_inp_to_unit_circle = True
self.aux_reward_max = 0 # 0 = off
self.aux_reward_discretize = 0 # 0 = dont discretize
self.draw_observable = True
self.draw_joystick = True
self.rng = random_dont_use.Random()
self.reset()
def _seed(self, seed):
self.rng.seed(seed)
def random(self):
return self.rng.random()
def _ensure_surface(self):
if not self.surface:
self.surface = pygame.Surface((self.width, self.height))
self.screen = pygame.display.set_mode((self.width, self.height))
pygame.display.set_caption(self.title)
def _limit_to_unit_circle(self, coords):
l_sq = coords[0]**2 + coords[1]**2
if l_sq > 1:
l = math.sqrt(l_sq)
coords = coords[0] / l, coords[1] / l
return coords
def _step_entities(self):
for entity in self.entities:
entity.step()
def _step_timers(self):
new_timers = []
for time_left, func, arg in self.timers:
time_left -= 1/self.fps
if time_left < 0:
func(arg)
else:
new_timers.append((time_left, func, arg))
self.timers = new_timers
def sq_dist(self, entity1, entity2):
return (entity1.pos[0] - entity2.pos[0])**2 + (entity1.pos[1] - entity2.pos[1])**2
def dist(self, entity1, entity2):
return math.sqrt(self._sq_dist(entity1, entity2))
def _get_aux_reward(self):
aux_reward = 0
for entity in self.entities:
if isinstance(entity, entities.Reward):
if entity.avaible:
reward = self.aux_reward_max / \
(1 + self.sq_dist(entity, self.agent))
if self.aux_reward_discretize:
reward = int(reward*self.aux_reward_discretize*2) / \
self.aux_reward_discretize / 2
aux_reward += reward
return aux_reward
def step(self, action):
inp = action[0], action[1]
if self.limit_inp_to_unit_circle:
inp = self._limit_to_unit_circle(((inp[0]-0.5)*2, (inp[1]-0.5)*2))
inp = (inp[0]+1)/2, (inp[1]+1)/2
self.inp = inp
self._step_timers()
self._step_entities()
observation = self.observable.get_observation()
reward, self.new_reward, self.new_abs_reward = self.new_reward / \
self.fps + self.new_abs_reward, 0, 0
self.score += reward # aux_reward does not count towards the score
if self.aux_reward_max:
reward += self._get_aux_reward()
return observation, reward, 0, self.score
return observation, reward, done, info
def check_collisions_for(self, entity):
for other in self.entities:
if other != entity:
sq_dist = ((other.pos[0]-entity.pos[0])*self.width) ** 2 \
+ ((other.pos[1]-entity.pos[1])*self.height)**2
if sq_dist < (entity.radius + other.radius)**2:
entity.on_collision(other)
other.on_collision(entity)
def kill_entity(self, target):
newEntities = []
for entity in self.entities:
if target != entity:
newEntities.append(entity)
else:
del target
break
self.entities = newEntities
def setup(self):
for i in range(16):
enemy = entities.CircleBarrier(self)
enemy.radius = self.random()*40+50
self.entities.append(enemy)
for i in range(3):
enemy = entities.FlyingChaser(self)
enemy.chase_acc = self.random()*0.4*0.3 # *0.6+0.5
self.entities.append(enemy)
for i in range(0):
reward = entities.TimeoutReward(self)
self.entities.append(reward)
for i in range(1):
reward = entities.TeleportingReward(self)
self.entities.append(reward)
def reset(self):
pygame.init()
self.inp = (0.5, 0.5)
# will get rescaled acording to fps (=reward per second)
self.new_reward = 0
self.new_abs_reward = 0 # will not get rescaled. should be used for one-time rewards
self.score = 0
self.entities = []
self.timers = []
self.agent = entities.Agent(self)
self.setup()
self.entities.append(self.agent) # add it last, will be drawn on top
self._seed(self.env_seed)
return 0
return observation # reward, done, info can't be included
def _draw_entities(self):
for entity in self.entities:
entity.draw()
def _draw_observable(self, forceDraw=False):
if self.draw_observable or forceDraw:
self.observable.draw()
def _draw_joystick(self, forceDraw=False):
if self.draw_joystick:
x, y = self.inp
pygame.draw.circle(self.screen, (100, 100, 100), (50 +
self.joystick_offset[0], 50+self.joystick_offset[1]), 50, width=1)
pygame.draw.circle(self.screen, (100, 100, 100), (20+int(60*x) +
self.joystick_offset[0], 20+int(60*y)+self.joystick_offset[1]), 20, width=0)
def render(self, mode='human'):
self._ensure_surface()
pygame.draw.rect(self.surface, (0, 0, 0),
pygame.Rect(0, 0, self.width, self.height))
self._draw_entities()
self.screen.blit(self.surface, (0, 0))
self._draw_observable()
self._draw_joystick()
pygame.display.update()
def close(self):
pygame.display.quit()
pygame.quit()