"""Interactive runner: pick an action source and a Columbus env, then play."""
import torch as th
from time import sleep, time
import numpy as np
import pygame
import yaml
import random
from columbus import env
from columbus.observables import Observable, CnnObservable
import colorednoise as cn
from perlin_noise import PerlinNoise
from priorConditionedAnnealing import pca


def main():
    """Ask for an action source and an environment, then play episodes forever.

    After every episode the action source is reset if it offers a ``reset``
    method (the plain ``human_input`` function does not). The environment is
    closed when the loop is left, e.g. by an exception or Ctrl-C.
    """
    alg_name, agent_func = choosePlayType()
    game = chooseEnv(alg_name)
    try:
        while True:
            playEnv(game, agent_func)
            reset = getattr(agent_func, 'reset', None)
            if reset is None:
                continue
            try:
                reset()
            except Exception as e:
                # Resetting stays best-effort, but failures are now visible.
                print('[!] Resetting the agent failed: ' + repr(e))
    finally:
        game.close()


def getAvaibleEnvs():
    """Yield every attribute of ``columbus.env`` whose name starts with
    'Columbus', except the 'ColumbusEnv' base name itself."""
    # kinda hacky... idk
    for attr_name in dir(env):
        if attr_name.startswith('Columbus') and attr_name != 'ColumbusEnv':
            yield getattr(env, attr_name)


def loadConfigDefinedEnv(EnvClass, alg_name):
    """Build ``EnvClass`` from the env arguments stored in a YAML config file.

    The user is asked for the file path, picks one of the named documents in
    it (documents named 'SLURM' are skipped) and the mapping found under the
    dotted path 'params.task.env_args' is passed to ``EnvClass`` as keyword
    arguments. If the path cannot be resolved, a new dotted path is requested
    and the lookup restarts from the root of the chosen document.

    Raises:
        ValueError: if the file contains no usable document.
    """
    config_path = input('[Path to config> ')
    with open(config_path, 'r') as f:
        docs = [d for d in yaml.safe_load_all(f)
                if d and 'name' in d and d['name'] not in ['SLURM']]
    if not docs:
        raise ValueError(
            'No usable config documents found in "' + config_path + '"')
    for i, doc in enumerate(docs):
        print('['+str(i)+'] '+doc['name'])
    while True:
        try:
            ds = int(input('[0]> ') or '0')
        except ValueError:
            print('[!] You have to enter the number...')
            continue
        if 0 <= ds < len(docs):
            break
        print('[!] That is a number, but not one that makes sense in this context...')
    doc = docs[ds]

    path = 'params.task.env_args'
    while True:
        cur = doc
        try:
            for key in path.split('.'):
                print(key)
                cur = cur[key]
        except (KeyError, IndexError, TypeError):
            print('Unable to find key "'+key+'"')
            path = input('[Path> ')
        else:
            break
    print(cur)
    return EnvClass(fps=30, title_appendix=' ['+alg_name+']', **cur)


def chooseEnv(alg_name):
    """List the available environments and instantiate the one the user picks.

    Keeps asking until a valid index is entered. ``ColumbusConfigDefined`` is
    built through ``loadConfigDefinedEnv``; every other class is created with
    fixed default arguments.
    """
    envs = list(getAvaibleEnvs())
    for i, Env in enumerate(envs):
        print('['+str(i)+'] '+Env.__name__)
    while True:
        inp = input('[#> ')
        try:
            i = int(inp)
        except ValueError:
            print('[!] You have to enter the number...')
            continue
        if i < 0 or i >= len(envs):
            print('[!] That is a number, but not one that makes sense in this context...')
            continue
        Env = envs[i]
        if Env in [env.ColumbusConfigDefined]:
            return loadConfigDefinedEnv(Env, alg_name)
        return Env(fps=30, agent_draw_path=True, path_decay=1/1024,
                   title_appendix=' ['+alg_name+']', max_steps=30*10,
                   clear_path_on_reset=False)


def value_func(obs):
    """Return the first column of ``obs`` (debug value function for rendering)."""
    return obs[:, 0]


def human_input(obs, env):
    """Turn the current mouse position into a 2D action in [-1, 1]^2.

    The position is taken relative to ``env.joystick_offset`` plus a 20 pixel
    margin, divided by 60 pixels, clamped to [0, 1] and rescaled to [-1, 1].
    ``obs`` is ignored.
    """
    mouse_x, mouse_y = pygame.mouse.get_pos()
    unit_x = min(max((mouse_x-env.joystick_offset[0]-20)/60, 0), 1)
    unit_y = min(max((mouse_y-env.joystick_offset[1]-20)/60, 0), 1)
    return unit_x*2-1, unit_y*2-1


class Colored_Noise():
    """Action source that replays pre-generated colored (power-law) noise.

    ``beta`` is the exponent handed to ``cn.powerlaw_psd_gaussian``, ``dim_a``
    the number of action dimensions and ``samples`` the number of time steps
    generated per ``reset``.
    """

    def __init__(self, beta=1, dim_a=2, samples=2**18):
        self.beta = beta
        self.dim_a = dim_a
        # Keep the requested length apart from the generated array, so that
        # reset() can be called more than once.
        self.num_samples = samples
        self.index = 0
        self.reset()

    def __call__(self, obs, env):
        """Return the next noise column; ``obs`` and ``env`` are ignored."""
        if self.index >= self.num_samples:
            # Sequence exhausted: draw a fresh one instead of failing.
            self.reset()
        sample = self.samples[:, self.index]
        self.index += 1
        return sample

    def reset(self):
        """Draw a new noise sequence and start reading it from the beginning."""
        self.samples = cn.powerlaw_psd_gaussian(
            self.beta, (self.dim_a, self.num_samples),
            random_state=rand_seed())
        self.index = 0


class Perlin_Noise():
    """Action source that samples Perlin noise along a moving time axis.

    Each call advances the time coordinate by ``scale``; every action
    dimension reads the noise field at a different offset on a second axis.
    """

    def __init__(self, scale=0.05, octaves=1, dim_a=2):
        self.scale = scale
        self.octaves = octaves
        self.dim_a = dim_a
        self.magic = 3.14159  # Axis offset
        # We want to generate samples that are approx ~N(0,1)
        self.normal_factor = 0.0471
        self.reset()

    def __call__(self, obs, env):
        """Return a list with one noise value per action dimension."""
        self.index += 1
        return [self.noise([self.index*self.scale, self.magic*a]) / self.normal_factor
                for a in range(self.dim_a)]

    def reset(self):
        """Restart at time index 0 with a freshly seeded noise field."""
        self.index = 0
        self.noise = PerlinNoise(octaves=self.octaves, seed=rand_seed())


class Perlin_PCA_Noise():
    """Samples from a ``pca.PCA_Distribution`` using Perlin noise as epsilon.

    ``kernel_func`` and ``window`` are forwarded to the distribution,
    ``f_sigma`` to its ``sample`` call. If ``ssf`` is not -1 the stored
    trajectory is cleared every ``ssf`` calls.
    """

    def __init__(self, dim_a=2, kernel_func='SE_1.41_1.0', window=128, ssf=-1, f_sigma=1):
        self.dim_a = dim_a
        self.kernel_func = kernel_func
        self.window = window
        self.ssf = ssf
        self.f_sigma = f_sigma
        self.index = 0
        self.perlin = Perlin_Noise(dim_a=dim_a)
        self.reset()

    def __call__(self, obs, env):
        """Return the next sample and append it to the trajectory."""
        if self.ssf != -1 and self.index % self.ssf == 0:
            self.traj = [[0]*len(self.traj[0])]
        traj = th.Tensor(self.traj).unsqueeze(0)
        eps = th.Tensor(self.perlin(None, None)).unsqueeze(0)
        sample = self.dist.sample(traj, self.f_sigma, epsilon=eps).squeeze(0)
        self.traj.append(sample)
        self.index += 1
        return sample

    def reset(self):
        """Recreate the distribution and clear trajectory, counter and noise."""
        self.dist = pca.PCA_Distribution(
            action_dim=self.dim_a, par_strength='CONT_DIAG',
            kernel_func=self.kernel_func, window=self.window)
        # Zero mean / unit std per action dimension (was hard-coded to 2).
        self.dist.proba_distribution(
            th.Tensor([[0]*self.dim_a]), th.Tensor([[1]*self.dim_a]))
        self.traj = [[0]*self.dim_a]
        self.index = 0
        self.perlin.reset()


class PCA_Noise():
    """Samples from a ``pca.PCA_Distribution`` conditioned on its own history.

    ``kernel_func`` and ``window`` are forwarded to the distribution,
    ``f_sigma`` to its ``sample`` call. If ``ssf`` is not -1 the stored
    trajectory is cleared every ``ssf`` calls.
    """

    def __init__(self, dim_a=2, kernel_func='SE_1.41_1.0', window=128, ssf=-1, f_sigma=1):
        self.dim_a = dim_a
        self.kernel_func = kernel_func
        self.window = window
        self.ssf = ssf
        self.f_sigma = f_sigma
        self.index = 0
        self.reset()

    def __call__(self, obs, env):
        """Return the next sample and append it to the trajectory."""
        if self.ssf != -1 and self.index % self.ssf == 0:
            self.traj = [[0]*len(self.traj[0])]
        traj = th.Tensor(self.traj).unsqueeze(0)
        sample = self.dist.sample(traj, self.f_sigma).squeeze(0)
        self.traj.append(sample)
        self.index += 1
        return sample

    def reset(self):
        """Recreate the distribution and clear trajectory and call counter."""
        self.dist = pca.PCA_Distribution(
            action_dim=self.dim_a, par_strength='CONT_DIAG',
            kernel_func=self.kernel_func, window=self.window)
        # Zero mean / unit std per action dimension (was hard-coded to 2).
        self.dist.proba_distribution(
            th.Tensor([[0]*self.dim_a]), th.Tensor([[1]*self.dim_a]))
        self.traj = [[0]*self.dim_a]
        self.index = 0


class Human_PCA_Noise():
    """Samples from a ``pca.PCA_Distribution`` using the mouse as epsilon.

    The value returned by ``human_input`` is passed as the third positional
    argument of ``dist.sample``. ``kernel_func`` and ``window`` are forwarded
    to the distribution, ``f_sigma`` to its ``sample`` call. If ``ssf`` is not
    -1 the stored trajectory is cleared every ``ssf`` calls.
    """

    def __init__(self, dim_a=2, kernel_func='SE_1.414_1.0', window=128, ssf=-1, f_sigma=1):
        self.dim_a = dim_a
        self.kernel_func = kernel_func
        self.window = window
        self.ssf = ssf
        self.f_sigma = f_sigma
        self.index = 0
        self.reset()

    def __call__(self, obs, env):
        """Return the next sample and append it to the trajectory."""
        if self.ssf != -1 and self.index % self.ssf == 0:
            self.traj = [[0]*len(self.traj[0])]
        traj = th.Tensor(self.traj).unsqueeze(0)
        eps = human_input(obs, env)
        epsilon = th.Tensor(eps).unsqueeze(0)
        sample = self.dist.sample(traj, self.f_sigma, epsilon).squeeze(0)
        self.traj.append(sample)
        self.index += 1
        return sample

    def reset(self):
        """Recreate the distribution and clear trajectory and call counter."""
        self.dist = pca.PCA_Distribution(
            action_dim=self.dim_a, par_strength='CONT_DIAG',
            kernel_func=self.kernel_func, window=self.window)
        # Zero mean / unit std per action dimension (was hard-coded to 2).
        self.dist.proba_distribution(
            th.Tensor([[0]*self.dim_a]), th.Tensor([[1]*self.dim_a]))
        self.traj = [[0]*self.dim_a]
        self.index = 0

class Colored_PCA_Noise():
    """Samples from a ``pca.PCA_Distribution`` using colored noise as epsilon.

    ``beta`` is the exponent handed to ``cn.powerlaw_psd_gaussian`` and
    ``samples`` the number of time steps generated per ``reset``.
    ``kernel_func`` and ``window`` are forwarded to the distribution. If
    ``ssf`` is not -1 the stored trajectory is cleared every ``ssf`` calls.
    """

    def __init__(self, beta=1, dim_a=2, samples=2**18, kernel_func='SE_1.41_1', window=64, ssf=-1):
        self.beta = beta
        self.dim_a = dim_a
        # Keep the requested length apart from the generated array, so that
        # reset() can be called more than once.
        self.num_samples = samples
        self.kernel_func = kernel_func
        self.window = window
        self.ssf = ssf
        self.index = 0
        self.reset()

    def __call__(self, obs, env):
        """Return the next sample; ``obs`` and ``env`` are ignored."""
        if self.index >= self.num_samples:
            # Noise exhausted: draw a fresh sequence instead of failing.
            self._draw_noise()
        epsilon = th.Tensor(self.samples[:, self.index]).unsqueeze(0)
        if self.ssf != -1 and self.index % self.ssf == 0:
            self.traj = [[0]*len(self.traj[0])]
        traj = th.Tensor(self.traj).unsqueeze(0)
        sample = self.dist.sample(traj, epsilon=epsilon).squeeze(0)
        self.traj.append(sample)
        self.index += 1
        return sample

    def reset(self):
        """Recreate the distribution, clear the trajectory and redraw noise."""
        self.dist = pca.PCA_Distribution(
            action_dim=self.dim_a, par_strength='CONT_DIAG',
            kernel_func=self.kernel_func, window=self.window)
        # Zero mean / unit std per action dimension (was hard-coded to 2).
        self.dist.proba_distribution(
            th.Tensor([[0]*self.dim_a]), th.Tensor([[1]*self.dim_a]))
        self.traj = [[0]*self.dim_a]
        self._draw_noise()

    def _draw_noise(self):
        """Generate a new noise sequence and rewind the read index."""
        self.samples = cn.powerlaw_psd_gaussian(
            self.beta, (self.dim_a, self.num_samples),
            random_state=rand_seed())
        self.index = 0


def rand_seed():
    """Return a random non-negative integer below 99999999 for use as a seed."""
    return int(99999999 * random.random())


def choosePlayType():
    """Let the user pick an action source.

    Returns:
        A ``(name, agent)`` tuple, where ``agent`` is callable as
        ``agent(obs, env)``.
    """
    # Factories instead of instances: only the chosen generator is built, so
    # start-up does not pay for every noise table and PCA distribution.
    options = {
        'human': lambda: human_input,
        'PCA': lambda: PCA_Noise(),
        'REX': lambda: Colored_Noise(beta=0),
        'PINK': lambda: Colored_Noise(beta=1),
        'BROWN': lambda: Colored_Noise(beta=2),
        'BETA.5': lambda: Colored_Noise(beta=.5),
        'PINK_PCA': lambda: Colored_PCA_Noise(beta=1),
        'Precise_PCA': lambda: PCA_Noise(f_sigma=0.33),
        'Perlin': lambda: Perlin_Noise(scale=0.05, octaves=1),
        'FastPerlin': lambda: Perlin_Noise(scale=0.2, octaves=1),
        'SlowPerlin': lambda: Perlin_Noise(scale=0.0125, octaves=1),
        'Perlin_3': lambda: Perlin_Noise(scale=0.05, octaves=3),
        'Perlin_8': lambda: Perlin_Noise(scale=0.05, octaves=8),
        'Perlin_PCA': lambda: Perlin_PCA_Noise(),
        'Human_PCA': lambda: Human_PCA_Noise(),
    }
    names = list(options)
    for i, name in enumerate(names):
        print('['+str(i)+'] '+name)
    while True:
        inp = input('[#> ')
        try:
            i = int(inp)
        except ValueError:
            print('[!] You have to enter the number...')
            continue
        if i < 0 or i >= len(names):
            print('[!] That is a number, but not one that makes sense in this context...')
            continue
        name = names[i]
        return name, options[name]()


def playEnv(env, agent_func):
    """Play one episode of ``env`` with actions from ``agent_func``.

    Every step renders the environment, asks ``agent_func(obs, env)`` for an
    action, applies it as a float32 array and prints reward and info. The
    loop sleeps as needed to hold ``env.fps`` frames per second.
    """
    done = False
    obs = env.reset()
    while not done:
        frame_start = time()
        env.render()
        action = agent_func(obs, env)
        obs, rew, done, info = env.step(np.array(action, dtype=np.float32))
        print('Reward: '+str(rew))
        print('Score: '+str(info))
        # Time left in this frame's budget after rendering and stepping.
        delay = 1/env.fps - (time() - frame_start)
        if delay < 0:
            print("[!] Can't keep framerate!")
        else:
            sleep(delay)


if __name__ == '__main__':
    main()