PriorConditionedAnnealing/priorConditionedAnnealing/noise.py

249 lines
9.3 KiB
Python
Raw Normal View History

import numpy as np
import torch as th
import colorednoise as cn
from perlin_noise import PerlinNoise
from torch.distributions import Normal
2023-09-10 14:46:38 +02:00
# Module-wide pi constant; used by the Perlin-based generators below as an
# axis offset and as part of their normalisation factor.
PI = np.pi
class Colored_Noise():
    """Pre-sampled colored noise that hands out one column per call.

    ``reset`` draws a ``(prod(known_shape), num_samples)`` array from
    ``colorednoise.powerlaw_psd_gaussian`` using exponent ``beta``. Each call
    returns the next column reshaped to ``known_shape`` and wraps around to the
    first column after ``num_samples`` calls.
    """

    def __init__(self, known_shape=None, beta=1, num_samples=2**14, random_state=None):
        """Store the configuration and draw the first sample buffer.

        Args:
            known_shape: Shape of the tensor returned per call (required).
            beta: Exponent forwarded to ``powerlaw_psd_gaussian``.
            num_samples: Number of columns drawn per buffer.
            random_state: Forwarded to ``powerlaw_psd_gaussian``.
        """
        assert known_shape, 'known_shape need to be defined for Colored Noise'
        self.known_shape = known_shape
        # Plain int so it can be used directly as an array dimension.
        self.compact_shape = int(np.prod(list(known_shape)))
        self.beta = beta
        self.num_samples = num_samples  # Actually very cheap...
        self.index = 0
        self.reset(random_state=random_state)

    def __call__(self, shape=None, latent: th.Tensor = None) -> th.Tensor:
        """Return the next pre-sampled noise tensor.

        Args:
            shape: Requested shape. Defaults to ``known_shape``. It must equal
                ``known_shape`` or differ only by a smaller first dimension, in
                which case the leading rows are returned.
            latent: Unused; accepted so all noise classes share a call signature.

        Returns:
            A tensor of shape ``(shape[0],) + known_shape[1:]``.

        Raises:
            AssertionError: If ``shape`` is incompatible with ``known_shape``.
        """
        known_shape = tuple(self.known_shape)
        # Normalise to tuples so list-typed shapes compare correctly.
        shape = known_shape if shape is None else tuple(shape)
        assert shape == known_shape or (
            shape[1:] == known_shape[1:] and shape[0] <= known_shape[0])
        sample = self.samples[:, self.index]
        self.index = (self.index + 1) % self.num_samples
        return th.Tensor(sample).view(known_shape)[:shape[0]]

    def reset(self, random_state=None):
        """Draw a fresh sample buffer and restart reading at its first column.

        Restarting the index lets a new buffer be consumed in full before the
        read position wraps, and makes a seeded reset reproducible regardless
        of how many samples were drawn before it.
        """
        self.index = 0
        self.samples = cn.powerlaw_psd_gaussian(
            self.beta, (self.compact_shape, self.num_samples), random_state=random_state)
class Pink_Noise(Colored_Noise):
    """Colored noise with the exponent fixed to ``beta=1``."""

    def __init__(self, known_shape=None, num_samples=2**14, random_state=None):
        """Forward all arguments to ``Colored_Noise`` with ``beta=1``."""
        super().__init__(
            known_shape=known_shape,
            beta=1,
            num_samples=num_samples,
            random_state=random_state,
        )
class shortPink_Noise(Colored_Noise):
    """Colored noise with ``beta=1`` and a shorter default buffer of 1000 columns."""

    def __init__(self, known_shape=None, num_samples=1000, random_state=None):
        """Forward all arguments to ``Colored_Noise`` with ``beta=1``."""
        super().__init__(
            known_shape=known_shape,
            beta=1,
            num_samples=num_samples,
            random_state=random_state,
        )
class White_Noise():
    """Independent standard-normal noise drawn with ``np.random.normal``."""

    def __init__(self, known_shape=None):
        """Remember ``known_shape`` as the default shape for calls."""
        self.known_shape = known_shape

    def __call__(self, shape=None, latent: th.Tensor = None) -> th.Tensor:
        """Return a tensor of N(0, 1) samples.

        Args:
            shape: Shape of the result; falls back to ``known_shape`` when None.
            latent: Unused; accepted so all noise classes share a call signature.
        """
        # Identity check: `== None` would be evaluated element-wise (and then
        # be ambiguous in an `if`) for array-like shapes.
        if shape is None:
            shape = self.known_shape
        return th.Tensor(np.random.normal(0, 1, shape))

    def reset(self):
        """No-op: white noise keeps no state between calls."""
        pass
def get_colored_noise(beta, known_shape=None):
    """Build the noise generator matching the exponent ``beta``.

    ``beta == 0`` yields ``White_Noise``, ``beta == 1`` yields ``Pink_Noise``,
    and any other value yields a ``Colored_Noise`` with that exponent.
    """
    if beta == 0:
        return White_Noise(known_shape)
    if beta == 1:
        return Pink_Noise(known_shape)
    return Colored_Noise(known_shape, beta=beta)
class SDE_Noise():
    """Unfinished sketch of latent-dependent exploration noise.

    The constructor always raises, so this class cannot currently be
    instantiated. The remaining code is kept as a starting point for a future
    implementation.
    """

    def __init__(self, shape, latent_sde_dim=64, Base_Noise=White_Noise):
        """Always raises; the class is not implemented.

        Raises:
            NotImplementedError: Always. This is an ``Exception`` subclass, so
                existing ``except Exception`` handlers keep working.
        """
        raise NotImplementedError('Not implemented yet. Just use SB3s gSDE...')
        # Everything below is unreachable until the raise above is removed.
        self.shape = shape
        self.latent_sde_dim = latent_sde_dim
        self.Base_Noise = Base_Noise

        batch_size = self.shape[0]
        self.weights_dist = self.Base_Noise(
            (self.latent_sde_dim,) + self.shape)
        self.weights_dist_batch = self.Base_Noise(
            (batch_size, self.latent_sde_dim,) + self.shape)

    def sample_weights(self):
        """Draw the single and the batched exploration matrices.

        NOTE(review): the default ``Base_Noise`` (``White_Noise``) defines no
        ``sample`` method, so this needs rework before the class can be used.
        """
        # Reparametrization trick to pass gradients
        self.exploration_mat = self.weights_dist.sample()
        # Pre-compute matrices in case of parallel exploration
        self.exploration_matrices = self.weights_dist_batch.sample()

    def __call__(self, latent: th.Tensor) -> th.Tensor:
        """Project the normalised latent through the exploration matrices.

        NOTE(review): ``self.distribution`` is never assigned in this class; it
        presumably has to be set by the caller -- confirm before enabling.

        Raises:
            TypeError: If ``self.distribution`` is of an unsupported type.
        """
        latent_sde = latent.detach()
        # Attribute name matches the one assigned in __init__.
        latent_sde = latent_sde[..., -self.latent_sde_dim:]
        latent_sde = th.nn.functional.normalize(latent_sde, dim=-1)

        p = self.distribution
        if isinstance(p, (th.distributions.Normal, th.distributions.Independent)):
            chol = th.diag_embed(p.stddev)
        elif isinstance(p, th.distributions.MultivariateNormal):
            chol = p.scale_tril
        else:
            # Fail with a clear message instead of an unbound `chol` below.
            raise TypeError(
                f'Unsupported distribution type: {type(p).__name__}')

        # Default case: only one exploration matrix
        if len(latent_sde) == 1 or len(latent_sde) != len(self.exploration_matrices):
            return (th.mm(latent_sde, self.exploration_mat) @ chol)[0]

        # Use batch matrix multiplication for efficient computation
        # (batch_size, n_features) -> (batch_size, 1, n_features)
        latent_sde = latent_sde.unsqueeze(dim=1)
        # (batch_size, 1, n_actions)
        noise = th.bmm(th.bmm(latent_sde, self.exploration_matrices), chol)
        return noise.squeeze(dim=1)
class Perlin_Noise():
    """Perlin noise sampled along a line that advances by ``scale`` per call.

    Each call evaluates the generator once per entry of the last dimension of
    the requested shape. The first coordinate is ``index * scale`` and is
    shared by all entries; the second is ``magic + 2 * a`` for entry ``a``.
    Leading dimensions of the shape are ignored: the result is 1-D.
    """

    def __init__(self, known_shape=None, scale=0.1, octave=1):
        """Store the configuration and create the generator.

        Args:
            known_shape: Default shape used when a call passes no shape.
            scale: Step of the first coordinate per call.
            octave: Passed to ``PerlinNoise`` as ``octaves``.
        """
        self.known_shape = known_shape
        self.scale = scale
        self.octave = octave
        self.magic = PI  # Axis offset, should be (kinda) irrational
        # We want to generate samples that are approx ~N(0,1)
        self.normal_factor = PI/20
        self.clear_cache_every = 128
        self.reset()

    def __call__(self, shape=None):
        """Advance one step and return a 1-D tensor with ``shape[-1]`` entries."""
        # Identity check: `== None` is ambiguous for array-like shapes.
        if shape is None:
            shape = self.known_shape
        self.index += 1
        position = self.index * self.scale  # identical for every entry
        noise = [self.noise([position, self.magic + (2 * a)]) / self.normal_factor
                 for a in range(shape[-1])]
        # Periodically drop the generator's cache dict (presumably to keep it
        # from growing without bound).
        if self.index % self.clear_cache_every == 0:
            self.noise.cache = {}
        return th.Tensor(noise)

    def reset(self):
        """Restart the step counter and create a fresh ``PerlinNoise`` generator."""
        self.index = 0
        self.noise = PerlinNoise(octaves=self.octave)
2024-06-28 11:26:44 +02:00
class Async_Perlin_Noise():
    """Perlin noise with one distinct second-axis offset per tensor element.

    Each call evaluates the generator ``prod(shape)`` times, so every element
    of the result gets its own offset ``magic + 2 * a``. The result is
    reshaped to ``shape``.
    """

    def __init__(self, known_shape=None, scale=0.1, octave=1):
        """Store the configuration and create the generator.

        Args:
            known_shape: Default shape used when a call passes no shape.
            scale: Step of the first coordinate per call.
            octave: Passed to ``PerlinNoise`` as ``octaves``.
        """
        self.known_shape = known_shape
        self.scale = scale
        self.octave = octave
        self.magic = PI  # Axis offset, should be (kinda) irrational
        # We want to generate samples that are approx ~N(0,1)
        self.normal_factor = PI/20
        self.clear_cache_every = 128
        self.reset()

    def __call__(self, shape=None):
        """Advance one step and return a noise tensor of the requested shape."""
        # Identity check: `== None` is ambiguous for array-like shapes.
        if shape is None:
            shape = self.known_shape
        self.index += 1
        position = self.index * self.scale  # identical for every element
        # int(): np.prod returns a float for an empty shape, which range() rejects.
        count = int(np.prod(shape))
        noise = [self.noise([position, self.magic + (2 * a)]) / self.normal_factor
                 for a in range(count)]
        # Periodically drop the generator's cache dict (presumably to keep it
        # from growing without bound).
        if self.index % self.clear_cache_every == 0:
            self.noise.cache = {}
        return th.Tensor(noise).view(shape)

    def reset(self):
        """Restart the step counter and create a fresh ``PerlinNoise`` generator."""
        self.index = 0
        self.noise = PerlinNoise(octaves=self.octave)
2023-06-26 16:18:35 +02:00
class Harmonic_Perlin_Noise():
    """Weighted sum of several ``Perlin_Noise`` generators, one per octave.

    Octave ``i`` (0-based) gets the amplitude ``1 / (i + 1)``. A fractional
    ``octaves`` value adds one more, partially weighted octave. The amplitude
    vector is scaled to unit Euclidean norm.
    """

    def __init__(self, known_shape=None, scale=0.1, octaves=8):
        """Compute the octave amplitudes and create the generators.

        Args:
            known_shape: Default shape used when a call passes no shape.
            scale: Forwarded to every ``Perlin_Noise``.
            octaves: Number of octaves (>= 1); may be fractional.

        Raises:
            TypeError: If ``octaves`` is not a real number.
            AssertionError: If ``octaves`` is smaller than 1.
        """
        self.known_shape = known_shape
        self.scale = scale
        # isinstance (rather than an exact type match) also accepts numpy scalars.
        if not isinstance(octaves, (int, float, np.integer, np.floating)):
            raise TypeError(
                f'octaves must be a real number, got {type(octaves).__name__}')
        assert octaves >= 1
        whole_octaves = int(octaves)
        amplitudes = [1 / (i + 1) for i in range(whole_octaves)]
        if whole_octaves != octaves:
            # Partial extra octave, weighted by the fractional remainder.
            amplitudes.append(1 / (whole_octaves + 2) * (octaves - whole_octaves))
        amplitudes = np.array(amplitudes)
        self.octaves = amplitudes / np.linalg.norm(amplitudes)
        self.clear_cache_every = 1024
        self.reset()

    def __call__(self, shape=None):
        """Return the amplitude-weighted sum of one sample from every octave."""
        # Identity check: `== None` is ambiguous for array-like shapes.
        if shape is None:
            shape = self.known_shape
        # Advance the counter so the periodic cache clearing below is periodic.
        self.index += 1
        harmonics = [perlin(shape) * amplitude
                     for perlin, amplitude in zip(self.noises, self.octaves)]
        if self.index % self.clear_cache_every == 0:
            for perlin in self.noises:
                # The cache lives on the wrapped generator, not on the wrapper.
                perlin.noise.cache = {}
        return sum(harmonics)

    def reset(self):
        """Restart the step counter and create one ``Perlin_Noise`` per octave."""
        self.index = 0
        self.noises = [
            Perlin_Noise(known_shape=self.known_shape, scale=self.scale, octave=octave + 1)
            for octave in range(len(self.octaves))
        ]
2023-07-12 22:32:35 +02:00
class Dirty_Perlin_Noise():
    """Linear blend of Perlin noise and white noise.

    The result is ``perlin * (1 - dirty_ratio) + white * dirty_ratio``.
    """

    def __init__(self, known_shape=None, scale=0.1, dirty_ratio=1/3):
        """Store the configuration and create both generators.

        Args:
            known_shape: Default shape used when a call passes no shape.
            scale: Forwarded to the ``Perlin_Noise`` component.
            dirty_ratio: Weight of the white-noise component.
        """
        self.known_shape = known_shape
        self.scale = scale
        self.dirty_ratio = dirty_ratio
        self.reset()

    def __call__(self, shape=None):
        """Return the blended noise for the requested shape."""
        # Identity check: `== None` is ambiguous for array-like shapes.
        if shape is None:
            shape = self.known_shape
        perlin_part = self.perlin(shape) * (1 - self.dirty_ratio)
        white_part = self.white(shape) * self.dirty_ratio
        return perlin_part + white_part

    def reset(self):
        """Create fresh Perlin and white-noise generators."""
        self.perlin = Perlin_Noise(known_shape=self.known_shape, scale=self.scale, octave=1)
        self.white = White_Noise(known_shape=self.known_shape)
2024-04-18 14:29:07 +02:00
class Rayleigh_Perlin_Noise():
    """Perlin noise whose per-call step sizes are drawn from a Rayleigh distribution.

    ``reset`` draws ``prod(known_shape[:-1])`` step sizes. Flat element ``a``
    of the output uses step size ``scales[a % len(scales)]`` and the
    second-axis offset ``magic + 2 * a``.
    """

    def __init__(self, known_shape=None, sigma=0.1):
        """Store the configuration, then draw step sizes and create the generator.

        Args:
            known_shape: Largest shape that may be requested per call.
            sigma: Scale parameter of the Rayleigh distribution.
        """
        self.known_shape = known_shape
        self.sigma = sigma
        self.magic = PI  # Axis offset, should be (kinda) irrational
        # We want to generate samples that are approx ~N(0,1)
        self.normal_factor = PI/20
        self.clear_cache_every = 128
        self.reset()

    def __call__(self, shape=None):
        """Advance one step and return a noise tensor of the requested shape.

        Args:
            shape: Defaults to ``known_shape``. It must equal ``known_shape`` or
                differ only by a smaller first dimension.

        Raises:
            AssertionError: If ``shape`` is incompatible with ``known_shape``.
        """
        # Honour the declared default instead of failing inside the assert.
        if shape is None:
            shape = self.known_shape
        assert shape == self.known_shape or (shape[1:] == self.known_shape[1:] and shape[0] <= self.known_shape[0])
        self.index += 1
        # reset() draws exactly prod(known_shape[:-1]) step sizes, so the
        # length of the array is the modulus to use.
        num_scales = len(self.scales)
        noise = [self.noise([self.index * self.scales[a % num_scales], self.magic + (2 * a)]) / self.normal_factor
                 for a in range(int(np.prod(shape)))]
        # Periodically drop the generator's cache dict (presumably to keep it
        # from growing without bound).
        if self.index % self.clear_cache_every == 0:
            self.noise.cache = {}
        return th.Tensor(noise).view(shape)

    def reset(self):
        """Restart the counter, redraw the step sizes and recreate the generator."""
        self.index = 0
        # int(): np.prod of an empty tuple (1-D known_shape) is the float 1.0,
        # which is not a valid size.
        num_scales = int(np.prod(self.known_shape[:-1]))
        self.scales = np.random.rayleigh(scale=self.sigma, size=num_scales)
        self.noise = PerlinNoise(octaves=1)
class Sync_Rayleigh_Perlin_Noise():
    """Perlin noise with a single Rayleigh-distributed step size for all entries.

    ``reset`` draws one step size. Each call returns a 1-D tensor with one
    entry per element of the last dimension of the requested shape; leading
    dimensions are ignored.
    """

    def __init__(self, known_shape=None, sigma=0.1):
        """Store the configuration, then draw the step size and create the generator.

        Args:
            known_shape: Default shape used when a call passes no shape.
            sigma: Scale parameter of the Rayleigh distribution.
        """
        self.known_shape = known_shape
        self.sigma = sigma
        self.magic = PI  # Axis offset, should be (kinda) irrational
        # We want to generate samples that are approx ~N(0,1)
        self.normal_factor = PI/20
        self.clear_cache_every = 128
        self.reset()

    def __call__(self, shape=None):
        """Advance one step and return a 1-D tensor with ``shape[-1]`` entries."""
        # Identity check: `== None` is ambiguous for array-like shapes.
        if shape is None:
            shape = self.known_shape
        self.index += 1
        position = self.index * self.scale  # identical for every entry
        noise = [self.noise([position, self.magic + (2 * a)]) / self.normal_factor
                 for a in range(shape[-1])]
        # Periodically drop the generator's cache dict (presumably to keep it
        # from growing without bound).
        if self.index % self.clear_cache_every == 0:
            self.noise.cache = {}
        return th.Tensor(noise)

    def reset(self):
        """Restart the counter, redraw the step size and recreate the generator."""
        self.index = 0
        self.scale = np.random.rayleigh(scale=self.sigma, size=(1,))[0]
        self.noise = PerlinNoise(octaves=1)