From a3356c46544ed1872960eae6281b068327a32d95 Mon Sep 17 00:00:00 2001
From: Dominik Roth
Date: Wed, 3 May 2023 15:10:25 +0200
Subject: [PATCH] Pad and Cut Trajectory to correct length

---
Review notes (text in this section is not part of the commit message):
 * pca.py: _pad_and_cut_trajectory now cuts and pads along axis -2 (the
   axis _conditioning_engine swaps with the action axis), keeps the most
   recent steps and pads at the front.
 * test.py: menu loops re-prompt on bad input, choosePlayType returns the
   selected entry, colored_noise no longer clobbers its `samples` argument,
   human_input can read the joystick offset from the env instance, and
   main() closes the env when the loop is interrupted.
 * Line layout and the indentation of context lines were reconstructed from
   a whitespace-collapsed copy of this patch -- verify against the tree.
   The blob ids on the `index` lines are carried over unchanged and do not
   describe the edited contents.

 priorConditionedAnnealing/pca.py |  23 ++-
 test.py                          | 205 +++++++++++++++++++++++++++++++
 2 files changed, 226 insertions(+), 2 deletions(-)
 create mode 100644 test.py

diff --git a/priorConditionedAnnealing/pca.py b/priorConditionedAnnealing/pca.py
index 329361a..e46c228 100644
--- a/priorConditionedAnnealing/pca.py
+++ b/priorConditionedAnnealing/pca.py
@@ -6,6 +6,7 @@ from torch import nn
 from stable_baselines3.common.distributions import Distribution as SB3_Distribution
 from stable_baselines3.common.distributions import sum_independent_dims
 from torch.distributions import Normal
+import torch.nn.functional as F
 
 
 class Par_Strength(Enum):
@@ -112,7 +113,7 @@ class PCA_Distribution(SB3_Distribution):
         return sum_independent_dims(self.distribution.entropy())
 
     def sample(self, traj: th.Tensor) -> th.Tensor:
-        pi_mean, pi_std = self.distribution.mean, self.distribution.scale,
+        pi_mean, pi_std = self.distribution.mean, self.distribution.scale
         rho_mean, rho_std = self._conditioning_engine(traj, pi_mean, pi_std)
 
         eta = self._get_rigged(pi_mean, pi_std, rho_mean, rho_std)
@@ -136,7 +137,25 @@ class PCA_Distribution(SB3_Distribution):
 
         return eta.detach()
 
-    def _conditioning_engine(self, traj, pi_mean, pi_std):
+    def _pad_and_cut_trajectory(self, traj, value=0):
+        """Return ``traj`` with exactly ``self.window`` steps on axis -2.
+
+        Axis -2 is treated as time because ``_conditioning_engine`` swaps
+        axes -1 and -2 and then appends the current mean along axis -1.
+        Longer trajectories keep only their last ``self.window`` steps;
+        shorter ones are front-padded with ``value``.
+        """
+        steps = traj.shape[-2]
+        if steps >= self.window:
+            # Explicit start index: ``[-0:]`` would keep everything.
+            return traj[..., steps - self.window:, :]
+        missing = self.window - steps
+        # F.pad reads (before, after) pairs starting at the LAST axis:
+        # (0, 0) leaves axis -1 alone, (missing, 0) front-pads axis -2.
+        return F.pad(input=traj, pad=(0, 0, missing, 0), value=value)
+
+    def _conditioning_engine(self, trajectory, pi_mean, pi_std):
+        traj = self._pad_and_cut_trajectory(trajectory)
         y_np = np.append(np.swapaxes(traj, -1, -2),
                          np.expand_dims(pi_mean, -1), -1)
 
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..5a06baa
--- /dev/null
+++ b/test.py
@@ -0,0 +1,205 @@
+import torch as th
+from functools import partial
+from time import sleep, time
+import numpy as np
+import pygame
+import yaml
+
+from columbus import env
+from columbus.observables import Observable, CnnObservable
+
+import colorednoise as cn
+
+from pca import *
+
+
+def main():
+    """Pick an agent and an environment, then play episodes until aborted."""
+    agent_func = choosePlayType()
+    game = chooseEnv()
+    if agent_func is human_input:
+        # joystick_offset is read from the environment, so hand it over.
+        agent_func = partial(human_input, active_env=game)
+    try:
+        while True:
+            playEnv(game, agent_func)
+            input('')
+    except (KeyboardInterrupt, EOFError):
+        # Leaving the endless loop this way is expected; just clean up.
+        pass
+    finally:
+        game.close()
+
+
+def getAvaibleEnvs():
+    """Yield the ``Columbus*`` members of ``env``, minus ``ColumbusEnv``."""
+    # kinda hacky... idk
+    for attr in dir(env):
+        if attr.startswith('Columbus') and attr != 'ColumbusEnv':
+            yield getattr(env, attr)
+
+
+def loadConfigDefinedEnv(EnvClass):
+    """Instantiate ``EnvClass`` with the env arguments found in a YAML config.
+
+    Lists every document of the file that has a ``name`` other than
+    ``SLURM``, lets the user choose one and walks the dotted key path
+    ``params.task.env_args`` inside it.  When a key cannot be found the user
+    is asked for a different path and the walk restarts at the document root.
+    """
+    config_path = input('[Path to config> ')
+    with open(config_path, 'r') as f:
+        docs = [d for d in yaml.safe_load_all(f)
+                if d and 'name' in d and d['name'] not in ['SLURM']]
+    for i, doc in enumerate(docs):
+        print('['+str(i)+'] '+doc['name'])
+    doc = docs[int(input('[0]> ') or '0')]
+    path = 'params.task.env_args'
+    while True:
+        cur = doc
+        try:
+            for key in path.split('.'):
+                print(key)
+                cur = cur[key]
+        except (KeyError, IndexError, TypeError):
+            print('Unable to find key "'+key+'"')
+            path = input('[Path> ')
+        else:
+            break
+    print(cur)
+    return EnvClass(fps=30, **cur)
+
+
+def chooseEnv():
+    """Print the available environments and instantiate the one picked."""
+    envs = list(getAvaibleEnvs())
+    for i, Env in enumerate(envs):
+        print('['+str(i)+'] '+Env.__name__)
+    while True:
+        inp = input('[#> ')
+        try:
+            i = int(inp)
+        except ValueError:
+            print('[!] You have to enter the number...')
+            continue
+        if i < 0 or i >= len(envs):
+            print(
+                '[!] That is a number, but not one that makes sense in this context...')
+            continue
+        Env = envs[i]
+        if Env in [env.ColumbusConfigDefined]:
+            return loadConfigDefinedEnv(Env)
+        return Env(fps=30)
+
+
+def value_func(obs):
+    """Return the first column of ``obs``."""
+    return obs[:, 0]
+    # return th.rand(obs.shape[0])-0.5
+
+
+def human_input(obs, active_env=None):
+    """Turn the mouse position into a two-component action in [-1, 1].
+
+    ``joystick_offset`` is read from ``active_env``; without it the lookup
+    falls back to the imported ``columbus.env`` module as before.
+    NOTE(review): presumably only env instances carry that attribute.
+    """
+    offset = (env if active_env is None else active_env).joystick_offset
+    mouse = pygame.mouse.get_pos()
+    # Per axis: clamp (pixel - offset - 20) / 60 to [0, 1], rescale to [-1, 1].
+    unit = [min(max((mouse[k]-offset[k]-20)/60, 0), 1) for k in (0, 1)]
+    return unit[0]*2-1, unit[1]*2-1
+
+
+def colored_noise(beta=1, dim_a=2, samples=2**18):
+    """Return an agent function that replays pre-generated power-law noise.
+
+    One series of ``samples`` values with exponent ``beta`` is drawn per
+    action dimension.  Each call returns the next value of every series and
+    starts over once the series are exhausted.
+    """
+    series = [cn.powerlaw_psd_gaussian(beta, samples) for _ in range(dim_a)]
+    index = 0
+
+    def noise_generator(obs):
+        nonlocal index
+        sample = [channel[index] for channel in series]
+        index = (index + 1) % samples
+        return sample
+
+    return noise_generator
+
+
+def pca_noise(lengthscale=1, dim_a=2, kernel_func='RBF', window=16):
+    """Return an agent function that samples from a ``PCA_Distribution``.
+
+    Every sample is appended to the trajectory handed to the next call.
+    NOTE(review): ``lengthscale`` is not used anywhere yet.
+    """
+    dist = PCA_Distribution(
+        action_dim=dim_a, par_strength='SCALAR', kernel_func=kernel_func, window=window)
+
+    traj = []
+
+    def noise_generator(obs):
+        # Stack explicitly: th.Tensor([]) is 1-D, so it has no time axis
+        # (axis -2) that the distribution could pad.
+        # NOTE(review): assumes samples are shaped (dim_a,) -- confirm.
+        history = th.stack(traj) if traj else th.zeros((0, dim_a))
+        sample = dist.sample(history)
+        traj.append(sample)
+        return sample
+
+    return noise_generator
+
+
+def choosePlayType():
+    """Ask which agent should act and return its function.
+
+    ``None`` entries are placeholders; choosing one re-prompts.
+    """
+    options = {'human': human_input, 'REX': None, 'PCA': None, 'PINK': None}
+    names = list(options)
+    for i, name in enumerate(names):
+        print('['+str(i)+'] '+name)
+    while True:
+        inp = input('[#> ')
+        try:
+            i = int(inp)
+        except ValueError:
+            print('[!] You have to enter the number...')
+            continue
+        if i < 0 or i >= len(names):
+            print(
+                '[!] That is a number, but not one that makes sense in this context...')
+            continue
+        if options[names[i]] is None:
+            print('[!] "'+names[i]+'" is not available yet...')
+            continue
+        return options[names[i]]
+
+
+def playEnv(env, agent_func):
+    """Play one episode, rendering every step and pacing it to ``env.fps``."""
+    done = False
+    obs = env.reset()
+    while not done:
+        t1 = time()
+        # env.render(value_func=value_func)
+        env.render()
+        inp = agent_func(obs)
+        obs, rew, done, info = env.step(np.array(inp, dtype=np.float32))
+        print('Reward: '+str(rew))
+        print('Score: '+str(info))
+        t2 = time()
+        dt = t2 - t1
+        delay = (1/env.fps - dt)
+        if delay < 0:
+            print("[!] Can't keep framerate!")
+        else:
+            sleep(delay)
+
+
+if __name__ == '__main__':
+    main()