Compare commits

...

3 Commits

Author SHA1 Message Date
7fcc809852 Update README: valve API, cheat_mode, model learning overhaul
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 17:16:25 +01:00
31cb6862e1 Overhaul model learning: kNN+GP, uncertainty, dataset pruning, sim-speed fix
Data collection:
- time_delta is now target game-time; wall sleep = game_delta / sim_speed
  so stored deltas are uniform regardless of GAME_SIM_SPEED setting
- Auto-exclude junk params (GAME_VERSION, TIME, ALARMS_ACTIVE, …) and
  params returning None (uninstalled subsystems)
- Optional include_valve_states=True adds all 53 valve positions as inputs

Model backends (model_type='nn' or 'knn'):
- ReactorKNNModel: k-nearest neighbours with GP interpolation
  - Finds k nearest states, computes per-second transition rates,
    linearly scales to requested game_delta (linear-in-time assumption)
  - forward_with_uncertainty() returns (prediction_dict, gp_std)
    where std≈0 = on known data, std≈1 = out of distribution
- NN training fixed: loss computed in tensor space, mse_loss per batch

Dataset management:
- drop_well_fitted(error_threshold): drop samples model predicts well,
  keep hard cases (useful for NN curriculum)
- drop_redundant(min_state_distance, min_output_distance): drop samples
  that are close in BOTH input state AND output transition space, keeping
  genuinely different dynamics even at the same input state

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 17:16:22 +01:00
c78106dffc Add valve API, cheat_mode, and write-only param fixes
- Rename is_admin/admin_mode -> is_cheat/cheat_mode (only FUN_* event
  triggers are cheat params, not operational commands like SCRAM)
- Fix steam ejector valve write commands: int 0-100, not bool
- Move SCRAM, EMERGENCY_STOP, bay hatches, turbine trip etc. to normal
  write-only (not cheat-gated)
- Add FUN_IS_ENABLED to readable params (it appears in GET list)
- Add get_valve/get_valves, open/close/off_valve(s) methods with correct
  actuator semantics: OPEN/CLOSE powers motor, OFF holds position

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 17:16:08 +01:00
3 changed files with 420 additions and 127 deletions

View File

@ -72,7 +72,10 @@ Parameter properties:
- `nucon.<PARAMETER>.value`: Get or set the current value of the parameter. Assigning a new value will write it to the game. - `nucon.<PARAMETER>.value`: Get or set the current value of the parameter. Assigning a new value will write it to the game.
- `nucon.<PARAMETER>.param_type`: Get the type of the parameter - `nucon.<PARAMETER>.param_type`: Get the type of the parameter
- `nucon.<PARAMETER>.is_writable`: Check if the parameter is writable - `nucon.<PARAMETER>.is_writable`: Check if the parameter is writable
- `nucon.<PARAMETER>.is_readable`: `False` for write-only parameters (e.g. VALVE_OPEN, CORE_SCRAM_BUTTON). Reading raises `AttributeError`.
- `nucon.<PARAMETER>.is_cheat`: `True` for game-event triggers (all `FUN_*`). Writing raises `ValueError` unless `cheat_mode=True`.
- `nucon.<PARAMETER>.enum_type`: Get the enum type of the parameter if it's an enum, otherwise None - `nucon.<PARAMETER>.enum_type`: Get the enum type of the parameter if it's an enum, otherwise None
- `nucon.<PARAMETER>.unit`: Unit string if defined (e.g. `'°C'`, `'bar'`, `'%'`)
Parameter methods: Parameter methods:
- `nucon.<PARAMETER>.read()`: Get the current value of the parameter (alias for `value`) - `nucon.<PARAMETER>.read()`: Get the current value of the parameter (alias for `value`)
@ -80,14 +83,23 @@ Parameter methods:
Class methods: Class methods:
- `nucon.get(parameter)`: Get the value of a specific parameter. Also accepts string parameter names. - `nucon.get(parameter)`: Get the value of a specific parameter. Also accepts string parameter names.
- `nucon.set(parameter, value, force=False)`: Set the value of a specific parameter. Also accepts string parameter names. `force` will try to write even if the parameter is known as non-writable or out of known allowed range. - `nucon.set(parameter, value, force=False)`: Set the value of a specific parameter. Also accepts string parameter names. `force` bypasses writable/range/cheat checks.
- `nucon.get_all_readable()`: Get a list of all readable parameters (which is all parameters) - `nucon.get_all_readable()`: Get a dict of all readable parameters.
- `nucon.get_all_writable()`: Get a list of all writable parameters - `nucon.get_all_writable()`: Get a dict of all writable parameters (includes write-only params).
- `nucon.get_all()`: Get all parameter values as a dictionary - `nucon.get_all()`: Get all readable parameter values as a dictionary.
- `nucon.get_all_iter()`: Get all parameter values as a generator - `nucon.get_all_iter()`: Get all readable parameter values as a generator.
- `nucon.get_multiple(params)`: Get values for multiple specified parameters - `nucon.get_multiple(params)`: Get values for multiple specified parameters.
- `nucon.get_multiple_iter(params)`: Get values for multiple specified parameters as a generator - `nucon.get_multiple_iter(params)`: Get values for multiple specified parameters as a generator.
- `nucon.set_dummy_mode(dummy_mode)`: Enable or disable dummy mode for testing. In dummy mode we won't connect to the game and just return sensible values for all params and allow but ignore all writes to writable parameters. - `nucon.get_game_variable_names()`: Query the game for all exposed variable names (GET and POST), excluding special endpoints.
- `nucon.set_dummy_mode(dummy_mode)`: In dummy mode, returns sensible values without connecting to the game and silently ignores writes.
- `nucon.set_cheat_mode(cheat_mode)`: Enable writing to cheat parameters (`FUN_*` event triggers). Default `False`.
Valve API (motorized actuators: OPEN/CLOSE powers the motor, OFF holds current position):
- `nucon.get_valve(name)`: Get state dict for a single valve (`Value`, `IsOpened`, `IsClosed`, `Stuck`, …).
- `nucon.get_valves()`: Get state dict for all 53 valves.
- `nucon.open_valve(name)` / `nucon.open_valves(names)`: Power actuator toward open.
- `nucon.close_valve(name)` / `nucon.close_valves(names)`: Power actuator toward closed.
- `nucon.off_valve(name)` / `nucon.off_valves(names)`: Cut actuator power, hold current position (normal resting state).
Custom Enum Types: Custom Enum Types:
- `PumpStatus`: Enum for pump status (INACTIVE, ACTIVE_NO_SPEED_REACHED\*, ACTIVE_SPEED_REACHED\*, REQUIRES_MAINTENANCE, NOT_INSTALLED, INSUFFICIENT_ENERGY) - `PumpStatus`: Enum for pump status (INACTIVE, ACTIVE_NO_SPEED_REACHED\*, ACTIVE_SPEED_REACHED\*, REQUIRES_MAINTENANCE, NOT_INSTALLED, INSUFFICIENT_ENERGY)
@ -234,41 +246,62 @@ But theres yet another problem: We do not know the exact simulation dynamics of
## Model Learning (Work in Progress) ## Model Learning (Work in Progress)
To address the challenge of unknown game dynamics, NuCon provides tools for collecting data, creating datasets, and training models to learn the reactor dynamics. This approach allows for more accurate simulations and enables model-based control strategies. Key features include: To address the challenge of unknown game dynamics, NuCon provides tools for collecting data, creating datasets, and training models to learn the reactor dynamics. Key features include:
- Data Collection: Supports gathering state transitions from both human play and automated agents. - **Data Collection**: Gathers state transitions from human play or automated agents. `time_delta` is specified in game-time seconds; wall-clock sleep is automatically adjusted for `GAME_SIM_SPEED` so collected deltas are uniform regardless of simulation speed.
- Dataset Management: Tools for saving, loading, and merging datasets. - **Automatic param filtering**: Junk params (GAME_VERSION, TIME, ALARMS_ACTIVE, …) and params from uninstalled subsystems (returns `None`) are automatically excluded from model inputs/outputs.
- Model Training: Train neural network models to predict next states based on current states and time deltas. - **Two model backends**: Neural network (NN) or k-Nearest Neighbours with GP interpolation (kNN).
- Dataset Refinement: Ability to refine datasets by focusing on more challenging or interesting data points. - **Uncertainty estimation**: The kNN backend returns a GP posterior standard deviation alongside each prediction — 0 means the query lies on known data, ~1 means it is out of distribution.
- **Dataset management**: Tools for saving, loading, merging, and pruning datasets.
### Additional Dependencies ### Additional Dependencies
To use you'll need to install `torch` and `numpy`. You can do so via
```bash ```bash
pip install -e '.[model]' pip install -e '.[model]'
``` ```
### Usage: ### Usage
```python ```python
from nucon.model import NuconModelLearner from nucon.model import NuconModelLearner
# Initialize the model learner # --- Data collection ---
learner = NuconModelLearner() learner = NuconModelLearner(
time_delta=10.0, # 10 game-seconds per step (wall sleep auto-scales with sim speed)
# Collect data by querying the game include_valve_states=False, # set True to include all 53 valve positions as model inputs
)
learner.collect_data(num_steps=1000) learner.collect_data(num_steps=1000)
# Train the model
learner.train_model(batch_size=32, num_epochs=10)
# Refine the dataset
learner.refine_dataset(error_threshold=0.1)
# Save the model and dataset
learner.save_model('reactor_model.pth')
learner.save_dataset('reactor_dataset.pkl') learner.save_dataset('reactor_dataset.pkl')
# Merge datasets collected across multiple sessions
learner.merge_datasets('other_session.pkl')
# --- Neural network backend ---
nn_learner = NuconModelLearner(model_type='nn', dataset_path='reactor_dataset.pkl')
nn_learner.train_model(batch_size=32, num_epochs=50)
# Drop samples the NN already predicts well (keep hard cases for further training)
nn_learner.drop_well_fitted(error_threshold=1.0)
nn_learner.save_model('reactor_nn.pth')
# --- kNN + GP backend ---
knn_learner = NuconModelLearner(model_type='knn', knn_k=10, dataset_path='reactor_dataset.pkl')
# Drop near-duplicate samples before fitting (keeps diverse coverage).
# A sample is dropped only if BOTH its input state AND output transition
# are within the given distances of an already-kept sample.
knn_learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
knn_learner.fit_knn()
# Point prediction
state = knn_learner._get_state()
pred = knn_learner.model.forward(state, time_delta=10.0)
# Prediction with uncertainty
pred, uncertainty = knn_learner.predict_with_uncertainty(state, time_delta=10.0)
print(f"CORE_TEMP: {pred['CORE_TEMP']:.1f} ± {uncertainty:.3f} (std, GP posterior)")
# uncertainty ≈ 0: confident (query near known data)
# uncertainty ≈ 1: out of distribution
knn_learner.save_model('reactor_knn.pkl')
``` ```
The trained models can be integrated into the NuconSimulator to provide accurate dynamics based on real game data. The trained models can be integrated into the NuconSimulator to provide accurate dynamics based on real game data.

View File

@ -68,13 +68,13 @@ SPECIAL_VARIABLES = frozenset({
}) })
class NuconParameter: class NuconParameter:
def __init__(self, nucon: 'Nucon', id: str, param_type: Type, is_writable: bool, min_val: Optional[Union[int, float]] = None, max_val: Optional[Union[int, float]] = None, unit: Optional[str] = None, is_readable: bool = True, is_admin: bool = False): def __init__(self, nucon: 'Nucon', id: str, param_type: Type, is_writable: bool, min_val: Optional[Union[int, float]] = None, max_val: Optional[Union[int, float]] = None, unit: Optional[str] = None, is_readable: bool = True, is_cheat: bool = False):
self.nucon = nucon self.nucon = nucon
self.id = id self.id = id
self.param_type = param_type self.param_type = param_type
self.is_writable = is_writable self.is_writable = is_writable
self.is_readable = is_readable self.is_readable = is_readable
self.is_admin = is_admin self.is_cheat = is_cheat
self.min_val = min_val self.min_val = min_val
self.max_val = max_val self.max_val = max_val
self.unit = unit self.unit = unit
@ -121,17 +121,17 @@ class NuconParameter:
unit_str = f", unit='{self.unit}'" if self.unit else "" unit_str = f", unit='{self.unit}'" if self.unit else ""
value_str = f", value={self.value}" if self.is_readable else "" value_str = f", value={self.value}" if self.is_readable else ""
rw_str = "write-only" if not self.is_readable else f"is_writable={self.is_writable}" rw_str = "write-only" if not self.is_readable else f"is_writable={self.is_writable}"
admin_str = ", is_admin=True" if self.is_admin else "" admin_str = ", is_cheat=True" if self.is_cheat else ""
return f"NuconParameter(id='{self.id}'{value_str}, param_type={self.param_type.__name__}, {rw_str}{admin_str}{unit_str})" return f"NuconParameter(id='{self.id}'{value_str}, param_type={self.param_type.__name__}, {rw_str}{admin_str}{unit_str})"
def __str__(self): def __str__(self):
return self.id return self.id
class Nucon: class Nucon:
def __init__(self, host: str = 'localhost', port: int = 8785, admin_mode: bool = False): def __init__(self, host: str = 'localhost', port: int = 8785, cheat_mode: bool = False):
self.base_url = f'http://{host}:{port}/' self.base_url = f'http://{host}:{port}/'
self.dummy_mode = False self.dummy_mode = False
self.admin_mode = admin_mode self.cheat_mode = cheat_mode
self._parameters = self._create_parameters() self._parameters = self._create_parameters()
def _create_parameters(self) -> Dict[str, NuconParameter]: def _create_parameters(self) -> Dict[str, NuconParameter]:
@ -145,6 +145,7 @@ class Nucon:
'ALARMS_ACTIVE': (str, False), 'ALARMS_ACTIVE': (str, False),
'GAME_SIM_SPEED': (float, False), 'GAME_SIM_SPEED': (float, False),
'AMBIENT_TEMPERATURE': (float, False, None, None, '°C'), 'AMBIENT_TEMPERATURE': (float, False, None, None, '°C'),
'FUN_IS_ENABLED': (bool, False),
# --- Core thermal/pressure --- # --- Core thermal/pressure ---
'CORE_TEMP': (float, False, 0, 1000, '°C'), 'CORE_TEMP': (float, False, 0, 1000, '°C'),
@ -357,54 +358,48 @@ class Nucon:
'CHEMICAL_CLEANING_PUMP_OVERLOAD_STATUS': (PumpOverloadStatus, False), 'CHEMICAL_CLEANING_PUMP_OVERLOAD_STATUS': (PumpOverloadStatus, False),
} }
# Write-only params: normal control setpoints (no admin restriction) # Write-only params: normal operational commands
write_only_values = { write_only_values = {
# --- MSCVs (Main Steam Control Valves) --- # --- MSCVs (Main Steam Control Valves) setpoints ---
**{f'MSCV_{i}_OPENING_ORDERED': (float, True, 0, 100, '%') for i in range(3)}, **{f'MSCV_{i}_OPENING_ORDERED': (float, True, 0, 100, '%') for i in range(3)},
# --- Steam turbine bypass setpoints --- # --- Steam turbine bypass setpoints ---
**{f'STEAM_TURBINE_{i}_BYPASS_ORDERED': (float, True, 0, 100, '%') for i in range(3)}, **{f'STEAM_TURBINE_{i}_BYPASS_ORDERED': (float, True, 0, 100, '%') for i in range(3)},
# --- Chemistry setpoints --- # --- Steam ejector valve setpoints (0-100 position, not bool) ---
'CHEM_BORON_DOSAGE_ORDERED_RATE': (float, True, 0, 100, '%'), 'STEAM_EJECTOR_STARTUP_MOTIVE_VALVE': (int, True, 0, 100, '%'),
'CHEM_BORON_FILTER_ORDERED_SPEED': (float, True, 0, 100, '%'), 'STEAM_EJECTOR_OPERATIONAL_MOTIVE_VALVE': (int, True, 0, 100, '%'),
} 'STEAM_EJECTOR_CONDENSER_RETURN_VALVE': (int, True, 0, 100, '%'),
# Write-only admin params: destructive/irreversible operations, blocked unless admin_mode=True # --- Generic valve commands (value = valve name e.g. "M01", "M02", "M03") ---
write_only_admin_values = {
# --- Core safety actions ---
'CORE_SCRAM_BUTTON': (bool, True),
'CORE_EMERGENCY_STOP': (bool, True),
'CORE_END_EMERGENCY_STOP': (bool, True),
'RESET_AO': (bool, True),
# --- Core bay physical operations ---
**{f'CORE_BAY_{i}_HATCH': (bool, True) for i in range(1, 10)},
**{f'CORE_BAY_{i}_FUEL_LOADING': (int, True) for i in range(1, 10)},
# --- Bulk rod override ---
'RODS_ALL_POS_ORDERED': (float, True, 0, 100, '%'),
# --- Steam turbine trip ---
'STEAM_TURBINE_TRIP': (bool, True),
# --- Steam ejector valves ---
'STEAM_EJECTOR_CONDENSER_RETURN_VALVE': (bool, True),
'STEAM_EJECTOR_OPERATIONAL_MOTIVE_VALVE': (bool, True),
'STEAM_EJECTOR_STARTUP_MOTIVE_VALVE': (bool, True),
# --- Generic valve commands (take valve name as value) ---
'VALVE_OPEN': (str, True), 'VALVE_OPEN': (str, True),
'VALVE_CLOSE': (str, True), 'VALVE_CLOSE': (str, True),
'VALVE_OFF': (str, True), 'VALVE_OFF': (str, True),
# --- Infrastructure start/stop --- # --- Pump / generator start/stop ---
'CONDENSER_VACUUM_PUMP_START_STOP': (bool, True), 'CONDENSER_VACUUM_PUMP_START_STOP': (bool, True),
'EMERGENCY_GENERATOR_1_START_STOP': (bool, True), 'EMERGENCY_GENERATOR_1_START_STOP': (bool, True),
'EMERGENCY_GENERATOR_2_START_STOP': (bool, True), 'EMERGENCY_GENERATOR_2_START_STOP': (bool, True),
# --- Fun / event triggers (game cheats) --- # --- Chemistry setpoints ---
'FUN_IS_ENABLED': (bool, True), 'CHEM_BORON_DOSAGE_ORDERED_RATE': (float, True, 0, 100, '%'),
'CHEM_BORON_FILTER_ORDERED_SPEED': (float, True, 0, 100, '%'),
# --- Core safety / operational actions ---
'CORE_SCRAM_BUTTON': (bool, True),
'CORE_EMERGENCY_STOP': (bool, True),
'CORE_END_EMERGENCY_STOP': (bool, True),
'RESET_AO': (bool, True),
'STEAM_TURBINE_TRIP': (bool, True),
'RODS_ALL_POS_ORDERED': (float, True, 0, 100, '%'),
# --- Core bay physical operations ---
**{f'CORE_BAY_{i}_HATCH': (bool, True) for i in range(1, 10)},
**{f'CORE_BAY_{i}_FUEL_LOADING': (int, True) for i in range(1, 10)},
}
# Write-only cheat params: game event triggers, blocked unless cheat_mode=True
write_only_cheat_values = {
'FUN_REQUEST_ENABLE': (bool, True), 'FUN_REQUEST_ENABLE': (bool, True),
'FUN_AO_SABOTAGE_ONCE': (bool, True), 'FUN_AO_SABOTAGE_ONCE': (bool, True),
'FUN_AO_SABOTAGE_TIME': (float, True), 'FUN_AO_SABOTAGE_TIME': (float, True),
@ -428,8 +423,8 @@ class Nucon:
} }
for name, values in write_only_values.items(): for name, values in write_only_values.items():
params[name] = NuconParameter(self, name, *values, is_readable=False) params[name] = NuconParameter(self, name, *values, is_readable=False)
for name, values in write_only_admin_values.items(): for name, values in write_only_cheat_values.items():
params[name] = NuconParameter(self, name, *values, is_readable=False, is_admin=True) params[name] = NuconParameter(self, name, *values, is_readable=False, is_cheat=True)
return params return params
def _parse_value(self, parameter: NuconParameter, value: str) -> Union[float, int, bool, str, Enum, None]: def _parse_value(self, parameter: NuconParameter, value: str) -> Union[float, int, bool, str, Enum, None]:
@ -469,8 +464,8 @@ class Nucon:
if not force and not parameter.is_writable: if not force and not parameter.is_writable:
raise ValueError(f"Parameter {parameter} is not writable") raise ValueError(f"Parameter {parameter} is not writable")
if not force and parameter.is_admin and not self.admin_mode: if not force and parameter.is_cheat and not self.cheat_mode:
raise ValueError(f"Parameter {parameter} is an admin parameter. Enable admin_mode on the Nucon instance or use force=True") raise ValueError(f"Parameter {parameter} is a cheat parameter. Enable cheat_mode on the Nucon instance or use force=True")
if not force: if not force:
parameter.check_in_range(value, raise_on_oob=True) parameter.check_in_range(value, raise_on_oob=True)
@ -604,11 +599,59 @@ class Nucon:
def get_all_writable(self) -> List[NuconParameter]: def get_all_writable(self) -> List[NuconParameter]:
return {name: param for name, param in self._parameters.items() if param.is_writable} return {name: param for name, param in self._parameters.items() if param.is_writable}
# --- Valve API ---
# Valves have a motorized actuator. OPEN/CLOSE power the motor toward that end-state;
# OFF cuts power and holds the current position. Normal resting state is OFF.
# The Value field (0-100) is the actual live position during travel.
def _post_valve_command(self, command: str, valve_name: str) -> None:
response = requests.post(self.base_url, params={"variable": command, "value": valve_name})
if response.status_code != 200:
raise Exception(f"Valve command {command} on '{valve_name}' failed. Status: {response.status_code}")
def get_valve(self, valve_name: str) -> Dict[str, Any]:
"""Return current state dict for a single valve (from VALVE_PANEL_JSON)."""
valves = self.get_valves()
if valve_name not in valves:
raise KeyError(f"Valve '{valve_name}' not found")
return valves[valve_name]
def get_valves(self) -> Dict[str, Any]:
"""Return state dict for all valves, keyed by valve name."""
response = requests.get(self.base_url, params={"variable": "VALVE_PANEL_JSON"})
if response.status_code != 200:
raise Exception(f"Failed to get valve panel. Status: {response.status_code}")
return response.json().get("valves", {})
def open_valve(self, valve_name: str) -> None:
"""Power actuator toward open state. Send off_valve() once target is reached."""
self._post_valve_command("VALVE_OPEN", valve_name)
def close_valve(self, valve_name: str) -> None:
"""Power actuator toward closed state. Send off_valve() once target is reached."""
self._post_valve_command("VALVE_CLOSE", valve_name)
def off_valve(self, valve_name: str) -> None:
"""Cut actuator power, hold current position. Normal resting state."""
self._post_valve_command("VALVE_OFF", valve_name)
def open_valves(self, valve_names: List[str]) -> None:
for name in valve_names:
self.open_valve(name)
def close_valves(self, valve_names: List[str]) -> None:
for name in valve_names:
self.close_valve(name)
def off_valves(self, valve_names: List[str]) -> None:
for name in valve_names:
self.off_valve(name)
def set_dummy_mode(self, dummy_mode: bool) -> None: def set_dummy_mode(self, dummy_mode: bool) -> None:
self.dummy_mode = dummy_mode self.dummy_mode = dummy_mode
def set_admin_mode(self, admin_mode: bool) -> None: def set_cheat_mode(self, cheat_mode: bool) -> None:
self.admin_mode = admin_mode self.cheat_mode = cheat_mode
def __getattr__(self, name): def __getattr__(self, name):
if isinstance(name, int): if isinstance(name, int):

View File

@ -8,13 +8,15 @@ from enum import Enum
from nucon import Nucon from nucon import Nucon
import pickle import pickle
import os import os
from typing import Union, Tuple, List from typing import Union, Tuple, List, Dict
Actors = { Actors = {
'random': lambda nucon: lambda obs: {param.id: random.uniform(param.min_val, param.max_val) if param.min_val is not None and param.max_val is not None else 0 for param in nucon.get_all_writable().values()}, 'random': lambda nucon: lambda obs: {param.id: random.uniform(param.min_val, param.max_val) if param.min_val is not None and param.max_val is not None else 0 for param in nucon.get_all_writable().values()},
'null': lambda nucon: lambda obs: {}, 'null': lambda nucon: lambda obs: {},
} }
# --- NN-based dynamics model ---
class ReactorDynamicsNet(nn.Module): class ReactorDynamicsNet(nn.Module):
def __init__(self, input_dim, output_dim): def __init__(self, input_dim, output_dim):
super(ReactorDynamicsNet, self).__init__() super(ReactorDynamicsNet, self).__init__()
@ -35,10 +37,7 @@ class ReactorDynamicsModel(nn.Module):
super(ReactorDynamicsModel, self).__init__() super(ReactorDynamicsModel, self).__init__()
self.input_params = input_params self.input_params = input_params
self.output_params = output_params self.output_params = output_params
self.net = ReactorDynamicsNet(len(input_params), len(output_params))
input_dim = len(input_params)
output_dim = len(output_params)
self.net = ReactorDynamicsNet(input_dim, output_dim)
def _state_dict_to_tensor(self, state_dict): def _state_dict_to_tensor(self, state_dict):
return torch.tensor([state_dict[p] for p in self.input_params], dtype=torch.float32) return torch.tensor([state_dict[p] for p in self.input_params], dtype=torch.float32)
@ -52,17 +51,142 @@ class ReactorDynamicsModel(nn.Module):
predicted_tensor = self.net(state_tensor, time_delta_tensor) predicted_tensor = self.net(state_tensor, time_delta_tensor)
return self._tensor_to_state_dict(predicted_tensor.squeeze(0)) return self._tensor_to_state_dict(predicted_tensor.squeeze(0))
# --- kNN-based dynamics model ---
class ReactorKNNModel:
"""
Non-parametric dynamics model using k-nearest neighbours.
For a query (state, game_delta):
1. Find the k dataset entries whose *state* is closest (L2 in normalised space).
2. For each neighbour compute the per-second rate-of-change:
rate_i = (next_state_i - state_i) / game_delta_i
3. Linearly scale to the requested game_delta:
predicted_delta_i = rate_i * game_delta
4. Return the inverse-distance-weighted average of those predicted deltas
added to the current output state.
The linear-in-time assumption means two datapoints at 0.5 s and 2 s contribute
equally once normalised by their own game_delta.
"""
def __init__(self, input_params: List[str], output_params: List[str], k: int = 5):
self.input_params = input_params
self.output_params = output_params
self.k = k
self._states = None # (n, d_in) normalised state matrix
self._rates = None # (n, d_out) (next_out - cur_out) / game_delta
self._raw_states = None # unnormalised, for mean/std computation
self._mean = None
self._std = None
def fit(self, dataset):
"""Build lookup tables from a collected dataset."""
raw, rates = [], []
for state, _action, next_state, game_delta in dataset:
if game_delta <= 0:
continue
s = np.array([state[p] for p in self.input_params], dtype=np.float32)
cur = np.array([state[p] for p in self.output_params], dtype=np.float32)
nxt = np.array([next_state[p] for p in self.output_params], dtype=np.float32)
raw.append(s)
rates.append((nxt - cur) / game_delta)
self._raw_states = np.array(raw)
self._rates = np.array(rates)
self._mean = self._raw_states.mean(axis=0)
self._std = self._raw_states.std(axis=0) + 1e-8
self._states = (self._raw_states - self._mean) / self._std
def _lookup(self, state_dict: Dict):
"""Return (s_norm, idx, k) for the k nearest neighbours."""
s = np.array([state_dict[p] for p in self.input_params], dtype=np.float32)
s_norm = (s - self._mean) / self._std
dists = np.linalg.norm(self._states - s_norm, axis=1)
k = min(self.k, len(dists))
idx = np.argpartition(dists, k - 1)[:k]
return s_norm, idx, k
def forward(self, state_dict: Dict, time_delta: float) -> Dict:
if self._states is None:
raise ValueError("Model not fitted. Call fit(dataset) first.")
return self.forward_with_uncertainty(state_dict, time_delta)[0]
def forward_with_uncertainty(self, state_dict: Dict, time_delta: float):
"""Return (prediction_dict, uncertainty_scalar).
Uncertainty is the GP posterior std in normalised input space:
0 = query lies exactly on a training point (fully confident)
~1 = query is far from all neighbours (maximally uncertain)
"""
if self._states is None:
raise ValueError("Model not fitted. Call fit(dataset) first.")
s_norm, idx, k = self._lookup(state_dict)
X = self._states[idx] # (k, d_in)
Y = self._rates[idx] # (k, d_out)
# RBF kernel (vectorised): k(a,b) = exp(-0.5 ||a-b||^2)
def rbf_matrix(A, B):
diff = A[:, None, :] - B[None, :, :] # (|A|, |B|, d)
return np.exp(-0.5 * (diff ** 2).sum(axis=-1)) # (|A|, |B|)
K = rbf_matrix(X, X) + 1e-4 * np.eye(k) # (k, k)
k_star = rbf_matrix(s_norm[None, :], X)[0] # (k,)
K_inv = np.linalg.inv(K)
mean_rates = k_star @ K_inv @ Y # (d_out,)
# Posterior variance (scalar, shared across all output dims)
var = max(0.0, 1.0 - float(k_star @ K_inv @ k_star))
std = float(np.sqrt(var))
cur_out = np.array([state_dict[p] for p in self.output_params], dtype=np.float32)
predicted = cur_out + mean_rates * time_delta
pred_dict = {p: float(predicted[i]) for i, p in enumerate(self.output_params)}
return pred_dict, std
# --- Learner ---
class NuconModelLearner: class NuconModelLearner:
def __init__(self, nucon=None, actor='null', dataset_path='nucon_dataset.pkl', time_delta: Union[float, Tuple[float, float]] = 0.1): def __init__(self, nucon=None, actor='null', dataset_path='nucon_dataset.pkl',
time_delta: Union[float, Tuple[float, float]] = 1.0,
model_type: str = 'nn', knn_k: int = 5,
include_valve_states: bool = False):
self.nucon = Nucon() if nucon is None else nucon self.nucon = Nucon() if nucon is None else nucon
self.actor = Actors[actor](self.nucon) if actor in Actors else actor self.actor = Actors[actor](self.nucon) if actor in Actors else actor
self.dataset = self.load_dataset(dataset_path) or [] self.dataset = self.load_dataset(dataset_path) or []
self.dataset_path = dataset_path self.dataset_path = dataset_path
self.include_valve_states = include_valve_states
self.readable_params = list(self.nucon.get_all_readable().keys()) # Exclude params with no physics signal
self.non_writable_params = [param.id for param in self.nucon.get_all_readable().values() if not param.is_writable] _JUNK_PARAMS = frozenset({'GAME_VERSION', 'TIME', 'TIME_STAMP', 'TIME_DAY',
'ALARMS_ACTIVE', 'FUN_IS_ENABLED', 'GAME_SIM_SPEED'})
candidate_params = {k: p for k, p in self.nucon.get_all_readable().items()
if k not in _JUNK_PARAMS and p.param_type != str}
# Filter out params that return None (subsystem not installed)
test_state = {k: self.nucon.get(k) for k in candidate_params}
self.readable_params = [k for k in candidate_params if test_state[k] is not None]
self.non_writable_params = [k for k in self.readable_params
if not self.nucon.get_all_readable()[k].is_writable]
# Optionally include valve positions (input only — valves are externally driven)
self.valve_keys = []
if include_valve_states:
valves = self.nucon.get_valves()
self.valve_keys = [f'VALVE__{name}' for name in sorted(valves.keys())]
self.readable_params = self.readable_params + self.valve_keys
# valve positions are input-only (not predicted as outputs)
if model_type == 'nn':
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params) self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
self.optimizer = optim.Adam(self.model.parameters()) self.optimizer = optim.Adam(self.model.parameters())
elif model_type == 'knn':
self.model = ReactorKNNModel(self.readable_params, self.non_writable_params, k=knn_k)
self.optimizer = None
else:
raise ValueError(f"Unknown model_type '{model_type}'. Use 'nn' or 'knn'.")
if isinstance(time_delta, (int, float)): if isinstance(time_delta, (int, float)):
self.time_delta = lambda: time_delta self.time_delta = lambda: time_delta
@ -73,87 +197,180 @@ class NuconModelLearner:
def _get_state(self): def _get_state(self):
state = {} state = {}
for param_id, param in self.nucon.get_all_readable().items(): for param_id in self.readable_params:
value = self.nucon.get(param) if param_id in self.valve_keys:
continue # filled below
value = self.nucon.get(param_id)
if isinstance(value, Enum): if isinstance(value, Enum):
value = value.value value = value.value
state[param_id] = value state[param_id] = value
if self.valve_keys:
valves = self.nucon.get_valves()
for key in self.valve_keys:
name = key[len('VALVE__'):]
state[key] = valves.get(name, {}).get('Value', 0.0)
return state return state
def collect_data(self, num_steps): def collect_data(self, num_steps):
"""
Collect state-transition tuples from the live game.
Sleeps wall_time = target_game_delta / sim_speed so that each stored
game_delta is uniform regardless of the game's simulation speed setting.
"""
state = self._get_state() state = self._get_state()
for _ in range(num_steps): for _ in range(num_steps):
action = self.actor(state) action = self.actor(state)
start_time = time.time()
for param_id, value in action.items(): for param_id, value in action.items():
self.nucon.set(param_id, value) self.nucon.set(param_id, value)
time_delta = self.time_delta()
time.sleep(time_delta) target_game_delta = self.time_delta()
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
time.sleep(target_game_delta / sim_speed)
next_state = self._get_state() next_state = self._get_state()
self.dataset.append((state, action, next_state, time_delta)) self.dataset.append((state, action, next_state, target_game_delta))
state = next_state state = next_state
self.save_dataset() self.save_dataset()
def refine_dataset(self, error_threshold):
refined_data = []
for state, action, next_state, time_delta in self.dataset:
predicted_next_state = self.model(state, time_delta)
error = sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params)
if error > error_threshold:
refined_data.append((state, action, next_state, time_delta))
self.dataset = refined_data
self.save_dataset()
def train_model(self, batch_size=32, num_epochs=10, test_split=0.2): def train_model(self, batch_size=32, num_epochs=10, test_split=0.2):
"""Train the NN model. For kNN, call fit_knn() instead."""
if not isinstance(self.model, ReactorDynamicsModel):
raise ValueError("train_model() is for the NN model. Use fit_knn() for kNN.")
random.shuffle(self.dataset) random.shuffle(self.dataset)
split_idx = int(len(self.dataset) * (1 - test_split)) split_idx = int(len(self.dataset) * (1 - test_split))
train_data = self.dataset[:split_idx] train_data = self.dataset[:split_idx]
test_data = self.dataset[split_idx:] test_data = self.dataset[split_idx:]
for epoch in range(num_epochs): for epoch in range(num_epochs):
train_loss = self._train_epoch(train_data, batch_size) train_loss = self._train_epoch(train_data, batch_size)
test_loss = self._test_epoch(test_data) test_loss = self._test_epoch(test_data)
print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}") print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
def fit_knn(self):
"""Fit the kNN/GP model from the current dataset (instantaneous, no gradient steps)."""
if not isinstance(self.model, ReactorKNNModel):
raise ValueError("fit_knn() is for the kNN model. Use train_model() for NN.")
self.model.fit(self.dataset)
print(f"kNN model fitted on {len(self.dataset)} samples.")
def predict_with_uncertainty(self, state_dict: Dict, time_delta: float):
"""Return (prediction_dict, uncertainty_std). Only available for kNN model."""
if not isinstance(self.model, ReactorKNNModel):
raise ValueError("predict_with_uncertainty() requires model_type='knn'.")
return self.model.forward_with_uncertainty(state_dict, time_delta)
def drop_well_fitted(self, error_threshold: float):
"""Drop samples the current model already predicts well (MSE < threshold).
Keeps only hard/surprising transitions. Useful for NN training to focus
capacity on difficult regions of state space.
"""
kept = []
for state, action, next_state, time_delta in self.dataset:
pred = self.model.forward(state, time_delta)
error = sum((pred[p] - next_state[p]) ** 2 for p in self.non_writable_params)
if error > error_threshold:
kept.append((state, action, next_state, time_delta))
dropped = len(self.dataset) - len(kept)
self.dataset = kept
self.save_dataset()
print(f"drop_well_fitted: kept {len(kept)}, dropped {dropped} samples.")
def drop_redundant(self, min_state_distance: float, min_output_distance: float = 0.0):
"""Drop near-duplicate samples, keeping only those that add coverage.
A sample is dropped only if *both* its input state and its output
transition are within the given distances of an already-kept sample
(L2 in z-scored space). If two samples share the same input state but
have different transitions they represent genuinely different dynamics
and are both kept regardless of `min_output_distance`.
Args:
min_state_distance: minimum L2 distance in z-scored input space.
min_output_distance: minimum L2 distance in z-scored output-delta
space. Defaults to 0 (only input distance matters).
"""
if not self.dataset:
return
in_params = [p for p in self.readable_params if p not in self.valve_keys]
out_params = self.non_writable_params
all_states = np.array([[s[p] for p in in_params] for s, *_ in self.dataset], dtype=np.float32)
all_deltas = np.array([[ns[p] - s[p] for p in out_params]
for s, _, ns, gd in self.dataset], dtype=np.float32)
s_mean, s_std = all_states.mean(0), all_states.std(0) + 1e-8
d_mean, d_std = all_deltas.mean(0), all_deltas.std(0) + 1e-8
s_norm = (all_states - s_mean) / s_std
d_norm = (all_deltas - d_mean) / d_std
kept_idx = [0]
kept_s = [s_norm[0]]
kept_d = [d_norm[0]]
for i in range(1, len(self.dataset)):
s_dists = np.linalg.norm(np.array(kept_s) - s_norm[i], axis=1)
d_dists = np.linalg.norm(np.array(kept_d) - d_norm[i], axis=1)
# Drop only if close in BOTH spaces
if not np.any((s_dists < min_state_distance) & (d_dists < min_output_distance)):
kept_idx.append(i)
kept_s.append(s_norm[i])
kept_d.append(d_norm[i])
dropped = len(self.dataset) - len(kept_idx)
self.dataset = [self.dataset[i] for i in kept_idx]
self.save_dataset()
print(f"drop_redundant: kept {len(self.dataset)}, dropped {dropped} samples.")
def _train_epoch(self, data, batch_size): def _train_epoch(self, data, batch_size):
out_indices = [self.readable_params.index(p) if p in self.readable_params else None
for p in self.non_writable_params]
total_loss = 0 total_loss = 0
for i in range(0, len(data), batch_size): for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size] batch = data[i:i+batch_size]
states, _, next_states, time_deltas = zip(*batch)
loss = 0
for state, next_state, time_delta in zip(states, next_states, time_deltas):
predicted_next_state = self.model(state, time_delta)
loss += sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params)
loss /= len(batch)
self.optimizer.zero_grad() self.optimizer.zero_grad()
loss = torch.tensor(0.0)
for state, _, next_state, time_delta in batch:
state_t = self.model._state_dict_to_tensor(state).unsqueeze(0)
td_t = torch.tensor([[time_delta]], dtype=torch.float32)
pred = self.model.net(state_t, td_t).squeeze(0)
target = torch.tensor([next_state[p] for p in self.non_writable_params],
dtype=torch.float32)
loss = loss + torch.nn.functional.mse_loss(pred, target)
loss = loss / len(batch)
loss.backward() loss.backward()
self.optimizer.step() self.optimizer.step()
total_loss += loss.item() total_loss += loss.item()
return total_loss / max(1, len(data) // batch_size)
return total_loss / (len(data) // batch_size)
def _test_epoch(self, data): def _test_epoch(self, data):
total_loss = 0 total_loss = 0.0
with torch.no_grad(): with torch.no_grad():
for state, _, next_state, time_delta in data: for state, _, next_state, time_delta in data:
predicted_next_state = self.model(state, time_delta) state_t = self.model._state_dict_to_tensor(state).unsqueeze(0)
loss = sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params) td_t = torch.tensor([[time_delta]], dtype=torch.float32)
total_loss += loss pred = self.model.net(state_t, td_t).squeeze(0)
target = torch.tensor([next_state[p] for p in self.non_writable_params],
dtype=torch.float32)
total_loss += torch.nn.functional.mse_loss(pred, target).item()
return total_loss / len(data) return total_loss / len(data)
def save_model(self, path): def save_model(self, path):
if isinstance(self.model, ReactorDynamicsModel):
torch.save(self.model.state_dict(), path) torch.save(self.model.state_dict(), path)
else:
with open(path, 'wb') as f:
pickle.dump(self.model, f)
def load_model(self, path): def load_model(self, path):
if isinstance(self.model, ReactorDynamicsModel):
self.model.load_state_dict(torch.load(path)) self.model.load_state_dict(torch.load(path))
else:
with open(path, 'rb') as f:
self.model = pickle.load(f)
def save_dataset(self, path=None): def save_dataset(self, path=None):
path = path or self.dataset_path path = path or self.dataset_path