PriorConditionedAnnealing/test.py
2023-05-04 15:39:38 +02:00

295 lines
9.4 KiB
Python

import torch as th
from time import sleep, time
import numpy as np
import pygame
import yaml
import random
from columbus import env
from columbus.observables import Observable, CnnObservable
import colorednoise as cn
from perlin_noise import PerlinNoise
from priorConditionedAnnealing import pca
def main():
alg_name, agent_func = choosePlayType()
env = chooseEnv(alg_name)
while True:
playEnv(env, agent_func)
try:
agent_func.reset()
except:
pass
env.close()
def getAvaibleEnvs():
# kinda hacky... idk
strs = dir(env)
for s in strs:
if s.startswith('Columbus') and s != 'ColumbusEnv':
yield getattr(env, s)
def loadConfigDefinedEnv(EnvClass, alg_name):
p = input('[Path to config> ')
with open(p, 'r') as f:
docs = list([d for d in yaml.safe_load_all(
f) if d and 'name' in d and d['name'] not in ['SLURM']])
for i, doc in enumerate(docs):
name = doc['name']
print('['+str(i)+'] '+name)
ds = int(input('[0]> ') or '0')
doc = docs[ds]
cur = doc
path = 'params.task.env_args'
p = path.split('.')
while True:
try:
if len(p) == 0:
break
key = p.pop(0)
print(key)
cur = cur[key]
except Exception as e:
print('Unable to find key "'+key+'"')
path = input('[Path> ')
print(cur)
return EnvClass(fps=30, title_appendix=' ['+alg_name+']', **cur)
def chooseEnv(alg_name):
envs = list(getAvaibleEnvs())
for i, Env in enumerate(envs):
print('['+str(i)+'] '+Env.__name__)
while True:
inp = input('[#> ')
try:
i = int(inp)
except:
print('[!] You have to enter the number...')
if i < 0 or i >= len(envs):
print(
'[!] That is a number, but not one that makes sense in this context...')
if envs[i] in [env.ColumbusConfigDefined]:
return loadConfigDefinedEnv(envs[i], alg_name)
Env = envs[i]
return Env(fps=30, agent_draw_path=True, path_decay=1/1024, title_appendix=' ['+alg_name+']', max_steps=30*10, clear_path_on_reset=False)
def value_func(obs):
return obs[:, 0]
# return th.rand(obs.shape[0])-0.5
def human_input(obs, env):
pos = (0.5, 0.5)
pos = pygame.mouse.get_pos()
pos = (min(max((pos[0]-env.joystick_offset[0]-20)/60, 0), 1),
min(max((pos[1]-env.joystick_offset[1]-20)/60, 0), 1))
pos = pos[0]*2-1, pos[1]*2-1
return pos
class Colored_Noise():
def __init__(self, beta=1, dim_a=2, samples=2**18):
self.beta = beta
self.dim_a = dim_a
self.samples = samples
self.index = 0
self.reset()
def __call__(self, obs, env):
sample = self.samples[:, self.index]
self.index += 1
return sample
def reset(self):
self.samples = cn.powerlaw_psd_gaussian(
self.beta, (self.dim_a, self.samples), random_state=rand_seed())
class Perlin_Noise():
def __init__(self, scale=0.05, octaves=1, dim_a=2):
self.scale = scale
self.octaves = octaves
self.dim_a = dim_a
self.magic = 3.14159 # Axis offset
# We want to genrate samples, that approx ~N(0,1)
self.normal_factor = 0.0471
self.reset()
def __call__(self, obs, env):
self.index += 1
return [self.noise([self.index*self.scale, self.magic*a]) / self.normal_factor
for a in range(self.dim_a)]
def reset(self):
self.index = 0
self.noise = PerlinNoise(octaves=self.octaves, seed=rand_seed())
class Perlin_PCA_Noise():
def __init__(self, dim_a=2, kernel_func='SE_1.41_1.0', window=128, ssf=-1, f_sigma=1):
self.dim_a = dim_a
self.kernel_func = kernel_func
self.window = window
self.ssf = ssf
self.f_sigma = f_sigma
self.index = 0
self.perlin = Perlin_Noise()
self.reset()
def __call__(self, obs, env):
if self.ssf != -1 and self.index % self.ssf == 0:
self.traj = [[0]*len(self.traj[0])]
traj = th.Tensor(self.traj).unsqueeze(0)
eps = th.Tensor(self.perlin(None, None)).unsqueeze(0)
sample = self.dist.sample(traj, self.f_sigma, epsilon=eps).squeeze(0)
self.traj.append(sample)
self.index += 1
return sample
def reset(self):
self.dist = pca.PCA_Distribution(
action_dim=self.dim_a, par_strength='CONT_DIAG', kernel_func=self.kernel_func, window=self.window)
self.dist.proba_distribution(th.Tensor([[0]*2]), th.Tensor([[1]*2]))
self.traj = [[0]*self.dim_a]
self.perlin.reset()
class PCA_Noise():
def __init__(self, dim_a=2, kernel_func='SE_1.41_1.0', window=128, ssf=-1, f_sigma=1):
self.dim_a = dim_a
self.kernel_func = kernel_func
self.window = window
self.ssf = ssf
self.f_sigma = f_sigma
self.index = 0
self.reset()
def __call__(self, obs, env):
if self.ssf != -1 and self.index % self.ssf == 0:
self.traj = [[0]*len(self.traj[0])]
traj = th.Tensor(self.traj).unsqueeze(0)
sample = self.dist.sample(traj, self.f_sigma).squeeze(0)
self.traj.append(sample)
self.index += 1
return sample
def reset(self):
self.dist = pca.PCA_Distribution(
action_dim=self.dim_a, par_strength='CONT_DIAG', kernel_func=self.kernel_func, window=self.window)
self.dist.proba_distribution(th.Tensor([[0]*2]), th.Tensor([[1]*2]))
self.traj = [[0]*self.dim_a]
class Human_PCA_Noise():
def __init__(self, dim_a=2, kernel_func='SE_1.414_1.0', window=128, ssf=-1, f_sigma=1):
self.dim_a = dim_a
self.kernel_func = kernel_func
self.window = window
self.ssf = ssf
self.f_sigma = f_sigma
self.index = 0
self.reset()
def __call__(self, obs, env):
if self.ssf != -1 and self.index % self.ssf == 0:
self.traj = [[0]*len(self.traj[0])]
traj = th.Tensor(self.traj).unsqueeze(0)
eps = human_input(obs, env)
epsilon = th.Tensor(eps).unsqueeze(0)
sample = self.dist.sample(traj, self.f_sigma, epsilon).squeeze(0)
self.traj.append(sample)
self.index += 1
return sample
def reset(self):
self.dist = pca.PCA_Distribution(
action_dim=self.dim_a, par_strength='CONT_DIAG', kernel_func=self.kernel_func, window=self.window)
self.dist.proba_distribution(th.Tensor([[0]*2]), th.Tensor([[1]*2]))
self.traj = [[0]*self.dim_a]
class Colored_PCA_Noise():
def __init__(self, beta=1, dim_a=2, samples=2**18, kernel_func='SE_1.41_1', window=64, ssf=-1):
self.beta = beta
self.dim_a = dim_a
self.samples = samples
self.kernel_func = kernel_func
self.window = window
self.ssf = ssf
self.index = 0
self.reset()
def __call__(self, obs, env):
epsilon = th.Tensor(self.samples[:, self.index]).unsqueeze(0)
if self.ssf != -1 and self.index % self.ssf == 0:
self.traj = [[0]*len(self.traj[0])]
traj = th.Tensor(self.traj).unsqueeze(0)
sample = self.dist.sample(traj, epsilon=epsilon).squeeze(0)
self.traj.append(sample)
self.index += 1
return sample
def reset(self):
self.dist = pca.PCA_Distribution(
action_dim=self.dim_a, par_strength='CONT_DIAG', kernel_func=self.kernel_func, window=self.window)
self.dist.proba_distribution(th.Tensor([[0]*2]), th.Tensor([[1]*2]))
self.traj = [[0]*self.dim_a]
self.samples = cn.powerlaw_psd_gaussian(
self.beta, (self.dim_a, self.samples), random_state=rand_seed())
def rand_seed():
return int(99999999 * random.random())
def choosePlayType():
options = {'human': human_input, 'PCA': PCA_Noise(),
'REX': Colored_Noise(beta=0), 'PINK': Colored_Noise(beta=1), 'BROWN': Colored_Noise(beta=2), 'BETA.5': Colored_Noise(beta=.5), 'PINK_PCA': Colored_PCA_Noise(beta=1), 'Precise_PCA': PCA_Noise(f_sigma=0.33), 'Perlin': Perlin_Noise(scale=0.05, octaves=1), 'FastPerlin': Perlin_Noise(scale=0.2, octaves=1), 'SlowPerlin': Perlin_Noise(scale=0.0125, octaves=1), 'Perlin_3': Perlin_Noise(scale=0.05, octaves=3), 'Perlin_8': Perlin_Noise(scale=0.05, octaves=8), 'Perlin_PCA': Perlin_PCA_Noise(), 'Human_PCA': Human_PCA_Noise()}
for i, name in enumerate(options):
print('['+str(i)+'] '+name)
while True:
inp = input('[#> ')
try:
i = int(inp)
except:
print('[!] You have to enter the number...')
continue
if i < 0 or i >= len(options):
print(
'[!] That is a number, but not one that makes sense in this context...')
else:
name = list(options.keys())[i]
return name, options[name]
def playEnv(env, agent_func):
done = False
obs = env.reset()
while not done:
t1 = time()
# env.render(value_func=value_func)
env.render()
inp = agent_func(obs, env)
obs, rew, done, info = env.step(np.array(inp, dtype=np.float32))
print('Reward: '+str(rew))
print('Score: '+str(info))
t2 = time()
dt = t2 - t1
delay = (1/env.fps - dt)
if delay < 0:
print("[!] Can't keep framerate!")
else:
sleep(delay)
if __name__ == '__main__':
main()