NuCon/nucon/model.py
Dominik Roth 0932bb353a feat: SAC+HER training on kNN-GP sim with direct bypass and scripts/
- nucon/rl.py: delta_action_scale action space, bool handling (>=0.5),
  direct sim read/write bypassing HTTP for ~2000fps env throughput;
  remove uncertainty_abort from training (use penalty-only), larger
  default batch sizes; fix _read_obs and step for in-process sim
- nucon/model.py: optimise _lookup with einsum squared-L2, vectorised
  rbf kernel; forward_with_uncertainty uses pre-built normalised arrays
- nucon/sim.py: _update_reactor_state writes outputs via setattr directly
- scripts/train_sac.py: moved from root; full SAC+HER example with kNN-GP
  sim, delta actions, uncertainty penalty, init_states
- scripts/collect_dataset.py: CLI tool to collect dynamics dataset from
  live game session (--steps, --delta, --out, --merge)
- README.md: add Scripts section, reference both scripts in training loop

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 20:43:37 +01:00

442 lines
19 KiB
Python

import numpy as np
import time
import torch
import torch.nn as nn
import torch.optim as optim
import random
from enum import Enum
from nucon import Nucon
import pickle
import os
from typing import Union, Tuple, List, Dict
Actors = {
'random': lambda nucon: lambda obs: {param.id: random.uniform(param.min_val, param.max_val) if param.min_val is not None and param.max_val is not None else 0 for param in nucon.get_all_writable().values()},
'null': lambda nucon: lambda obs: {},
}
# --- NN-based dynamics model ---
class ReactorDynamicsNet(nn.Module):
def __init__(self, input_dim, output_dim):
super(ReactorDynamicsNet, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_dim + 1, 128), # +1 for time_delta
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, output_dim)
)
def forward(self, state, time_delta):
x = torch.cat([state, time_delta], dim=-1)
return self.network(x)
class ReactorDynamicsModel(nn.Module):
def __init__(self, input_params: List[str], output_params: List[str]):
super(ReactorDynamicsModel, self).__init__()
self.input_params = input_params
self.output_params = output_params
self.net = ReactorDynamicsNet(len(input_params), len(output_params))
def _state_dict_to_tensor(self, state_dict):
return torch.tensor([state_dict[p] for p in self.input_params], dtype=torch.float32)
def _tensor_to_state_dict(self, tensor):
return {p: tensor[i].item() for i, p in enumerate(self.output_params)}
def forward(self, state_dict, time_delta):
state_tensor = self._state_dict_to_tensor(state_dict).unsqueeze(0)
time_delta_tensor = torch.tensor([time_delta], dtype=torch.float32).unsqueeze(0)
predicted_tensor = self.net(state_tensor, time_delta_tensor)
return self._tensor_to_state_dict(predicted_tensor.squeeze(0))
# --- kNN-based dynamics model ---
class ReactorKNNModel:
"""
Non-parametric dynamics model using k-nearest neighbours.
For a query (state, game_delta):
1. Find the k dataset entries whose *state* is closest (L2 in normalised space).
2. For each neighbour compute the per-second rate-of-change:
rate_i = (next_state_i - state_i) / game_delta_i
3. Linearly scale to the requested game_delta:
predicted_delta_i = rate_i * game_delta
4. Return the inverse-distance-weighted average of those predicted deltas
added to the current output state.
The linear-in-time assumption means two datapoints at 0.5 s and 2 s contribute
equally once normalised by their own game_delta.
"""
def __init__(self, input_params: List[str], output_params: List[str], k: int = 5):
self.input_params = input_params
self.output_params = output_params
self.k = k
self._states = None # (n, d_in) normalised state matrix
self._rates = None # (n, d_out) (next_out - cur_out) / game_delta
self._raw_states = None # unnormalised, for mean/std computation
self._mean = None
self._std = None
def fit(self, dataset):
"""Build lookup tables from a collected dataset."""
raw, rates = [], []
for state, _action, next_state, game_delta in dataset:
if game_delta <= 0:
continue
s = np.array([state[p] for p in self.input_params], dtype=np.float32)
cur = np.array([state[p] for p in self.output_params], dtype=np.float32)
nxt = np.array([next_state[p] for p in self.output_params], dtype=np.float32)
raw.append(s)
rates.append((nxt - cur) / game_delta)
self._raw_states = np.array(raw)
self._rates = np.array(rates)
self._mean = self._raw_states.mean(axis=0)
self._std = self._raw_states.std(axis=0) + 1e-8
self._states = (self._raw_states - self._mean) / self._std
def _lookup(self, s: np.ndarray):
"""Return (s_norm, idx, k) for the k nearest neighbours. s is a raw (d_in,) array."""
s_norm = (s - self._mean) / self._std
diff = self._states - s_norm # (n, d_in) broadcast
dists = np.einsum('ij,ij->i', diff, diff) # squared L2, faster than linalg.norm
k = min(self.k, len(dists))
idx = np.argpartition(dists, k - 1)[:k]
return s_norm, idx, k
def forward(self, state_dict: Dict, time_delta: float) -> Dict:
if self._states is None:
raise ValueError("Model not fitted. Call fit(dataset) first.")
return self.forward_with_uncertainty(state_dict, time_delta)[0]
def forward_with_uncertainty(self, state_dict: Dict, time_delta: float):
"""Return (prediction_dict, uncertainty_scalar).
Uncertainty is the GP posterior std in normalised input space:
0 = query lies exactly on a training point (fully confident)
~1 = query is far from all neighbours (maximally uncertain)
"""
if self._states is None:
raise ValueError("Model not fitted. Call fit(dataset) first.")
s = np.array([state_dict[p] for p in self.input_params], dtype=np.float32)
s_norm, idx, k = self._lookup(s)
X = self._states[idx] # (k, d_in)
Y = self._rates[idx] # (k, d_out)
# RBF kernel: k(a,b) = exp(-0.5 ||a-b||^2)
def rbf(A, B):
diff = A[:, None, :] - B[None, :, :]
return np.exp(-0.5 * np.einsum('ijk,ijk->ij', diff, diff))
K = rbf(X, X) + 1e-4 * np.eye(k)
k_star = rbf(s_norm[None, :], X)[0]
K_inv = np.linalg.inv(K)
mean_rates = k_star @ K_inv @ Y
var = max(0.0, 1.0 - float(k_star @ K_inv @ k_star))
std = float(np.sqrt(var))
cur_out = np.array([state_dict[p] for p in self.output_params], dtype=np.float32)
predicted = cur_out + mean_rates * time_delta
pred_dict = {p: float(predicted[i]) for i, p in enumerate(self.output_params)}
return pred_dict, std
# --- Learner ---
class NuconModelLearner:
def __init__(self, nucon=None, actor='null', dataset_path='nucon_dataset.pkl',
time_delta: Union[float, Tuple[float, float]] = 1.0,
include_valve_states: bool = False):
self.nucon = Nucon() if nucon is None else nucon
self.actor = Actors[actor](self.nucon) if actor in Actors else actor
self.dataset = self.load_dataset(dataset_path) or []
self.dataset_path = dataset_path
self.include_valve_states = include_valve_states
self.model = None
self.optimizer = None
# Exclude params with no physics signal
_JUNK_PARAMS = frozenset({'GAME_VERSION', 'TIME', 'TIME_STAMP', 'TIME_DAY',
'ALARMS_ACTIVE', 'FUN_IS_ENABLED', 'GAME_SIM_SPEED'})
candidate_params = {k: p for k, p in self.nucon.get_all_readable().items()
if k not in _JUNK_PARAMS and p.param_type != str}
# Filter out params that return None (subsystem not installed).
# Retry until the game is reachable.
import requests as _requests
while True:
try:
test_state = {k: self.nucon.get(k) for k in candidate_params}
break
except (_requests.exceptions.ConnectionError,
_requests.exceptions.Timeout):
print("Waiting for game to be reachable…")
time.sleep(5)
self.readable_params = [k for k in candidate_params if test_state[k] is not None]
self.non_writable_params = [k for k in self.readable_params
if not self.nucon.get_all_readable()[k].is_writable]
# Optionally include valve positions (input only — valves are externally driven)
self.valve_keys = []
if include_valve_states:
valves = self.nucon.get_valves()
self.valve_keys = [f'VALVE__{name}' for name in sorted(valves.keys())]
self.readable_params = self.readable_params + self.valve_keys
# valve positions are input-only (not predicted as outputs)
if isinstance(time_delta, (int, float)):
self.time_delta = lambda: time_delta
elif isinstance(time_delta, tuple) and len(time_delta) == 2:
self.time_delta = lambda: random.uniform(*time_delta)
else:
raise ValueError("time_delta must be a float or a tuple of two floats")
def _get_state(self):
state = {}
for param_id in self.readable_params:
if param_id in self.valve_keys:
continue # filled below
value = self.nucon.get(param_id)
if isinstance(value, Enum):
value = value.value
state[param_id] = value
if self.valve_keys:
valves = self.nucon.get_valves()
for key in self.valve_keys:
name = key[len('VALVE__'):]
state[key] = valves.get(name, {}).get('Value', 0.0)
return state
def collect_data(self, num_steps, save_every=10):
"""
Collect state-transition tuples from the live game.
Sleeps wall_time = target_game_delta / sim_speed so that each stored
game_delta is uniform regardless of the game's simulation speed setting.
Saves the dataset every ``save_every`` steps so a crash doesn't lose
everything. On a connection error the step is skipped and collection
resumes once the game is reachable again (retries every 5 s).
"""
import requests as _requests
def get_state_with_retry():
while True:
try:
return self._get_state()
except (_requests.exceptions.ConnectionError,
_requests.exceptions.Timeout) as e:
print(f"Connection lost ({e}). Retrying in 5 s…")
time.sleep(5)
state = get_state_with_retry()
collected = 0
for i in range(num_steps):
action = self.actor(state)
for param_id, value in action.items():
try:
self.nucon.set(param_id, value)
except Exception:
pass
target_game_delta = self.time_delta()
try:
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
except Exception:
sim_speed = 1.0
time.sleep(target_game_delta / sim_speed)
next_state = get_state_with_retry()
self.dataset.append((state, action, next_state, target_game_delta))
state = next_state
collected += 1
if collected % save_every == 0:
self.save_dataset()
print(f" {collected}/{num_steps} steps collected, dataset saved.")
self.save_dataset()
print(f"Collection complete. {collected} steps, {len(self.dataset)} total samples.")
def train_model(self, batch_size=32, num_epochs=10, test_split=0.2):
"""Train a neural-network dynamics model on the current dataset."""
if self.model is None:
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
self.optimizer = optim.Adam(self.model.parameters())
elif not isinstance(self.model, ReactorDynamicsModel):
raise ValueError("A kNN model is already loaded. Create a new learner to train an NN.")
random.shuffle(self.dataset)
split_idx = int(len(self.dataset) * (1 - test_split))
train_data = self.dataset[:split_idx]
test_data = self.dataset[split_idx:]
for epoch in range(num_epochs):
train_loss = self._train_epoch(train_data, batch_size)
test_loss = self._test_epoch(test_data)
print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
def fit_knn(self, k: int = 5):
"""Fit a kNN/GP dynamics model from the current dataset (instantaneous, no gradient steps)."""
if self.model is None:
self.model = ReactorKNNModel(self.readable_params, self.non_writable_params, k=k)
elif not isinstance(self.model, ReactorKNNModel):
raise ValueError("An NN model is already loaded. Create a new learner to fit a kNN.")
self.model.fit(self.dataset)
print(f"kNN model fitted on {len(self.dataset)} samples.")
def predict_with_uncertainty(self, state_dict: Dict, time_delta: float):
"""Return (prediction_dict, uncertainty_std). Only available after fit_knn()."""
if not isinstance(self.model, ReactorKNNModel):
raise ValueError("predict_with_uncertainty() requires a fitted kNN model (call fit_knn()).")
return self.model.forward_with_uncertainty(state_dict, time_delta)
def drop_well_fitted(self, error_threshold: float):
"""Drop samples the current model already predicts well (MSE < threshold).
Keeps only hard/surprising transitions. Useful for NN training to focus
capacity on difficult regions of state space.
"""
if self.model is None:
raise ValueError("No model fitted yet. Call train_model() or fit_knn() first.")
kept = []
for state, action, next_state, time_delta in self.dataset:
pred = self.model.forward(state, time_delta)
error = sum((pred[p] - next_state[p]) ** 2 for p in self.non_writable_params)
if error > error_threshold:
kept.append((state, action, next_state, time_delta))
dropped = len(self.dataset) - len(kept)
self.dataset = kept
self.save_dataset()
print(f"drop_well_fitted: kept {len(kept)}, dropped {dropped} samples.")
def drop_redundant(self, min_state_distance: float, min_output_distance: float = 0.0):
"""Drop near-duplicate samples, keeping only those that add coverage.
A sample is dropped only if *both* its input state and its output
transition are within the given distances of an already-kept sample
(L2 in z-scored space). If two samples share the same input state but
have different transitions they represent genuinely different dynamics
and are both kept regardless of `min_output_distance`.
Args:
min_state_distance: minimum L2 distance in z-scored input space.
min_output_distance: minimum L2 distance in z-scored output-delta
space. Defaults to 0 (only input distance matters).
"""
if not self.dataset:
return
in_params = [p for p in self.readable_params if p not in self.valve_keys]
out_params = self.non_writable_params
all_states = np.array([[s[p] for p in in_params] for s, *_ in self.dataset], dtype=np.float32)
all_deltas = np.array([[ns[p] - s[p] for p in out_params]
for s, _, ns, gd in self.dataset], dtype=np.float32)
s_mean, s_std = all_states.mean(0), all_states.std(0) + 1e-8
d_mean, d_std = all_deltas.mean(0), all_deltas.std(0) + 1e-8
s_norm = (all_states - s_mean) / s_std
d_norm = (all_deltas - d_mean) / d_std
kept_idx = [0]
kept_s = [s_norm[0]]
kept_d = [d_norm[0]]
for i in range(1, len(self.dataset)):
s_dists = np.linalg.norm(np.array(kept_s) - s_norm[i], axis=1)
d_dists = np.linalg.norm(np.array(kept_d) - d_norm[i], axis=1)
# Drop only if close in BOTH spaces
if not np.any((s_dists < min_state_distance) & (d_dists < min_output_distance)):
kept_idx.append(i)
kept_s.append(s_norm[i])
kept_d.append(d_norm[i])
dropped = len(self.dataset) - len(kept_idx)
self.dataset = [self.dataset[i] for i in kept_idx]
self.save_dataset()
print(f"drop_redundant: kept {len(self.dataset)}, dropped {dropped} samples.")
def _train_epoch(self, data, batch_size):
out_indices = [self.readable_params.index(p) if p in self.readable_params else None
for p in self.non_writable_params]
total_loss = 0
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
self.optimizer.zero_grad()
loss = torch.tensor(0.0)
for state, _, next_state, time_delta in batch:
state_t = self.model._state_dict_to_tensor(state).unsqueeze(0)
td_t = torch.tensor([[time_delta]], dtype=torch.float32)
pred = self.model.net(state_t, td_t).squeeze(0)
target = torch.tensor([next_state[p] for p in self.non_writable_params],
dtype=torch.float32)
loss = loss + torch.nn.functional.mse_loss(pred, target)
loss = loss / len(batch)
loss.backward()
self.optimizer.step()
total_loss += loss.item()
return total_loss / max(1, len(data) // batch_size)
def _test_epoch(self, data):
total_loss = 0.0
with torch.no_grad():
for state, _, next_state, time_delta in data:
state_t = self.model._state_dict_to_tensor(state).unsqueeze(0)
td_t = torch.tensor([[time_delta]], dtype=torch.float32)
pred = self.model.net(state_t, td_t).squeeze(0)
target = torch.tensor([next_state[p] for p in self.non_writable_params],
dtype=torch.float32)
total_loss += torch.nn.functional.mse_loss(pred, target).item()
return total_loss / len(data)
def save_model(self, path):
if self.model is None:
raise ValueError("No model to save. Call train_model() or fit_knn() first.")
if isinstance(self.model, ReactorDynamicsModel):
torch.save({
'state_dict': self.model.state_dict(),
'input_params': self.model.input_params,
'output_params': self.model.output_params,
}, path)
else:
with open(path, 'wb') as f:
pickle.dump(self.model, f)
def load_model(self, path):
if path.endswith('.pkl'):
with open(path, 'rb') as f:
self.model = pickle.load(f)
else:
checkpoint = torch.load(path, weights_only=False)
if isinstance(checkpoint, dict) and 'state_dict' in checkpoint:
m = ReactorDynamicsModel(checkpoint['input_params'], checkpoint['output_params'])
m.load_state_dict(checkpoint['state_dict'])
self.model = m
else:
# legacy plain state dict
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
self.model.load_state_dict(checkpoint)
def save_dataset(self, path=None):
path = path or self.dataset_path
with open(path, 'wb') as f:
pickle.dump(self.dataset, f)
def load_dataset(self, path=None):
path = path or self.dataset_path
if os.path.exists(path):
with open(path, 'rb') as f:
return pickle.load(f)
return None
def merge_datasets(self, other_dataset_path):
other_dataset = self.load_dataset(other_dataset_path)
if other_dataset:
self.dataset.extend(other_dataset)
self.save_dataset()