"""Interactive runner for Columbus environments.

The user picks an agent (mouse control or one of several noise generators)
and an environment on the console; episodes are then played in a loop.
"""
import torch as th
from time import sleep, time
import numpy as np
import pygame
import yaml
import random
from columbus import env
from columbus.observables import Observable, CnnObservable
import colorednoise as cn
from priorConditionedAnnealing import pca


def main():
    """Ask for an agent and an environment, then play episodes until aborted."""
    alg_name, agent_func = choosePlayType()
    env = chooseEnv(alg_name)
    try:
        while True:
            playEnv(env, agent_func)
            # Plain functions such as human_input carry no state and have no
            # reset(); only stateful agents are reset between episodes.
            reset = getattr(agent_func, 'reset', None)
            if callable(reset):
                reset()
    finally:
        # The loop only ends through an exception (e.g. Ctrl-C), so the
        # environment has to be closed here to be closed at all.
        env.close()


def getAvaibleEnvs():
    """Yield every attribute of ``columbus.env`` that looks like a concrete env.

    Discovery is purely name based: anything starting with ``Columbus`` is
    yielded, except the ``ColumbusEnv`` base itself.
    """
    for attr_name in dir(env):
        if attr_name.startswith('Columbus') and attr_name != 'ColumbusEnv':
            yield getattr(env, attr_name)


def loadConfigDefinedEnv(EnvClass):
    """Build ``EnvClass`` from the env arguments stored in a YAML config file.

    Prompts for the config path, lists all named YAML documents in it (except
    ``SLURM``), lets the user pick one, and then walks a dotted key path
    (default ``params.task.env_args``) to the keyword arguments.

    Args:
        EnvClass: Environment class; called as ``EnvClass(fps=30, **env_args)``.

    Returns:
        The constructed environment instance.
    """
    config_path = input('[Path to config> ')
    with open(config_path, 'r') as f:
        docs = [d for d in yaml.safe_load_all(f)
                if d and 'name' in d and d['name'] not in ['SLURM']]
    for i, doc in enumerate(docs):
        print('['+str(i)+'] '+doc['name'])
    ds = int(input('[0]> ') or '0')
    doc = docs[ds]

    path = 'params.task.env_args'
    while True:
        # Always restart from the document root so that a path re-entered
        # after a failed lookup is actually the one being followed.
        cur = doc
        try:
            for key in path.split('.'):
                print(key)
                cur = cur[key]
        except (KeyError, IndexError, TypeError):
            print('Unable to find key "'+key+'"')
            path = input('[Path> ')
        else:
            break
    print(cur)
    return EnvClass(fps=30, **cur)


def chooseEnv(alg_name):
    """Let the user pick an environment by number and instantiate it.

    Args:
        alg_name: Name of the chosen agent; appended to the window title.

    Returns:
        The constructed environment instance.
    """
    envs = list(getAvaibleEnvs())
    for i, Env in enumerate(envs):
        print('['+str(i)+'] '+Env.__name__)
    while True:
        inp = input('[#> ')
        try:
            i = int(inp)
        except ValueError:
            print('[!] You have to enter the number...')
            continue
        if i < 0 or i >= len(envs):
            print(
                '[!] That is a number, but not one that makes sense in this context...')
            continue
        Env = envs[i]
        if Env in (env.ColumbusConfigDefined,):
            # This env takes its arguments from a config file instead of the
            # fixed defaults below.
            return loadConfigDefinedEnv(Env)
        return Env(fps=30, agent_draw_path=True, path_decay=1/1024,
                   title_appendix=' ['+alg_name+']', max_steps=30*10,
                   clear_path_on_reset=False)


def value_func(obs):
    """Return the first column of a 2-D ``obs``; only used by the disabled
    ``env.render(value_func=value_func)`` call in ``playEnv``."""
    return obs[:, 0]
    # return th.rand(obs.shape[0])-0.5


def human_input(obs, env):
    """Turn the current mouse position into an action in ``[-1, 1]^2``.

    The position is taken relative to ``env.joystick_offset``, shifted by 20,
    scaled by 60, clamped to ``[0, 1]`` and finally mapped to ``[-1, 1]``.
    NOTE(review): 20 and 60 are presumably the margin and size (in pixels) of
    the on-screen joystick drawn by the environment -- confirm against columbus.

    Args:
        obs: Unused; present so all agents share one call signature.
        env: Environment providing ``joystick_offset``.
    """
    mouse_x, mouse_y = pygame.mouse.get_pos()
    rel_x = min(max((mouse_x-env.joystick_offset[0]-20)/60, 0), 1)
    rel_y = min(max((mouse_y-env.joystick_offset[1]-20)/60, 0), 1)
    return rel_x*2-1, rel_y*2-1


class Colored_Noise():
    """Agent that plays back pre-generated colored noise as actions.

    Args:
        beta: Exponent passed to ``colorednoise.powerlaw_psd_gaussian``.
        dim_a: Action dimensionality (rows of the sample matrix).
        samples: Number of time steps to pre-generate per reset.
    """

    def __init__(self, beta=1, dim_a=2, samples=2**18):
        self.beta = beta
        self.dim_a = dim_a
        # Keep the requested count separately: self.samples is replaced by the
        # generated array in reset(), and reset() must be callable repeatedly.
        self.n_samples = samples
        self.samples = samples
        self.index = 0
        self.reset()

    def __call__(self, obs, env):
        """Return the next pre-generated action; ``obs`` and ``env`` are unused."""
        sample = self.samples[:, self.index]
        self.index += 1
        return sample

    def reset(self):
        """Draw a fresh noise sequence and rewind the playback position."""
        self.samples = cn.powerlaw_psd_gaussian(
            self.beta, (self.dim_a, self.n_samples), random_state=rand_seed())
        self.index = 0


class PCA_Noise():
    """Agent that samples actions from a ``pca.PCA_Distribution``.

    Every sample is conditioned on the trajectory of previously sampled
    actions, which is passed back into ``dist.sample``.

    Args:
        dim_a: Action dimensionality.
        kernel_func: Kernel specification forwarded to ``PCA_Distribution``.
        window: Window size forwarded to ``PCA_Distribution``.
        ssf: If not ``-1``, the trajectory is cleared every ``ssf`` steps.
    """

    def __init__(self, dim_a=2, kernel_func='SE_1.41_1', window=64, ssf=-1):
        self.dim_a = dim_a
        self.kernel_func = kernel_func
        self.window = window
        self.ssf = ssf
        self.index = 0
        self.reset()

    def __call__(self, obs, env):
        """Sample the next action; ``obs`` and ``env`` are unused."""
        if self.ssf != -1 and self.index % self.ssf == 0:
            # Periodically forget the history and restart from a zero action.
            self.traj = [[0]*len(self.traj[0])]
        traj = th.Tensor(self.traj).unsqueeze(0)
        sample = self.dist.sample(traj).squeeze(0)
        self.traj.append(sample)
        self.index += 1
        return sample

    def reset(self):
        """Recreate the distribution (zero mean, unit std) and clear the history."""
        self.dist = pca.PCA_Distribution(
            action_dim=self.dim_a, par_strength='CONT_DIAG',
            kernel_func=self.kernel_func, window=self.window)
        # Sized by dim_a (was hard-coded to 2) so other action dims work too.
        self.dist.proba_distribution(
            th.Tensor([[0]*self.dim_a]), th.Tensor([[1]*self.dim_a]))
        self.traj = [[0]*self.dim_a]
        self.index = 0


class Colored_PCA_Noise():
    """Agent combining a colored-noise sequence with ``pca.PCA_Distribution``.

    NOTE(review): the colored-noise value (``epsilon``) is looked up on every
    step but never handed to ``dist.sample``, so actions currently come out
    the same way as for ``PCA_Noise``. It was presumably meant to be passed
    as the base noise of the sample call -- confirm against the
    ``PCA_Distribution.sample`` signature before wiring it in.

    Args:
        beta: Exponent passed to ``colorednoise.powerlaw_psd_gaussian``.
        dim_a: Action dimensionality.
        samples: Number of noise time steps to pre-generate per reset.
        kernel_func: Kernel specification forwarded to ``PCA_Distribution``.
        window: Window size forwarded to ``PCA_Distribution``.
        ssf: If not ``-1``, the trajectory is cleared every ``ssf`` steps.
    """

    def __init__(self, beta=1, dim_a=2, samples=2**18, kernel_func='SE_1.41_1', window=64, ssf=-1):
        self.beta = beta
        self.dim_a = dim_a
        # Keep the requested count separately: self.samples is replaced by the
        # generated array in reset(), and reset() must be callable repeatedly.
        self.n_samples = samples
        self.samples = samples
        self.kernel_func = kernel_func
        self.window = window
        self.ssf = ssf
        self.index = 0
        self.reset()

    def __call__(self, obs, env):
        """Sample the next action; ``obs`` and ``env`` are unused."""
        epsilon = self.samples[:, self.index]  # currently unused, see class note
        if self.ssf != -1 and self.index % self.ssf == 0:
            # Periodically forget the history and restart from a zero action.
            self.traj = [[0]*len(self.traj[0])]
        traj = th.Tensor(self.traj).unsqueeze(0)
        sample = self.dist.sample(traj).squeeze(0)
        self.traj.append(sample)
        self.index += 1
        return sample

    def reset(self):
        """Recreate the distribution, clear the history and redraw the noise."""
        self.dist = pca.PCA_Distribution(
            action_dim=self.dim_a, par_strength='CONT_DIAG',
            kernel_func=self.kernel_func, window=self.window)
        # Sized by dim_a (was hard-coded to 2) so other action dims work too.
        self.dist.proba_distribution(
            th.Tensor([[0]*self.dim_a]), th.Tensor([[1]*self.dim_a]))
        self.traj = [[0]*self.dim_a]
        self.samples = cn.powerlaw_psd_gaussian(
            self.beta, (self.dim_a, self.n_samples), random_state=rand_seed())
        self.index = 0


def rand_seed():
    """Return a random integer seed in ``[0, 99999999)``."""
    return int(99999999 * random.random())


def choosePlayType():
    """Let the user pick an agent by number.

    Returns:
        Tuple ``(name, agent)`` where ``agent`` is callable as
        ``agent(obs, env)`` and returns an action.
    """
    # Factories rather than instances: building every noise agent up front
    # would pre-generate several large sample arrays just to show a menu.
    options = {
        'human': lambda: human_input,
        'PCA': lambda: PCA_Noise(),
        'REX': lambda: Colored_Noise(beta=0),
        'PINK': lambda: Colored_Noise(beta=1),
        'BROWN': lambda: Colored_Noise(beta=2),
        'BETA.5': lambda: Colored_Noise(beta=.5),
        'PINK_PCA': lambda: Colored_PCA_Noise(beta=1),
    }
    names = list(options)
    for i, name in enumerate(names):
        print('['+str(i)+'] '+name)
    while True:
        inp = input('[#> ')
        try:
            i = int(inp)
        except ValueError:
            print('[!] You have to enter the number...')
            continue
        if i < 0 or i >= len(names):
            print(
                '[!] That is a number, but not one that makes sense in this context...')
            continue
        name = names[i]
        return name, options[name]()


def playEnv(env, agent_func):
    """Play one episode, rendering every step and pacing it to ``env.fps``.

    Args:
        env: Environment with ``reset``, ``render``, ``step`` and ``fps``;
            ``step`` must return ``(obs, reward, done, info)``.
        agent_func: Callable ``agent_func(obs, env)`` returning an action.
    """
    done = False
    obs = env.reset()
    frame_time = 1/env.fps
    while not done:
        t1 = time()
        # env.render(value_func=value_func)
        env.render()
        inp = agent_func(obs, env)
        obs, rew, done, info = env.step(np.array(inp, dtype=np.float32))
        print('Reward: '+str(rew))
        print('Score: '+str(info))
        # Sleep away whatever is left of this frame's time budget.
        delay = frame_time - (time() - t1)
        if delay < 0:
            print("[!] Can't keep framerate!")
        else:
            sleep(delay)


if __name__ == '__main__':
    main()