PriorConditionedAnnealing/test.py

266 lines
8.3 KiB
Python
Raw Normal View History

import torch as th
from time import sleep, time
import numpy as np
import pygame
import yaml
2023-05-03 23:17:45 +02:00
import random
from columbus import env
from columbus.observables import Observable, CnnObservable
import colorednoise as cn
2023-05-04 12:18:33 +02:00
from perlin_noise import PerlinNoise
from priorConditionedAnnealing import pca
def main():
    """Let the user pick an agent and an environment, then play episodes forever.

    After every episode the agent is reset if it offers a ``reset`` method
    (plain functions such as ``human_input`` do not). The loop only ends
    through an exception (for example Ctrl-C); the environment is closed on
    the way out.
    """
    alg_name, agent_func = choosePlayType()
    env = chooseEnv(alg_name)
    # Look the method up once; agents without reset() are simply skipped.
    reset = getattr(agent_func, 'reset', None)
    try:
        while True:
            playEnv(env, agent_func)
            if callable(reset):
                try:
                    reset()
                except Exception as e:
                    # Resetting stays best-effort so play continues, but the
                    # failure is reported instead of being silently dropped.
                    print('[!] Could not reset agent: ' + repr(e))
    finally:
        # Previously unreachable: the infinite loop never fell through.
        env.close()
def getAvaibleEnvs():
    """Yield the attributes of the ``env`` module that look like concrete envs.

    Discovery is purely name-based: every attribute whose name starts with
    'Columbus' is yielded, except 'ColumbusEnv' itself.
    """
    yield from (
        getattr(env, attr_name)
        for attr_name in dir(env)
        if attr_name.startswith('Columbus') and attr_name != 'ColumbusEnv'
    )
2023-05-04 12:18:33 +02:00
def loadConfigDefinedEnv(EnvClass, alg_name):
    """Build ``EnvClass`` from env arguments stored in a YAML config file.

    The user is asked for the config path and for which YAML document to use
    (documents without a 'name', or named 'SLURM', are skipped). The env
    arguments are looked up under the dotted path 'params.task.env_args';
    if a key on that path is missing, the user is asked for another dotted
    path and the lookup restarts from the document root.

    Args:
        EnvClass: callable invoked as
            ``EnvClass(fps=30, title_appendix=..., **env_args)``.
        alg_name: agent name, shown in the window title appendix.

    Returns:
        Whatever ``EnvClass(...)`` returns.
    """
    config_path = input('[Path to config> ')
    with open(config_path, 'r') as f:
        docs = [d for d in yaml.safe_load_all(f)
                if d and 'name' in d and d['name'] not in ['SLURM']]
    for i, doc in enumerate(docs):
        print('['+str(i)+'] '+doc['name'])
    ds = int(input('[0]> ') or '0')
    doc = docs[ds]

    path = 'params.task.env_args'
    while True:
        # Every attempt walks the (possibly re-entered) path from the root.
        # Previously a new path was read but never split, and ``cur`` was
        # not rewound, so retrying could not succeed.
        cur = doc
        for key in path.split('.'):
            print(key)
            try:
                cur = cur[key]
            except (KeyError, IndexError, TypeError):
                print('Unable to find key "'+key+'"')
                path = input('[Path> ')
                break
        else:
            # Loop finished without a failed lookup: ``cur`` is the target.
            break
    print(cur)
    return EnvClass(fps=30, title_appendix=' ['+alg_name+']', **cur)
2023-05-03 23:17:45 +02:00
def chooseEnv(alg_name):
    """List the available environments, ask for one by index and build it.

    The prompt repeats until a valid index is entered. If the chosen class is
    ``env.ColumbusConfigDefined`` the environment is built from a config file
    via ``loadConfigDefinedEnv``; otherwise it is instantiated with fixed
    defaults.

    Args:
        alg_name: agent name, shown in the window title appendix.

    Returns:
        The constructed environment.
    """
    envs = list(getAvaibleEnvs())
    for i, Env in enumerate(envs):
        print('['+str(i)+'] '+Env.__name__)
    while True:
        inp = input('[#> ')
        try:
            i = int(inp)
        except ValueError:
            print('[!] You have to enter the number...')
            # Ask again; falling through would reuse a stale ``i``.
            continue
        if i < 0 or i >= len(envs):
            print(
                '[!] That is a number, but not one that makes sense in this context...')
            # Ask again; falling through would index out of range (or wrap
            # around for negative numbers).
            continue
        Env = envs[i]
        if Env in [env.ColumbusConfigDefined]:
            return loadConfigDefinedEnv(Env, alg_name)
        return Env(fps=30, agent_draw_path=True, path_decay=1/1024,
                   title_appendix=' ['+alg_name+']', max_steps=30*10,
                   clear_path_on_reset=False)
def value_func(obs):
    """Return element 0 of every row of ``obs`` (i.e. ``obs[:, 0]``)."""
    # Alternative kept for experimentation: th.rand(obs.shape[0]) - 0.5
    first_column = obs[:, 0]
    return first_column
def human_input(obs, env):
    """Turn the current mouse position into a 2D action in [-1, 1].

    The mouse position is taken relative to ``env.joystick_offset``, shifted
    by 20 and divided by 60, clamped to [0, 1] and finally rescaled to
    [-1, 1]. The constants 20 and 60 presumably describe the on-screen
    joystick area in pixels (TODO confirm against the env's rendering code).

    Args:
        obs: unused; present so all agents share one call signature.
        env: object providing ``joystick_offset`` (indexable with 0 and 1).

    Returns:
        Tuple ``(x, y)`` with both components in [-1, 1].
    """
    mouse_x, mouse_y = pygame.mouse.get_pos()
    unit_x = min(max((mouse_x - env.joystick_offset[0] - 20) / 60, 0), 1)
    unit_y = min(max((mouse_y - env.joystick_offset[1] - 20) / 60, 0), 1)
    return unit_x * 2 - 1, unit_y * 2 - 1
class Colored_Noise():
    """Agent that replays pre-generated power-law ("colored") noise.

    A buffer of shape ``(dim_a, samples)`` is drawn from
    ``cn.powerlaw_psd_gaussian`` with exponent ``beta``; each call returns the
    next column of that buffer.

    Args:
        beta: exponent handed to ``cn.powerlaw_psd_gaussian``.
        dim_a: number of action dimensions (rows of the buffer).
        samples: number of time steps to pre-generate (columns of the buffer).
    """

    def __init__(self, beta=1, dim_a=2, samples=2**18):
        self.beta = beta
        self.dim_a = dim_a
        # The requested length is kept apart from the buffer. Storing both in
        # ``self.samples`` made every reset() after the first one fail, as the
        # length had already been overwritten by the generated array.
        self.n_samples = samples
        self.index = 0
        self.reset()

    def __call__(self, obs, env):
        """Return the next noise column; ``obs`` and ``env`` are ignored."""
        # Wrap around instead of raising IndexError once the buffer is used up.
        sample = self.samples[:, self.index % self.n_samples]
        self.index += 1
        return sample

    def reset(self):
        """Draw a fresh noise buffer with a new random seed and rewind to its start."""
        self.samples = cn.powerlaw_psd_gaussian(
            self.beta, (self.dim_a, self.n_samples), random_state=rand_seed())
        self.index = 0
2023-05-04 12:18:33 +02:00
class Perlin_Noise():
    """Agent whose actions are samples of 2D Perlin noise.

    Each call advances a step counter and evaluates the noise field at
    ``[index * scale, magic * a]`` for every action dimension ``a``, so each
    dimension reads the field along its own offset on the second axis.

    Args:
        scale: distance moved along the first noise axis per call.
        octaves: forwarded to ``PerlinNoise``.
        dim_a: number of action dimensions.
    """

    def __init__(self, scale=0.05, octaves=1, dim_a=2):
        self.scale, self.octaves, self.dim_a = scale, octaves, dim_a
        # Axis offset separating the action dimensions in the noise field.
        self.magic = 3.14159
        # Divisor chosen so that the generated samples are approximately
        # N(0, 1) distributed.
        self.normal_factor = 0.0471
        self.reset()

    def __call__(self, obs, env):
        """Advance one step and return a list with one value per dimension."""
        self.index += 1
        position = self.index * self.scale
        values = []
        for axis in range(self.dim_a):
            raw = self.noise([position, self.magic * axis])
            values.append(raw / self.normal_factor)
        return values

    def reset(self):
        """Rewind the step counter and create a freshly seeded noise field."""
        self.index = 0
        self.noise = PerlinNoise(octaves=self.octaves, seed=rand_seed())
class Perlin_PCA_Noise():
    """Agent sampling from a ``pca.PCA_Distribution`` driven by Perlin noise.

    Every call passes the trajectory of previous samples to
    ``dist.sample`` together with a Perlin noise vector as ``epsilon``.

    Args:
        dim_a: number of action dimensions.
        kernel_func: forwarded to ``pca.PCA_Distribution``.
        window: forwarded to ``pca.PCA_Distribution``.
        ssf: if not -1, the trajectory is cleared to a single zero row on
            every call where ``index % ssf == 0``.
        f_sigma: passed as second positional argument to ``dist.sample``.
    """

    def __init__(self, dim_a=2, kernel_func='SE_1.41_1', window=64, ssf=-1, f_sigma=1):
        self.dim_a = dim_a
        self.kernel_func = kernel_func
        self.window = window
        self.ssf = ssf
        self.f_sigma = f_sigma
        self.index = 0
        # The Perlin source must produce one value per action dimension; it
        # used to be created with its default of 2 regardless of ``dim_a``.
        self.perlin = Perlin_Noise(dim_a=dim_a)
        self.reset()

    def __call__(self, obs, env):
        """Return the next sample; ``obs`` and ``env`` are ignored."""
        if self.ssf != -1 and self.index % self.ssf == 0:
            self.traj = [[0]*len(self.traj[0])]
        traj = th.Tensor(self.traj).unsqueeze(0)
        eps = th.Tensor(self.perlin(None, None)).unsqueeze(0)
        sample = self.dist.sample(traj, self.f_sigma, epsilon=eps).squeeze(0)
        self.traj.append(sample)
        self.index += 1
        return sample

    def reset(self):
        """Recreate the distribution, clear the trajectory, reseed the Perlin source."""
        self.dist = pca.PCA_Distribution(
            action_dim=self.dim_a, par_strength='CONT_DIAG',
            kernel_func=self.kernel_func, window=self.window)
        # Zero mean / unit std sized by ``dim_a`` (was hard-coded to 2, which
        # only matched the default).
        self.dist.proba_distribution(
            th.Tensor([[0]*self.dim_a]), th.Tensor([[1]*self.dim_a]))
        self.traj = [[0]*self.dim_a]
        self.perlin.reset()
class PCA_Noise():
    """Agent sampling actions from a ``pca.PCA_Distribution``.

    Every call passes the trajectory of previous samples to ``dist.sample``
    and appends the new sample to that trajectory.

    Args:
        dim_a: number of action dimensions.
        kernel_func: forwarded to ``pca.PCA_Distribution``.
        window: forwarded to ``pca.PCA_Distribution``.
        ssf: if not -1, the trajectory is cleared to a single zero row on
            every call where ``index % ssf == 0``.
        f_sigma: passed as second positional argument to ``dist.sample``.
    """

    def __init__(self, dim_a=2, kernel_func='SE_1.41_1', window=64, ssf=-1, f_sigma=1):
        self.dim_a = dim_a
        self.kernel_func = kernel_func
        self.window = window
        self.ssf = ssf
        self.f_sigma = f_sigma
        self.index = 0
        self.reset()

    def __call__(self, obs, env):
        """Return the next sample; ``obs`` and ``env`` are ignored."""
        if self.ssf != -1 and self.index % self.ssf == 0:
            self.traj = [[0]*len(self.traj[0])]
        traj = th.Tensor(self.traj).unsqueeze(0)
        sample = self.dist.sample(traj, self.f_sigma).squeeze(0)
        self.traj.append(sample)
        self.index += 1
        return sample

    def reset(self):
        """Recreate the distribution and clear the trajectory."""
        self.dist = pca.PCA_Distribution(
            action_dim=self.dim_a, par_strength='CONT_DIAG',
            kernel_func=self.kernel_func, window=self.window)
        # Zero mean / unit std sized by ``dim_a`` (was hard-coded to 2, which
        # only matched the default).
        self.dist.proba_distribution(
            th.Tensor([[0]*self.dim_a]), th.Tensor([[1]*self.dim_a]))
        self.traj = [[0]*self.dim_a]
class Colored_PCA_Noise():
    """Agent sampling from a ``pca.PCA_Distribution`` driven by colored noise.

    A buffer of shape ``(dim_a, samples)`` is drawn from
    ``cn.powerlaw_psd_gaussian``; on every call the next column is handed to
    ``dist.sample`` as ``epsilon`` together with the trajectory of previous
    samples.

    Args:
        beta: exponent handed to ``cn.powerlaw_psd_gaussian``.
        dim_a: number of action dimensions.
        samples: number of time steps of noise to pre-generate.
        kernel_func: forwarded to ``pca.PCA_Distribution``.
        window: forwarded to ``pca.PCA_Distribution``.
        ssf: if not -1, the trajectory is cleared to a single zero row on
            every call where ``index % ssf == 0``.
    """

    def __init__(self, beta=1, dim_a=2, samples=2**18, kernel_func='SE_1.41_1', window=64, ssf=-1):
        self.beta = beta
        self.dim_a = dim_a
        # The requested length is kept apart from the buffer. Storing both in
        # ``self.samples`` made every reset() after the first one fail, as the
        # length had already been overwritten by the generated array.
        self.n_samples = samples
        self.kernel_func = kernel_func
        self.window = window
        self.ssf = ssf
        self.index = 0
        self.reset()

    def __call__(self, obs, env):
        """Return the next sample; ``obs`` and ``env`` are ignored."""
        # Wrap around instead of raising IndexError once the buffer is used up.
        column = self.samples[:, self.index % self.n_samples]
        epsilon = th.tensor(column, dtype=th.float32).unsqueeze(0)
        if self.ssf != -1 and self.index % self.ssf == 0:
            self.traj = [[0]*len(self.traj[0])]
        traj = th.Tensor(self.traj).unsqueeze(0)
        # The colored noise was computed but never handed to the distribution,
        # so it had no effect on the sample; pass it through the same
        # ``epsilon`` keyword that Perlin_PCA_Noise uses.
        sample = self.dist.sample(traj, epsilon=epsilon).squeeze(0)
        self.traj.append(sample)
        self.index += 1
        return sample

    def reset(self):
        """Recreate the distribution, clear the trajectory, draw a fresh noise buffer."""
        self.dist = pca.PCA_Distribution(
            action_dim=self.dim_a, par_strength='CONT_DIAG',
            kernel_func=self.kernel_func, window=self.window)
        # Zero mean / unit std sized by ``dim_a`` (was hard-coded to 2, which
        # only matched the default).
        self.dist.proba_distribution(
            th.Tensor([[0]*self.dim_a]), th.Tensor([[1]*self.dim_a]))
        self.traj = [[0]*self.dim_a]
        self.samples = cn.powerlaw_psd_gaussian(
            self.beta, (self.dim_a, self.n_samples), random_state=rand_seed())
        self.index = 0
def rand_seed():
    """Return a random integer seed drawn from the ``random`` module.

    The value is ``random.random()`` scaled by 99999999 and truncated, so it
    lies in the range [0, 99999999).
    """
    scaled = 99999999 * random.random()
    return int(scaled)
def choosePlayType():
    """Show the list of available agents and return the one the user picks.

    The prompt repeats until a valid index is entered.

    Returns:
        Tuple ``(name, agent)`` where ``agent`` is callable as
        ``agent(obs, env)``.
    """
    # Map each name to a factory so only the selected agent is constructed.
    # Building all of them up front allocated several large noise buffers and
    # distributions just to display the menu.
    options = {
        'human': lambda: human_input,
        'PCA': lambda: PCA_Noise(),
        'REX': lambda: Colored_Noise(beta=0),
        'PINK': lambda: Colored_Noise(beta=1),
        'BROWN': lambda: Colored_Noise(beta=2),
        'BETA.5': lambda: Colored_Noise(beta=.5),
        'PINK_PCA': lambda: Colored_PCA_Noise(beta=1),
        'Precise_PCA': lambda: PCA_Noise(f_sigma=0.33),
        'Perlin': lambda: Perlin_Noise(scale=0.05, octaves=1),
        'FastPerlin': lambda: Perlin_Noise(scale=0.2, octaves=1),
        'SlowPerlin': lambda: Perlin_Noise(scale=0.0125, octaves=1),
        'Perlin_3': lambda: Perlin_Noise(scale=0.05, octaves=3),
        'Perlin_8': lambda: Perlin_Noise(scale=0.05, octaves=8),
        'Perlin_PCA': lambda: Perlin_PCA_Noise(),
    }
    names = list(options)
    for i, name in enumerate(names):
        print('['+str(i)+'] '+name)
    while True:
        inp = input('[#> ')
        try:
            i = int(inp)
        except ValueError:
            print('[!] You have to enter the number...')
            continue
        if i < 0 or i >= len(names):
            print(
                '[!] That is a number, but not one that makes sense in this context...')
            continue
        name = names[i]
        return name, options[name]()
def playEnv(env, agent_func):
    """Play one episode of ``env`` with ``agent_func``, paced to ``env.fps``.

    Each frame renders the environment, asks the agent for an action, steps
    the environment with that action converted to a float32 array, and prints
    the reward and the info returned by the step. The rest of the frame time
    is slept away; if the frame took too long a warning is printed instead.

    Args:
        env: environment providing ``reset``, ``render``, ``step`` and ``fps``.
        agent_func: callable invoked as ``agent_func(obs, env)``.
    """
    obs = env.reset()
    finished = False
    while not finished:
        frame_start = time()
        # env.render(value_func=value_func)
        env.render()
        action = np.array(agent_func(obs, env), dtype=np.float32)
        obs, rew, finished, info = env.step(action)
        print('Reward: '+str(rew))
        print('Score: '+str(info))
        elapsed = time() - frame_start
        remaining = 1/env.fps - elapsed
        if remaining < 0:
            print("[!] Can't keep framerate!")
            continue
        sleep(remaining)
# Start the interactive player only when run as a script, not on import.
if __name__ == "__main__":
    main()