# NOTE: the None-param filtering probe at init also needs to wait for the game
# to be reachable, not just the collection loop.
import numpy as np
|
|
import time
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.optim as optim
|
|
import random
|
|
from enum import Enum
|
|
from nucon import Nucon
|
|
import pickle
|
|
import os
|
|
from typing import Union, Tuple, List, Dict
|
|
|
|
def _make_random_actor(nucon):
    """Build an actor that writes a uniform random value to every writable param.

    Params lacking a defined [min_val, max_val] range are set to 0.
    """
    def act(obs):
        action = {}
        for param in nucon.get_all_writable().values():
            if param.min_val is not None and param.max_val is not None:
                action[param.id] = random.uniform(param.min_val, param.max_val)
            else:
                action[param.id] = 0
        return action
    return act


def _make_null_actor(nucon):
    """Build an actor that never writes anything (pure observation)."""
    def act(obs):
        return {}
    return act


# Registry of built-in actor factories: name -> (nucon -> (obs -> action_dict)).
Actors = {
    'random': _make_random_actor,
    'null': _make_null_actor,
}
|
|
|
|
# --- NN-based dynamics model ---
|
|
|
|
class ReactorDynamicsNet(nn.Module):
    """MLP mapping a flat state vector plus a time delta to an output vector.

    The time delta is appended to the state as one extra feature, so the first
    layer consumes (input_dim + 1) values.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        layers = [
            nn.Linear(input_dim + 1, 128),  # +1 for time_delta
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim),
        ]
        self.network = nn.Sequential(*layers)

    def forward(self, state, time_delta):
        combined = torch.cat([state, time_delta], dim=-1)
        return self.network(combined)
|
|
|
|
class ReactorDynamicsModel(nn.Module):
    """Dict-in / dict-out wrapper around ReactorDynamicsNet.

    Translates between named parameter dictionaries and the flat float32
    tensors the underlying network consumes and produces. Tensor layout
    follows the order of `input_params` / `output_params`.
    """

    def __init__(self, input_params: List[str], output_params: List[str]):
        super().__init__()
        self.input_params = input_params
        self.output_params = output_params
        self.net = ReactorDynamicsNet(len(input_params), len(output_params))

    def _state_dict_to_tensor(self, state_dict):
        # Ordering is fixed by self.input_params so the layout is stable.
        values = [state_dict[p] for p in self.input_params]
        return torch.tensor(values, dtype=torch.float32)

    def _tensor_to_state_dict(self, tensor):
        # Inverse of _state_dict_to_tensor for the output side.
        return {p: v.item() for p, v in zip(self.output_params, tensor)}

    def forward(self, state_dict, time_delta):
        state_tensor = self._state_dict_to_tensor(state_dict).unsqueeze(0)
        time_delta_tensor = torch.tensor([time_delta], dtype=torch.float32).unsqueeze(0)
        prediction = self.net(state_tensor, time_delta_tensor)
        return self._tensor_to_state_dict(prediction.squeeze(0))
|
|
|
|
# --- kNN-based dynamics model ---
|
|
|
|
class ReactorKNNModel:
    """
    Non-parametric dynamics model using k-nearest neighbours.

    For a query (state, game_delta):
    1. Find the k dataset entries whose *state* is closest (L2 in normalised space).
    2. For each neighbour compute the per-second rate-of-change:
       rate_i = (next_state_i - state_i) / game_delta_i
    3. Linearly scale to the requested game_delta:
       predicted_delta_i = rate_i * game_delta
    4. Combine the neighbours' rates with RBF-kernel GP regression weights
       (see forward_with_uncertainty) and add the result to the current
       output state.

    The linear-in-time assumption means two datapoints at 0.5 s and 2 s contribute
    equally once normalised by their own game_delta.
    """

    def __init__(self, input_params: List[str], output_params: List[str], k: int = 5):
        self.input_params = input_params
        self.output_params = output_params
        self.k = k
        self._states = None      # (n, d_in) normalised state matrix
        self._rates = None       # (n, d_out) (next_out - cur_out) / game_delta
        self._raw_states = None  # unnormalised, for mean/std computation
        self._mean = None
        self._std = None

    def fit(self, dataset):
        """Build lookup tables from a collected dataset.

        Args:
            dataset: iterable of (state, action, next_state, game_delta)
                tuples. Samples with game_delta <= 0 are skipped (no
                rate-of-change is defined for them).

        Raises:
            ValueError: if no usable sample (game_delta > 0) exists — failing
                loudly here avoids cryptic numpy errors on empty arrays later.
        """
        raw, rates = [], []
        for state, _action, next_state, game_delta in dataset:
            if game_delta <= 0:
                continue
            s = np.array([state[p] for p in self.input_params], dtype=np.float32)
            cur = np.array([state[p] for p in self.output_params], dtype=np.float32)
            nxt = np.array([next_state[p] for p in self.output_params], dtype=np.float32)
            raw.append(s)
            rates.append((nxt - cur) / game_delta)

        if not raw:
            raise ValueError("fit() needs at least one sample with game_delta > 0.")

        self._raw_states = np.array(raw)
        self._rates = np.array(rates)
        self._mean = self._raw_states.mean(axis=0)
        self._std = self._raw_states.std(axis=0) + 1e-8  # avoid /0 on constant dims
        self._states = (self._raw_states - self._mean) / self._std

    def _lookup(self, state_dict: Dict):
        """Return (s_norm, idx, k) for the k nearest neighbours."""
        s = np.array([state_dict[p] for p in self.input_params], dtype=np.float32)
        s_norm = (s - self._mean) / self._std
        dists = np.linalg.norm(self._states - s_norm, axis=1)
        k = min(self.k, len(dists))
        # argpartition: O(n) selection of the k smallest distances (unordered).
        idx = np.argpartition(dists, k - 1)[:k]
        return s_norm, idx, k

    def forward(self, state_dict: Dict, time_delta: float) -> Dict:
        """Predict the output state after `time_delta` game-seconds.

        Raises:
            ValueError: if fit() has not been called yet.
        """
        if self._states is None:
            raise ValueError("Model not fitted. Call fit(dataset) first.")
        return self.forward_with_uncertainty(state_dict, time_delta)[0]

    def forward_with_uncertainty(self, state_dict: Dict, time_delta: float):
        """Return (prediction_dict, uncertainty_scalar).

        Uncertainty is the GP posterior std in normalised input space:
        0 = query lies exactly on a training point (fully confident)
        ~1 = query is far from all neighbours (maximally uncertain)

        Raises:
            ValueError: if fit() has not been called yet.
        """
        if self._states is None:
            raise ValueError("Model not fitted. Call fit(dataset) first.")

        s_norm, idx, k = self._lookup(state_dict)
        X = self._states[idx]  # (k, d_in)
        Y = self._rates[idx]   # (k, d_out)

        # RBF kernel (vectorised): k(a,b) = exp(-0.5 ||a-b||^2)
        def rbf_matrix(A, B):
            diff = A[:, None, :] - B[None, :, :]            # (|A|, |B|, d)
            return np.exp(-0.5 * (diff ** 2).sum(axis=-1))  # (|A|, |B|)

        # Small jitter on the diagonal keeps K invertible when neighbours coincide.
        K = rbf_matrix(X, X) + 1e-4 * np.eye(k)       # (k, k)
        k_star = rbf_matrix(s_norm[None, :], X)[0]    # (k,)

        K_inv = np.linalg.inv(K)
        mean_rates = k_star @ K_inv @ Y  # (d_out,)

        # Posterior variance (scalar, shared across all output dims)
        var = max(0.0, 1.0 - float(k_star @ K_inv @ k_star))
        std = float(np.sqrt(var))

        cur_out = np.array([state_dict[p] for p in self.output_params], dtype=np.float32)
        predicted = cur_out + mean_rates * time_delta

        pred_dict = {p: float(predicted[i]) for i, p in enumerate(self.output_params)}
        return pred_dict, std
|
|
|
|
# --- Learner ---
|
|
|
|
class NuconModelLearner:
    """Collects (state, action, next_state, game_delta) transitions from a live
    Nucon reactor game and fits a dynamics model to them.

    Owns: the game connection (`self.nucon`), the actor policy used during
    collection, the growing transition dataset (pickled to `dataset_path`),
    and optionally a fitted model (`self.model` — NN or kNN/GP).
    """

    def __init__(self, nucon=None, actor='null', dataset_path='nucon_dataset.pkl',
                 time_delta: Union[float, Tuple[float, float]] = 1.0,
                 include_valve_states: bool = False):
        """
        Args:
            nucon: existing Nucon connection, or None to create a new one.
            actor: key into `Actors` ('random'/'null') or a ready-made
                callable `obs -> {param_id: value}`.
            dataset_path: pickle file the dataset is loaded from / saved to.
            time_delta: target game-seconds between samples — a fixed number,
                or a (low, high) tuple sampled uniformly each step.
            include_valve_states: if True, valve positions are appended to the
                input state (input-only; valves are externally driven).

        Raises:
            ValueError: if time_delta is neither a number nor a 2-tuple.
        """
        self.nucon = Nucon() if nucon is None else nucon
        self.actor = Actors[actor](self.nucon) if actor in Actors else actor
        self.dataset = self.load_dataset(dataset_path) or []
        self.dataset_path = dataset_path
        self.include_valve_states = include_valve_states
        self.model = None
        self.optimizer = None

        # Exclude params with no physics signal
        _JUNK_PARAMS = frozenset({'GAME_VERSION', 'TIME', 'TIME_STAMP', 'TIME_DAY',
                                  'ALARMS_ACTIVE', 'FUN_IS_ENABLED', 'GAME_SIM_SPEED'})
        candidate_params = {k: p for k, p in self.nucon.get_all_readable().items()
                            if k not in _JUNK_PARAMS and p.param_type != str}
        # Filter out params that return None (subsystem not installed).
        # Retry until the game is reachable.
        import requests as _requests
        while True:
            try:
                test_state = {k: self.nucon.get(k) for k in candidate_params}
                break
            except (_requests.exceptions.ConnectionError,
                    _requests.exceptions.Timeout):
                print("Waiting for game to be reachable…")
                time.sleep(5)
        self.readable_params = [k for k in candidate_params if test_state[k] is not None]
        self.non_writable_params = [k for k in self.readable_params
                                    if not self.nucon.get_all_readable()[k].is_writable]

        # Optionally include valve positions (input only — valves are externally driven)
        self.valve_keys = []
        if include_valve_states:
            valves = self.nucon.get_valves()
            self.valve_keys = [f'VALVE__{name}' for name in sorted(valves.keys())]
            self.readable_params = self.readable_params + self.valve_keys
            # valve positions are input-only (not predicted as outputs)

        if isinstance(time_delta, (int, float)):
            self.time_delta = lambda: time_delta
        elif isinstance(time_delta, tuple) and len(time_delta) == 2:
            self.time_delta = lambda: random.uniform(*time_delta)
        else:
            raise ValueError("time_delta must be a float or a tuple of two floats")

    def _get_state(self):
        """Read the full current state from the game as {param_id: value}."""
        state = {}
        for param_id in self.readable_params:
            if param_id in self.valve_keys:
                continue  # filled below
            value = self.nucon.get(param_id)
            if isinstance(value, Enum):
                value = value.value  # store the raw value, not the Enum member
            state[param_id] = value
        if self.valve_keys:
            valves = self.nucon.get_valves()
            for key in self.valve_keys:
                name = key[len('VALVE__'):]
                state[key] = valves.get(name, {}).get('Value', 0.0)
        return state

    def collect_data(self, num_steps, save_every=10):
        """
        Collect state-transition tuples from the live game.

        Sleeps wall_time = target_game_delta / sim_speed so that each stored
        game_delta is uniform regardless of the game's simulation speed setting.

        Saves the dataset every ``save_every`` steps so a crash doesn't lose
        everything. On a connection error the step is skipped and collection
        resumes once the game is reachable again (retries every 5 s).
        """
        import requests as _requests

        def get_state_with_retry():
            # Block until the game answers; collection must survive restarts.
            while True:
                try:
                    return self._get_state()
                except (_requests.exceptions.ConnectionError,
                        _requests.exceptions.Timeout) as e:
                    print(f"Connection lost ({e}). Retrying in 5 s…")
                    time.sleep(5)

        state = get_state_with_retry()
        collected = 0
        for i in range(num_steps):
            action = self.actor(state)
            for param_id, value in action.items():
                try:
                    self.nucon.set(param_id, value)
                except Exception:
                    pass  # best-effort writes: some params reject values transiently

            target_game_delta = self.time_delta()
            try:
                sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            except Exception:
                sim_speed = 1.0
            time.sleep(target_game_delta / sim_speed)

            next_state = get_state_with_retry()
            self.dataset.append((state, action, next_state, target_game_delta))
            state = next_state
            collected += 1

            if collected % save_every == 0:
                self.save_dataset()
                print(f" {collected}/{num_steps} steps collected, dataset saved.")

        self.save_dataset()
        print(f"Collection complete. {collected} steps, {len(self.dataset)} total samples.")

    def train_model(self, batch_size=32, num_epochs=10, test_split=0.2):
        """Train a neural-network dynamics model on the current dataset."""
        if self.model is None:
            self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
            self.optimizer = optim.Adam(self.model.parameters())
        elif not isinstance(self.model, ReactorDynamicsModel):
            raise ValueError("A kNN model is already loaded. Create a new learner to train an NN.")
        # NOTE: shuffles self.dataset in place before the train/test split.
        random.shuffle(self.dataset)
        split_idx = int(len(self.dataset) * (1 - test_split))
        train_data = self.dataset[:split_idx]
        test_data = self.dataset[split_idx:]
        for epoch in range(num_epochs):
            train_loss = self._train_epoch(train_data, batch_size)
            test_loss = self._test_epoch(test_data)
            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")

    def fit_knn(self, k: int = 5):
        """Fit a kNN/GP dynamics model from the current dataset (instantaneous, no gradient steps)."""
        if self.model is None:
            self.model = ReactorKNNModel(self.readable_params, self.non_writable_params, k=k)
        elif not isinstance(self.model, ReactorKNNModel):
            raise ValueError("An NN model is already loaded. Create a new learner to fit a kNN.")
        self.model.fit(self.dataset)
        print(f"kNN model fitted on {len(self.dataset)} samples.")

    def predict_with_uncertainty(self, state_dict: Dict, time_delta: float):
        """Return (prediction_dict, uncertainty_std). Only available after fit_knn()."""
        if not isinstance(self.model, ReactorKNNModel):
            raise ValueError("predict_with_uncertainty() requires a fitted kNN model (call fit_knn()).")
        return self.model.forward_with_uncertainty(state_dict, time_delta)

    def drop_well_fitted(self, error_threshold: float):
        """Drop samples the current model already predicts well
        (summed squared prediction error <= threshold).

        Keeps only hard/surprising transitions. Useful for NN training to focus
        capacity on difficult regions of state space.
        """
        if self.model is None:
            raise ValueError("No model fitted yet. Call train_model() or fit_knn() first.")
        kept = []
        for state, action, next_state, time_delta in self.dataset:
            pred = self.model.forward(state, time_delta)
            error = sum((pred[p] - next_state[p]) ** 2 for p in self.non_writable_params)
            if error > error_threshold:
                kept.append((state, action, next_state, time_delta))
        dropped = len(self.dataset) - len(kept)
        self.dataset = kept
        self.save_dataset()
        print(f"drop_well_fitted: kept {len(kept)}, dropped {dropped} samples.")

    def drop_redundant(self, min_state_distance: float, min_output_distance: float = 0.0):
        """Drop near-duplicate samples, keeping only those that add coverage.

        A sample is dropped only if *both* its input state and its output
        transition are within the given distances of an already-kept sample
        (L2 in z-scored space). If two samples share the same input state but
        have different transitions they represent genuinely different dynamics
        and are both kept regardless of `min_output_distance`.

        Args:
            min_state_distance: minimum L2 distance in z-scored input space.
            min_output_distance: minimum L2 distance in z-scored output-delta
                space. Defaults to 0 (only input distance matters).
        """
        if not self.dataset:
            return

        in_params = [p for p in self.readable_params if p not in self.valve_keys]
        out_params = self.non_writable_params

        all_states = np.array([[s[p] for p in in_params] for s, *_ in self.dataset], dtype=np.float32)
        all_deltas = np.array([[ns[p] - s[p] for p in out_params]
                               for s, _, ns, gd in self.dataset], dtype=np.float32)

        s_mean, s_std = all_states.mean(0), all_states.std(0) + 1e-8
        d_mean, d_std = all_deltas.mean(0), all_deltas.std(0) + 1e-8

        s_norm = (all_states - s_mean) / s_std
        d_norm = (all_deltas - d_mean) / d_std

        kept_idx = [0]
        kept_s = [s_norm[0]]
        kept_d = [d_norm[0]]

        for i in range(1, len(self.dataset)):
            s_dists = np.linalg.norm(np.array(kept_s) - s_norm[i], axis=1)
            # Drop only if close in BOTH spaces. When min_output_distance is 0
            # the output check is skipped entirely: distances are non-negative,
            # so `d_dists < 0` could never be true, which previously made this
            # method a silent no-op at its documented default.
            close = s_dists < min_state_distance
            if min_output_distance > 0:
                d_dists = np.linalg.norm(np.array(kept_d) - d_norm[i], axis=1)
                close = close & (d_dists < min_output_distance)
            if not np.any(close):
                kept_idx.append(i)
                kept_s.append(s_norm[i])
                kept_d.append(d_norm[i])

        dropped = len(self.dataset) - len(kept_idx)
        self.dataset = [self.dataset[i] for i in kept_idx]
        self.save_dataset()
        print(f"drop_redundant: kept {len(self.dataset)}, dropped {dropped} samples.")

    def _train_epoch(self, data, batch_size):
        """One optimisation pass over `data` in mini-batches; returns mean batch loss."""
        total_loss = 0
        for i in range(0, len(data), batch_size):
            batch = data[i:i+batch_size]
            self.optimizer.zero_grad()
            loss = torch.tensor(0.0)
            for state, _, next_state, time_delta in batch:
                state_t = self.model._state_dict_to_tensor(state).unsqueeze(0)
                td_t = torch.tensor([[time_delta]], dtype=torch.float32)
                pred = self.model.net(state_t, td_t).squeeze(0)
                target = torch.tensor([next_state[p] for p in self.non_writable_params],
                                      dtype=torch.float32)
                loss = loss + torch.nn.functional.mse_loss(pred, target)
            loss = loss / len(batch)
            loss.backward()
            self.optimizer.step()
            total_loss += loss.item()
        return total_loss / max(1, len(data) // batch_size)

    def _test_epoch(self, data):
        """Mean per-sample MSE over `data` without gradient tracking."""
        total_loss = 0.0
        with torch.no_grad():
            for state, _, next_state, time_delta in data:
                state_t = self.model._state_dict_to_tensor(state).unsqueeze(0)
                td_t = torch.tensor([[time_delta]], dtype=torch.float32)
                pred = self.model.net(state_t, td_t).squeeze(0)
                target = torch.tensor([next_state[p] for p in self.non_writable_params],
                                      dtype=torch.float32)
                total_loss += torch.nn.functional.mse_loss(pred, target).item()
        # Guard against an empty test split (tiny datasets).
        return total_loss / max(1, len(data))

    def save_model(self, path):
        """Persist the current model: torch checkpoint for NN, pickle for kNN."""
        if self.model is None:
            raise ValueError("No model to save. Call train_model() or fit_knn() first.")
        if isinstance(self.model, ReactorDynamicsModel):
            torch.save({
                'state_dict': self.model.state_dict(),
                'input_params': self.model.input_params,
                'output_params': self.model.output_params,
            }, path)
        else:
            with open(path, 'wb') as f:
                pickle.dump(self.model, f)

    def load_model(self, path):
        """Load a model saved by save_model().

        `.pkl` paths are unpickled as kNN models; anything else is treated as
        a torch checkpoint (current dict format or legacy plain state dict).

        SECURITY: both pickle.load and torch.load(weights_only=False) can
        execute arbitrary code from the file — only load trusted checkpoints.
        """
        if path.endswith('.pkl'):
            with open(path, 'rb') as f:
                self.model = pickle.load(f)
        else:
            checkpoint = torch.load(path, weights_only=False)
            if isinstance(checkpoint, dict) and 'state_dict' in checkpoint:
                m = ReactorDynamicsModel(checkpoint['input_params'], checkpoint['output_params'])
                m.load_state_dict(checkpoint['state_dict'])
                self.model = m
            else:
                # legacy plain state dict
                self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
                self.model.load_state_dict(checkpoint)

    def save_dataset(self, path=None):
        """Pickle the dataset to `path` (default: self.dataset_path)."""
        path = path or self.dataset_path
        with open(path, 'wb') as f:
            pickle.dump(self.dataset, f)

    def load_dataset(self, path=None):
        """Unpickle a dataset from `path`; returns None if the file is absent."""
        path = path or self.dataset_path
        if os.path.exists(path):
            with open(path, 'rb') as f:
                return pickle.load(f)
        return None

    def merge_datasets(self, other_dataset_path):
        """Append another saved dataset into this one and persist the union."""
        other_dataset = self.load_dataset(other_dataset_path)
        if other_dataset:
            self.dataset.extend(other_dataset)
            self.save_dataset()
|