Compare commits


7 Commits

Author SHA1 Message Date
f4d45d3cfd feat: NuconGoalEnv, composable uncertainty helpers, kNN-GP naming
- Add NuconGoalEnv for goal-conditioned HER training (SAC + HER)
- Add UncertaintyPenalty and UncertaintyAbort composable callables;
  SIM_UNCERTAINTY injected into obs dict when simulator is active
- Fix rl.py: str-typed params crash, missing Enum import, write-only
  params in action space, broken step() iteration order
- Remove uncertainty state from sim (return value from update() instead)
- Rename kNN -> kNN-GP throughout README; add model selection note

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 18:51:13 +01:00
1b93699501 docs: mention uncertainty penalty/abort in training loop section
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 18:37:25 +01:00
65190dffea feat: uncertainty-aware training with penalty and abort
sim.py:
- simulator.update(return_uncertainty=True) calls forward_with_uncertainty
  on kNN models and returns the GP std; returns None for NN or when not
  requested (no extra cost if unused)
- No state stored on simulator; caller decides what to do with the value

rl.py (NuconEnv and NuconGoalEnv):
- uncertainty_penalty_start: above this GP std, subtract a linear penalty
  from the reward (scaled by uncertainty_penalty_scale, default 1.0)
- uncertainty_abort: at or above this GP std, set truncated=True
- Only calls update(return_uncertainty=True) when either threshold is set
- Uncertainty only applies when using a simulator (kNN model); ignored otherwise

Example:
    simulator = NuconSimulator()
    simulator.load_model('reactor_knn.pkl')
    env = NuconGoalEnv(..., simulator=simulator,
                       uncertainty_penalty_start=0.3,
                       uncertainty_abort=0.7,
                       uncertainty_penalty_scale=2.0)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 18:37:09 +01:00
6cb93ad56d feat: abort trajectory on high kNN uncertainty in simulator
NuconSimulator now accepts uncertainty_threshold (default None = disabled).
When set and using a kNN model, _update_reactor_state() calls
forward_with_uncertainty() and raises HighUncertaintyError if the GP
posterior std exceeds the threshold.

NuconEnv and NuconGoalEnv catch HighUncertaintyError in step() and
return truncated=True, so SB3 bootstraps the value rather than treating
OOD regions as terminal states.

Usage:
    simulator = NuconSimulator(uncertainty_threshold=0.3)
    # episodes are cut short when the policy wanders OOD

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 18:29:54 +01:00
e2e8db1f04 docs: remove WIP labels and clean up stale transitional prose
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 18:22:25 +01:00
7ee8272034 docs: replace step-by-step code blocks in training loop with prose
The prior sections already have full code examples; the training loop
section now just describes each step concisely and links back to them.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 18:20:10 +01:00
f0cc7ba9c4 docs: replace em-dashes in body text with natural punctuation
Keep em-dashes in step headings, replace in prose with ;/:/./,

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 18:19:04 +01:00
3 changed files with 286 additions and 314 deletions

README.md

@ -111,9 +111,9 @@ Custom Enum Types:
So if you're not in the mood to play the game manually, this API can be used to easily create your own automations and control systems. Maybe a little PID controller for the rods? Or, if you wanna go crazy, why not try some
## Reinforcement Learning (Work in Progress)
## Reinforcement Learning
NuCon includes a preliminary Reinforcement Learning (RL) environment based on the OpenAI Gym interface. This allows you to train control policies for the Nucleares game instead of writing them yourself. This feature is currently a work in progress and requires additional dependencies.
NuCon includes a Reinforcement Learning (RL) environment based on the OpenAI Gym interface. This allows you to train control policies for the Nucleares game instead of writing them yourself. Requires additional dependencies.
### Additional Dependencies
@ -127,17 +127,17 @@ pip install -e '.[rl]'
Two environment classes are provided in `nucon/rl.py`:
**`NuconEnv`** classic fixed-objective environment. You define one or more objectives at construction time (e.g. maximise power output, keep temperature in range). The agent always trains toward the same goal.
**`NuconEnv`**: classic fixed-objective environment. You define one or more objectives at construction time (e.g. maximise power output, keep temperature in range). The agent always trains toward the same goal.
- Observation space: all readable numeric parameters (~290 dims).
- Action space: all readable-back writable parameters (~30 dims): 9 individual rod bank positions, 3 MSCVs, 3 turbine bypass valves, 6 coolant pump speeds, condenser pump, freight/vent switches, resistor banks, and more.
- Objectives: predefined strings (`'max_power'`, `'episode_time'`) or arbitrary callables `(obs) -> float`. Multiple objectives are weighted-summed.
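The weighted-sum combination of objectives can be sketched directly. This is an illustrative example: the two objective callables and the weights are hypothetical, not taken from the library.

```python
# Hypothetical custom objectives over the obs dict, as accepted by NuconEnv.
power = lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"]
temp_penalty = lambda obs: -((obs["CORE_TEMP"] - 330.0) ** 2)  # keep core near 330 °C

objectives, weights = [power, temp_penalty], [1.0, 0.1]

def combined_reward(obs):
    # NuconEnv computes the step reward as this weighted sum of objectives.
    return sum(w * f(obs) for f, w in zip(objectives, weights))

obs = {"GENERATOR_0_KW": 400.0, "GENERATOR_1_KW": 400.0,
       "GENERATOR_2_KW": 400.0, "CORE_TEMP": 340.0}
print(combined_reward(obs))  # ~= 1190.0 (1200 power minus 10 temperature penalty)
```

Passed to the env as `NuconEnv(objectives=[power, temp_penalty], objective_weights=[1.0, 0.1])`.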
**`NuconGoalEnv`** goal-conditioned environment. The desired goal (e.g. target generator output) is sampled at the start of each episode and provided as part of the observation. A single policy learns to reach *any* goal in the specified range, making it far more useful than a fixed-objective agent. Designed for training with [Hindsight Experience Replay (HER)](https://arxiv.org/abs/1707.01495), which makes sparse-reward goal-conditioned training tractable.
**`NuconGoalEnv`**: goal-conditioned environment. The desired goal (e.g. target generator output) is sampled at the start of each episode and provided as part of the observation. A single policy learns to reach *any* goal in the specified range, making it far more useful than a fixed-objective agent. Designed for training with [Hindsight Experience Replay (HER)](https://arxiv.org/abs/1707.01495), which makes sparse-reward goal-conditioned training tractable.
- Observation space: `Dict` with keys `observation` (non-goal params), `achieved_goal` (current goal param values, normalised to [0,1]), `desired_goal` (target, normalised to [0,1]).
- Goals are sampled uniformly from the specified `goal_range` each episode.
- Reward defaults to negative L2 distance in normalised goal space (dense). Pass `tolerance` for a sparse `{0, -1}` reward — this works particularly well with HER.
- Reward defaults to negative L2 distance in normalised goal space (dense). Pass `tolerance` for a sparse `{0, -1}` reward; this works particularly well with HER.
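The two reward modes in normalised goal space can be sketched as follows (this mirrors the description above; it is not the library's exact code):

```python
import numpy as np

def dense_reward(achieved, desired):
    # Default: negative L2 distance in the normalised [0, 1] goal space.
    return -float(np.linalg.norm(achieved - desired))

def sparse_reward(achieved, desired, tolerance=0.05):
    # With `tolerance` set: 0 when within tolerance, -1 otherwise.
    return 0.0 if np.linalg.norm(achieved - desired) <= tolerance else -1.0

ag = np.array([0.50, 0.50, 0.50])   # achieved_goal
dg = np.array([0.50, 0.53, 0.50])   # desired_goal
print(dense_reward(ag, dg))         # ~= -0.03
print(sparse_reward(ag, dg))        # 0.0 (within the 0.05 tolerance)
```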
### NuconEnv Usage
@ -193,7 +193,7 @@ env.close()
### NuconGoalEnv + HER Usage
HER works by relabelling past trajectories with the goal that was *actually achieved*, turning every episode into useful training signal even when the agent never reaches the intended target. This makes it much more sample-efficient than standard RL for goal-reaching tasks — important given how slow the real game is.
HER works by relabelling past trajectories with the goal that was *actually achieved*, turning every episode into useful training signal even when the agent never reaches the intended target. This makes it much more sample-efficient than standard RL for goal-reaching tasks. This matters a lot given how slow the real game is.
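To make the relabelling idea concrete, here is a minimal sketch of the 'future' goal-selection strategy (illustrative only; the function and the tiny 1-D trajectory are made up for this example):

```python
import numpy as np

def her_relabel(trajectory, reward_fn, seed=0):
    """For each transition, emit a copy whose desired goal is replaced by the
    achieved goal of a future step in the same episode ('future' strategy)."""
    rng = np.random.default_rng(seed)
    relabelled = []
    for t, (obs, action, achieved, desired) in enumerate(trajectory):
        future = int(rng.integers(t, len(trajectory)))  # random future index
        new_goal = trajectory[future][2]                # goal actually achieved there
        relabelled.append((obs, action, achieved, new_goal,
                           reward_fn(achieved, new_goal)))
    return relabelled

# The agent never reached desired goal 1.0, yet relabelled copies whose goal
# matches what was actually achieved yield reward 0: a failed episode still
# produces useful training signal.
sparse = lambda ag, dg: 0.0 if abs(float(ag - dg)) <= 0.05 else -1.0
traj = [(None, None, np.array([0.1]), np.array([1.0])),
        (None, None, np.array([0.3]), np.array([1.0]))]
out = her_relabel(traj, sparse)
```

SB3's `HerReplayBuffer` with `goal_selection_strategy='future'` performs this relabelling automatically inside the replay buffer.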
```python
from nucon.rl import NuconGoalEnv
@ -242,9 +242,9 @@ Predefined goal environments:
- `Nucon-goal_power-v0`: target total generator output (3 × 0–1200 kW)
- `Nucon-goal_temp-v0`: target core temperature (280–380 °C)
But there's a problem: RL algorithms require a huge amount of training steps to get passable policies, and Nucleares is a very slow simulation and can not be trivially parallelized. That's why NuCon also provides a
RL algorithms require a huge number of training steps, and Nucleares is slow and cannot be trivially parallelised. That's why NuCon provides a built-in simulator.
## Simulator (Work in Progress)
## Simulator
NuCon provides a built-in simulator to address the challenge of slow training times in the actual Nucleares game. This simulator allows for rapid prototyping and testing of control policies without the need for the full game environment. Key features include:
@ -292,16 +292,16 @@ env = NuconEnv(simulator=simulator) # When given a simulator, instead of waiting
# ...
```
But there's yet another problem: We do not know the exact simulation dynamics of the game and can therefore not implement an accurate simulator. That's why NuCon also provides
The simulator needs an accurate dynamics model of the game. NuCon provides tools to learn one from real gameplay data.
## Model Learning (Work in Progress)
## Model Learning
To address the challenge of unknown game dynamics, NuCon provides tools for collecting data, creating datasets, and training models to learn the reactor dynamics. Key features include:
- **Data Collection**: Gathers state transitions from human play or automated agents. `time_delta` is specified in game-time seconds; wall-clock sleep is automatically adjusted for `GAME_SIM_SPEED` so collected deltas are uniform regardless of simulation speed.
- **Automatic param filtering**: Junk params (GAME_VERSION, TIME, ALARMS_ACTIVE, …) and params from uninstalled subsystems (returns `None`) are automatically excluded from model inputs/outputs.
- **Two model backends**: Neural network (NN) or k-Nearest Neighbours with GP interpolation (kNN).
- **Uncertainty estimation**: The kNN backend returns a GP posterior standard deviation alongside each prediction — 0 means the query lies on known data, ~1 means it is out of distribution.
- **Two model backends**: Neural network (NN) or a local Gaussian Process approximated via k-Nearest Neighbours (kNN-GP).
- **Uncertainty estimation**: The kNN-GP backend returns a GP posterior standard deviation alongside each prediction; 0 means the query lies on known data, ~1 means it is out of distribution.
- **Dataset management**: Tools for saving, loading, merging, and pruning datasets.
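The game-time scaling mentioned in the data-collection bullet reduces to a one-line computation (a sketch; `GAME_SIM_SPEED` is the speed multiplier read from the game API):

```python
def wall_clock_sleep(time_delta, game_sim_speed):
    """Wall-clock sleep so consecutive samples are `time_delta` apart in
    *game* seconds, regardless of the simulation speed multiplier."""
    return time_delta / max(game_sim_speed, 1e-9)  # guard against a zero reading

print(wall_clock_sleep(10.0, 1.0))  # 10.0 (normal speed)
print(wall_clock_sleep(10.0, 4.0))  # 2.5  (game running 4x faster)
```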
### Additional Dependencies
@ -310,12 +310,16 @@ To address the challenge of unknown game dynamics, NuCon provides tools for coll
pip install -e '.[model]'
```
### Model selection
**kNN-GP** (the `ReactorKNNModel` backend) is a local Gaussian Process: it finds the `k` nearest neighbours in the training set, fits an RBF kernel on them, and returns a prediction plus a GP posterior std as uncertainty. It works well from a few hundred samples and requires no training. **NN** needs input normalisation and several thousand samples to generalise; use it once you have a large dataset. For initial experiments, start with kNN-GP (`k=10`).
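These mechanics can be sketched in a few lines of numpy. This is illustrative only: `ReactorKNNModel`'s actual kernel, hyperparameters, and normalisation may differ.

```python
import numpy as np

def knn_gp_predict(X, Y, x_query, k=10, length_scale=1.0, noise=1e-6):
    """Local GP: fit an RBF-kernel GP on the k nearest neighbours of the
    query; return (mean prediction, GP posterior std)."""
    idx = np.argsort(np.linalg.norm(X - x_query, axis=1))[:k]
    Xk, Yk = X[idx], Y[idx]

    def rbf(A, B):  # RBF kernel with unit prior variance
        sq = ((A[:, None, :] - B[None, :, :]) ** 2).sum(-1)
        return np.exp(-0.5 * sq / length_scale ** 2)

    K = rbf(Xk, Xk) + noise * np.eye(len(Xk))       # jitter for stability
    k_star = rbf(Xk, x_query[None, :])[:, 0]
    mean = k_star @ np.linalg.solve(K, Yk)
    var = 1.0 - k_star @ np.linalg.solve(K, k_star)  # prior variance is 1
    return float(mean), float(np.sqrt(max(var, 0.0)))

rng = np.random.default_rng(0)
X = rng.uniform(size=(200, 3))
Y = X.sum(axis=1)                                    # toy dynamics target
_, std_known = knn_gp_predict(X, Y, X[0])            # query on training data
_, std_ood = knn_gp_predict(X, Y, np.full(3, 10.0))  # far out of distribution
```

The key property is the uncertainty behaviour: the posterior std stays near 0 on known data and rises towards 1 far from it, and the fit is per-query, so there is no training step.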
### Usage
```python
from nucon.model import NuconModelLearner
# --- Data collection (model_type not needed here) ---
# --- Data collection ---
learner = NuconModelLearner(
time_delta=10.0, # 10 game-seconds per step (wall sleep auto-scales with sim speed)
include_valve_states=False, # set True to include all 53 valve positions as model inputs
@ -333,13 +337,13 @@ nn_learner.train_model(batch_size=32, num_epochs=50) # creates NN model on firs
nn_learner.drop_well_fitted(error_threshold=1.0)
nn_learner.save_model('reactor_nn.pth')
# --- kNN + GP backend ---
# --- kNN-GP backend ---
knn_learner = NuconModelLearner(dataset_path='reactor_dataset.pkl')
# Drop near-duplicate samples before fitting (keeps diverse coverage).
# A sample is dropped only if BOTH its input state AND output transition
# are within the given distances of an already-kept sample.
knn_learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
knn_learner.fit_knn(k=10) # creates kNN model on first call
knn_learner.fit_knn(k=10) # creates kNN-GP model on first call
# Point prediction
state = knn_learner._get_state()
@ -358,7 +362,7 @@ The trained models can be integrated into the NuconSimulator to provide accurate
## Full Training Loop
The recommended end-to-end workflow for training an RL operator is an iterative cycle of real-game data collection, model fitting, and simulated training. The real game is slow and cannot be parallelised, so the bulk of RL training happens in the simulator — the game is used only as an oracle for data and evaluation.
The recommended end-to-end workflow for training an RL operator is an iterative cycle of real-game data collection, model fitting, and simulated training. The real game is slow and cannot be parallelised, so the bulk of RL training happens in the simulator. The game is used only as an oracle for data and evaluation.
```
┌─────────────────────────────────────────────────────────────┐
@ -397,123 +401,17 @@ The recommended end-to-end workflow for training an RL operator is an iterative
└─────────────────────┘
```
### Step 1 — Human dataset collection
**Step 1 — Human dataset collection**: Run `NuconModelLearner.collect_data()` during your play session. Cover a wide range of states: startup from cold, ramping power, individual rod bank adjustments. Diversity in the dataset directly determines simulator accuracy. See [Model Learning](#model-learning) for collection details.
Start `NuconModelLearner` before or during your play session. Try to cover a wide range of reactor states — startup from cold, ramping power up and down, adjusting individual rod banks, pump speed changes. Diversity in the dataset directly determines how accurate the simulator will be.
**Step 2 — Initial model fitting**: Fit a kNN-GP model (instant) or NN (better extrapolation with larger datasets) using `fit_knn()` or `train_model()`. Prune near-duplicate samples with `drop_redundant()` before fitting. See [Model Learning](#model-learning).
```python
from nucon.model import NuconModelLearner
**Step 3 — Train RL in simulator**: Load the fitted model into `NuconSimulator`, then train a `NuconGoalEnv` policy with SAC + HER. The simulator runs far faster than the real game, allowing many trajectories in reasonable time. Use `uncertainty_penalty_start` and `uncertainty_abort` on the env to discourage the policy from wandering into regions the model hasn't seen: a linear penalty kicks in above the soft threshold, and the episode is truncated at the hard threshold. This keeps training within the reliable part of the model's knowledge. See [NuconGoalEnv + HER Usage](#nucongoalenv--her-usage).
learner = NuconModelLearner(
dataset_path='reactor_dataset.pkl',
time_delta=10.0, # 10 game-seconds per sample
)
learner.collect_data(num_steps=500, save_every=10)
```
**Step 4 — Eval in game + collect new data**: Run the trained policy against the real game. This validates simulator accuracy and simultaneously collects new data from states the policy visits, which may be regions the original dataset missed. Run a second `NuconModelLearner` in a background thread to collect concurrently.
The collector saves every 10 steps, retries automatically on game crashes, and scales wall-clock sleep with `GAME_SIM_SPEED` so samples are always 10 game-seconds apart regardless of simulation speed.
**Step 5 — Refit model on expanded data**: Merge new data into the original dataset with `merge_datasets()`, prune with `drop_redundant()`, and refit. Then return to Step 3 with the improved model. Each iteration the simulator gets more accurate and the policy improves.
### Step 2 — Initial model fitting
```python
from nucon.model import NuconModelLearner
learner = NuconModelLearner(dataset_path='reactor_dataset.pkl')
# Option A: kNN + GP (instant fit, built-in uncertainty estimation)
learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
learner.fit_knn(k=10)
learner.save_model('reactor_knn.pkl')
# Option B: Neural network (better extrapolation with larger datasets)
learner.train_model(batch_size=32, num_epochs=50)
learner.drop_well_fitted(error_threshold=1.0) # keep hard samples for next round
learner.save_model('reactor_nn.pth')
```
### Step 3 — Train RL in simulator
Load the fitted model into the simulator and train with SAC + HER. The simulator runs orders of magnitude faster than the real game, allowing millions of steps in reasonable time.
```python
from nucon.sim import NuconSimulator, OperatingState
from nucon.rl import NuconGoalEnv
from stable_baselines3 import SAC
from stable_baselines3.common.buffers import HerReplayBuffer
simulator = NuconSimulator()
simulator.load_model('reactor_knn.pkl')
simulator.set_state(OperatingState.NOMINAL)
env = NuconGoalEnv(
goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
tolerance=0.05,
simulator=simulator,
seconds_per_step=10,
)
model = SAC(
'MultiInputPolicy', env,
replay_buffer_class=HerReplayBuffer,
replay_buffer_kwargs={'n_sampled_goal': 4, 'goal_selection_strategy': 'future'},
verbose=1,
)
model.learn(total_timesteps=500_000)
model.save('rl_policy.zip')
```
### Step 4 — Eval in game + collect new data
Run the trained policy against the real game. This validates whether the simulator was accurate enough, and simultaneously collects new data covering states the policy visits — which may be regions the original dataset missed.
```python
from nucon.rl import NuconGoalEnv
from nucon.model import NuconModelLearner
from stable_baselines3 import SAC
import numpy as np
# Load policy and run in real game
env = NuconGoalEnv(
goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
seconds_per_step=10,
)
policy = SAC.load('rl_policy.zip')
# Simultaneously collect new data
new_data_learner = NuconModelLearner(
dataset_path='reactor_dataset_new.pkl',
time_delta=10.0,
)
obs, _ = env.reset()
for _ in range(200):
action, _ = policy.predict(obs, deterministic=True)
obs, reward, terminated, truncated, _ = env.step(action)
if terminated or truncated:
obs, _ = env.reset()
```
### Step 5 — Refit model on expanded data
Merge the new data into the original dataset and refit:
```python
learner = NuconModelLearner(dataset_path='reactor_dataset.pkl')
learner.merge_datasets('reactor_dataset_new.pkl')
# Prune redundant samples before refitting
learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
print(f"Dataset size after pruning: {len(learner.dataset)}")
learner.fit_knn(k=10)
learner.save_model('reactor_knn.pkl')
```
Then go back to Step 3 with the improved model. Each iteration the simulator gets more accurate, the policy gets better, and the new data collection explores increasingly interesting regions of state space.
**When to stop**: when the policy performs well in the real game and the kNN uncertainty stays low throughout an episode (indicating the policy stays within the known data distribution).
Stop when the policy performs well in the real game and kNN-GP uncertainty stays low throughout an episode, indicating the policy stays within the known data distribution.
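A hedged sketch of that stopping check, assuming a `NuconEnv`-style dict observation with `SIM_UNCERTAINTY` injected (the helper name is hypothetical):

```python
def episode_max_uncertainty(env, policy, max_steps=200):
    """Highest SIM_UNCERTAINTY seen while rolling out `policy` for one
    episode against an env with an active simulator."""
    obs, _ = env.reset()
    worst = 0.0
    for _ in range(max_steps):
        action, _ = policy.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, _ = env.step(action)
        worst = max(worst, float(obs.get('SIM_UNCERTAINTY', 0.0)))
        if terminated or truncated:
            break
    return worst

# Stop the collect/refit/train cycle once this stays below the soft penalty
# threshold (e.g. 0.3) across several evaluation episodes.
```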
## Testing

nucon/rl.py

@ -1,159 +1,83 @@
import inspect
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import time
from typing import Dict, Any
from typing import Dict, Any, Callable, List, Optional
from enum import Enum
from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus
# ---------------------------------------------------------------------------
# Reward / objective helpers
# ---------------------------------------------------------------------------
Objectives = {
"null": lambda obs: 0,
"max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"],
"null": lambda obs: 0,
"max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"],
"episode_time": lambda obs: obs["EPISODE_TIME"],
}
Parameterized_Objectives = {
"target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2),
"target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2),
"temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2),
"temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2),
"constant": lambda constant: lambda obs: constant,
"target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2),
"temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2),
"temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2),
"constant": lambda constant: lambda obs: constant,
}
class NuconEnv(gym.Env):
metadata = {'render_modes': ['human']}
def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5, objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0):
super().__init__()
def UncertaintyPenalty(start: float = 0.3, scale: float = 1.0, mode: str = 'l2') -> Callable:
"""Objective that penalises high simulator uncertainty.
self.render_mode = render_mode
self.seconds_per_step = seconds_per_step
if objective_weights is None:
objective_weights = [1.0 for objective in objectives]
self.objective_weights = objective_weights
self.terminate_above = terminate_above
self.simulator = simulator
Returns a callable ``(obs) -> float`` suitable for use as an objective or
terminator in NuconEnv / NuconGoalEnv. Works because ``SIM_UNCERTAINTY``
is injected into the obs dict whenever a simulator is active.
if nucon is None:
if simulator:
nucon = Nucon(port=simulator.port)
else:
nucon = Nucon()
self.nucon = nucon
Args:
start: uncertainty level at which the penalty starts (default 0.3).
scale: penalty coefficient.
mode: ``'l2'`` (quadratic, default) or ``'linear'``.
# Define observation space
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
for param_id, param in self.nucon.get_all_readable().items():
sp = _build_param_space(param)
if sp is not None:
obs_spaces[param_id] = sp
self.observation_space = spaces.Dict(obs_spaces)
Example::
# Define action space (only controllable, non-cheat, readable-back params)
action_spaces = {}
for param_id, param in self.nucon.get_all_writable().items():
if not param.is_readable or param.is_cheat:
continue # write-only (VALVE_OPEN/CLOSE, SCRAM, etc.) and cheat params excluded
sp = _build_param_space(param)
if sp is not None:
action_spaces[param_id] = sp
self.action_space = spaces.Dict(action_spaces)
env = NuconEnv(
objectives=['max_power', UncertaintyPenalty(start=0.3, scale=2.0)],
objective_weights=[1.0, 1.0],
simulator=simulator,
)
"""
excess = lambda obs: max(0.0, obs.get('SIM_UNCERTAINTY', 0.0) - start)
if mode == 'l2':
return lambda obs: -scale * excess(obs) ** 2
elif mode == 'linear':
return lambda obs: -scale * excess(obs)
else:
raise ValueError(f"Unknown mode '{mode}'. Use 'l2' or 'linear'.")
self.objectives = []
self.terminators = []
for objective in objectives:
if objective in Objectives:
self.objectives.append(Objectives[objective])
elif callable(objective):
self.objectives.append(objective)
else:
raise ValueError(f"Unsupported objective: {objective}")
def UncertaintyAbort(threshold: float = 0.7) -> Callable:
"""Terminator that aborts the episode when simulator uncertainty is too high.
for terminator in terminators:
if terminator in Objectives:
self.terminators.append(Objectives[terminator])
elif callable(terminator):
self.terminators.append(terminator)
else:
raise ValueError(f"Unsupported terminator: {terminator}")
Returns a callable ``(obs) -> float`` for use as a *terminator*. When
the GP posterior std reaches or exceeds ``threshold`` the episode ends
(``terminated=True``).
def _get_obs(self):
obs = {}
for param_id, param in self.nucon.get_all_readable().items():
if param.param_type == str or param_id not in self.observation_space.spaces:
continue
value = self.nucon.get(param_id)
if isinstance(value, Enum):
value = value.value
obs[param_id] = value
obs["EPISODE_TIME"] = self._total_steps * self.seconds_per_step
return obs
Example::
def _get_info(self):
info = {'objectives': {}, 'objectives_weighted': {}}
for objective, weight in zip(self.objectives, self.objective_weights):
obj = objective(self._get_obs())
info['objectives'][objective.__name__] = obj
info['objectives_weighted'][objective.__name__] = obj * weight
return info
env = NuconEnv(
objectives=['max_power'],
terminators=[UncertaintyAbort(threshold=0.7)],
terminate_above=0,
simulator=simulator,
)
"""
return lambda obs: 1.0 if obs.get('SIM_UNCERTAINTY', 0.0) >= threshold else 0.0
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self._total_steps = 0
observation = self._get_obs()
info = self._get_info()
return observation, info
def step(self, action):
# Apply the action to the Nucon system
for param_id, value in action.items():
param = self.nucon._parameters[param_id]
if issubclass(param.param_type, Enum):
value = param.param_type(int(np.asarray(value).flat[0]))
else:
value = param.param_type(np.asarray(value).flat[0])
if param.min_val is not None and param.max_val is not None:
value = np.clip(value, param.min_val, param.max_val)
self.nucon.set(param, value)
observation = self._get_obs()
terminated = np.sum([terminator(observation) for terminator in self.terminators]) > self.terminate_above
truncated = False
info = self._get_info()
reward = sum(obj for obj in info['objectives_weighted'].values())
self._total_steps += 1
if self.simulator:
self.simulator.update(self.seconds_per_step)
else:
# Sleep to let the game advance seconds_per_step game-seconds,
# accounting for the game's simulation speed multiplier.
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
time.sleep(self.seconds_per_step / sim_speed)
return observation, reward, terminated, truncated, info
def render(self):
if self.render_mode == "human":
pass
def close(self):
pass
def _flatten_action(self, action):
return np.concatenate([v.flatten() for v in action.values()])
def _unflatten_action(self, flat_action):
return {k: v.reshape(1, -1) for k, v in self.action_space.items()}
def _flatten_observation(self, observation):
return np.concatenate([v.flatten() for v in observation.values()])
def _unflatten_observation(self, flat_observation):
return {k: v.reshape(1, -1) for k, v in self.observation_space.items()}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _build_param_space(param):
"""Return a gymnasium Box for a single NuconParameter, or None if unsupported."""
@ -172,30 +96,174 @@ def _build_param_space(param):
return None
def _apply_action(nucon, action):
for param_id, value in action.items():
param = nucon._parameters[param_id]
if issubclass(param.param_type, Enum):
value = param.param_type(int(np.asarray(value).flat[0]))
else:
value = param.param_type(np.asarray(value).flat[0])
if param.min_val is not None and param.max_val is not None:
value = np.clip(value, param.min_val, param.max_val)
nucon.set(param, value)
# ---------------------------------------------------------------------------
# NuconEnv
# ---------------------------------------------------------------------------
class NuconEnv(gym.Env):
metadata = {'render_modes': ['human']}
def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5,
objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0):
super().__init__()
self.render_mode = render_mode
self.seconds_per_step = seconds_per_step
if objective_weights is None:
objective_weights = [1.0 for _ in objectives]
self.objective_weights = objective_weights
self.terminate_above = terminate_above
self.simulator = simulator
if nucon is None:
nucon = Nucon(port=simulator.port) if simulator else Nucon()
self.nucon = nucon
# Observation space — SIM_UNCERTAINTY included when a simulator is present
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
if simulator is not None:
obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
for param_id, param in self.nucon.get_all_readable().items():
sp = _build_param_space(param)
if sp is not None:
obs_spaces[param_id] = sp
self.observation_space = spaces.Dict(obs_spaces)
# Action space
action_spaces = {}
for param_id, param in self.nucon.get_all_writable().items():
if not param.is_readable or param.is_cheat:
continue
sp = _build_param_space(param)
if sp is not None:
action_spaces[param_id] = sp
self.action_space = spaces.Dict(action_spaces)
self.objectives = []
self.terminators = []
for objective in objectives:
if objective in Objectives:
self.objectives.append(Objectives[objective])
elif callable(objective):
self.objectives.append(objective)
else:
raise ValueError(f"Unsupported objective: {objective}")
for terminator in terminators:
if terminator in Objectives:
self.terminators.append(Objectives[terminator])
elif callable(terminator):
self.terminators.append(terminator)
else:
raise ValueError(f"Unsupported terminator: {terminator}")
def _get_obs(self, sim_uncertainty=None):
obs = {}
for param_id, param in self.nucon.get_all_readable().items():
if param.param_type == str or param_id not in self.observation_space.spaces:
continue
value = self.nucon.get(param_id)
if isinstance(value, Enum):
value = value.value
obs[param_id] = value
obs['EPISODE_TIME'] = self._total_steps * self.seconds_per_step
if 'SIM_UNCERTAINTY' in self.observation_space.spaces:
obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0
return obs
def _get_info(self, obs):
info = {'objectives': {}, 'objectives_weighted': {}}
for objective, weight in zip(self.objectives, self.objective_weights):
obj = objective(obs)
name = getattr(objective, '__name__', repr(objective))
info['objectives'][name] = obj
info['objectives_weighted'][name] = obj * weight
return info
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self._total_steps = 0
observation = self._get_obs()
return observation, self._get_info(observation)
def step(self, action):
_apply_action(self.nucon, action)
# Advance sim (or sleep) — get uncertainty for obs injection
truncated = False
uncertainty = None
if self.simulator:
uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
else:
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
time.sleep(self.seconds_per_step / sim_speed)
self._total_steps += 1
observation = self._get_obs(sim_uncertainty=uncertainty)
info = self._get_info(observation)
reward = sum(obj for obj in info['objectives_weighted'].values())
terminated = np.sum([t(observation) for t in self.terminators]) > self.terminate_above
return observation, reward, terminated, truncated, info
def render(self):
pass
def close(self):
pass
def _flatten_observation(self, observation):
return np.concatenate([np.asarray(v).flatten() for v in observation.values()])
# ---------------------------------------------------------------------------
# NuconGoalEnv
# ---------------------------------------------------------------------------
class NuconGoalEnv(gym.Env):
    """
    Goal-conditioned reactor environment compatible with SB3 HER (Hindsight Experience Replay).

    The observation is a Dict with three keys, as required by GoalEnv / HER:
      - 'observation': all readable non-goal, non-str params (same encoding as NuconEnv),
        plus SIM_UNCERTAINTY when a simulator is active
      - 'achieved_goal': current values of goal_params, normalised to [0, 1] within goal_range
      - 'desired_goal': target values sampled each episode, normalised to [0, 1]

    Reward defaults to negative L2 distance in the normalised goal space (dense).
    Pass ``tolerance`` for a sparse {0, -1} reward (0 = within tolerance).

    ``SIM_UNCERTAINTY`` in 'observation' lets reward_fn / terminators reference uncertainty directly.
    reward_fn signature: ``(achieved, desired)`` or ``(achieved, desired, obs)``; the 3-arg form
    receives the full observation dict (including SIM_UNCERTAINTY) for uncertainty-aware shaping.

    Usage with SB3 HER::

        from stable_baselines3 import SAC
        from stable_baselines3.common.buffers import HerReplayBuffer
        from nucon.rl import NuconGoalEnv, UncertaintyPenalty, UncertaintyAbort

        env = NuconGoalEnv(
            goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
            goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200),
                        'GENERATOR_2_KW': (0, 1200)},
            tolerance=0.05,
            simulator=simulator,
            # uncertainty-aware reward: penalise OOD, abort if too far out
            reward_fn=lambda ag, dg, obs: (
                -(np.linalg.norm(ag - dg) ** 2)
                - 2.0 * max(0, obs.get('SIM_UNCERTAINTY', 0) - 0.3) ** 2
            ),
            terminators=[UncertaintyAbort(threshold=0.7)],
        )
        model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer)
        model.learn(total_timesteps=500_000)
    """

    metadata = {'render_modes': ['human']}
@@ -227,14 +295,12 @@ class NuconGoalEnv(gym.Env):
        self.nucon = nucon
        all_readable = self.nucon.get_all_readable()

        # Validate goal params and build per-param range arrays
        for pid in self.goal_params:
            if pid not in all_readable:
                raise ValueError(f"Goal param '{pid}' is not a readable parameter")
        goal_range = goal_range or {}
        self._goal_low = np.array([
            goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[0]
            for pid in self.goal_params
        ], dtype=np.float32)
@@ -243,13 +309,21 @@ class NuconGoalEnv(gym.Env):
            for pid in self.goal_params
        ], dtype=np.float32)
        self._goal_range = self._goal_high - self._goal_low
        self._goal_range[self._goal_range == 0] = 1.0  # avoid div-by-zero

        # Detect reward_fn arity for backward compat (2-arg vs 3-arg)
        self._reward_fn = reward_fn  # callable(achieved, desired[, obs]) -> float, or None
        if reward_fn is not None:
            n_args = len(inspect.signature(reward_fn).parameters)
            self._reward_fn_wants_obs = n_args >= 3
        else:
            self._reward_fn_wants_obs = False
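The arity detection above can be reproduced in isolation. A minimal sketch of the `inspect.signature` check, using standalone names rather than the class attributes:

```python
import inspect

def wants_obs(reward_fn) -> bool:
    # True when reward_fn accepts a third positional argument (the obs dict).
    # Counts declared parameters, so plain functions and lambdas both work.
    return len(inspect.signature(reward_fn).parameters) >= 3

dense = lambda ag, dg: -abs(ag - dg)                                   # 2-arg form
shaped = lambda ag, dg, obs: -abs(ag - dg) - obs.get('SIM_UNCERTAINTY', 0)  # 3-arg form
```

One caveat of this approach: a callable taking `*args` reports a single parameter, so variadic reward functions would be treated as the 2-arg form.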
        # Observation subspace: all readable non-str, non-goal params
        goal_set = set(self.goal_params)
        obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
        if simulator is not None:
            obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
        for param_id, param in all_readable.items():
            if param_id in goal_set:
                continue
@@ -264,7 +338,7 @@ class NuconGoalEnv(gym.Env):
            'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
        })

        # Action space: readable-back, non-cheat writable params
        action_spaces = {}
        for param_id, param in self.nucon.get_all_writable().items():
            if not param.is_readable or param.is_cheat:
@@ -274,23 +348,16 @@ class NuconGoalEnv(gym.Env):
            action_spaces[param_id] = sp
        self.action_space = spaces.Dict(action_spaces)

        # Terminators
        self._terminators = terminators or []
        self._desired_goal = np.zeros(n_goals, dtype=np.float32)
        self._total_steps = 0

    # ------------------------------------------------------------------
    # GoalEnv interface
    # ------------------------------------------------------------------
    def compute_reward(self, achieved_goal, desired_goal, info):
        """
        Dense: negative L2 in normalised goal space (each dim in [0, 1]).
        Sparse when tolerance is set: 0 if within tolerance, -1 otherwise.
        A custom reward_fn overrides both.
        """
        obs = info.get('obs', {}) if isinstance(info, dict) else {}
        if self._reward_fn is not None:
            if self._reward_fn_wants_obs:
                return self._reward_fn(achieved_goal, desired_goal, obs)
            return self._reward_fn(achieved_goal, desired_goal)
        dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
        if self.tolerance is not None:
@@ -298,13 +365,13 @@ class NuconGoalEnv(gym.Env):
        return -dist
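A standalone sketch of the default reward logic, assuming the sparse branch (elided in this hunk) returns 0 within tolerance and -1 outside, as the docstring states:

```python
import numpy as np

def default_reward(achieved, desired, tolerance=None):
    # Dense: negative L2 distance in the normalised goal space.
    # Sparse (tolerance set): 0 within tolerance, -1 outside (assumed
    # from the docstring; the actual branch is elided in this hunk).
    dist = np.linalg.norm(achieved - desired, axis=-1)
    if tolerance is not None:
        return -(dist > tolerance).astype(np.float32)
    return -dist

ag = np.array([0.50, 0.50], dtype=np.float32)
dg = np.array([0.50, 0.62], dtype=np.float32)
dense = default_reward(ag, dg)         # ~ -0.12
sparse = default_reward(ag, dg, 0.05)  # -1.0, since 0.12 > 0.05
```

The `axis=-1` keeps the same code working for both a single goal vector and a batch of goals, which matters because SB3's HER relabelling calls `compute_reward` on batched arrays.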
    def _read_goal_values(self):
        raw = np.array([self.nucon.get(pid) or 0.0 for pid in self.goal_params], dtype=np.float32)
        return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0)
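The normalisation in `_read_goal_values` can be sketched for a single hypothetical goal param with range (0, 1200), e.g. a generator's kW output:

```python
import numpy as np

# Hypothetical single-goal range arrays, mirroring _goal_low / _goal_range
goal_low = np.array([0.0], dtype=np.float32)
goal_high = np.array([1200.0], dtype=np.float32)
goal_range = goal_high - goal_low  # zero ranges would be patched to 1.0 upstream

def normalise(raw):
    # Map raw readings into [0, 1]; out-of-range values clip to the edges
    return np.clip((np.asarray(raw, dtype=np.float32) - goal_low) / goal_range, 0.0, 1.0)

normalise([600.0])   # mid-range  -> [0.5]
normalise([1500.0])  # over range -> [1.0]
```

The clip keeps achieved goals inside the Box(0, 1) observation space even when the reactor overshoots the configured range.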
    def _get_obs_dict(self, sim_uncertainty=None):
        obs = {'EPISODE_TIME': float(self._total_steps * self.seconds_per_step)}
        if 'SIM_UNCERTAINTY' in self.observation_space['observation'].spaces:
            obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0
        goal_set = set(self.goal_params)
        for param_id, param in self.nucon.get_all_readable().items():
            if param_id in goal_set or param_id not in self.observation_space['observation'].spaces:
@@ -323,38 +390,28 @@ class NuconGoalEnv(gym.Env):
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self._total_steps = 0
        # Sample a new goal uniformly from the normalised goal range
        rng = np.random.default_rng(seed)
        self._desired_goal = rng.uniform(0.0, 1.0, size=len(self.goal_params)).astype(np.float32)
        return self._get_obs_dict(), {}
    def step(self, action):
        _apply_action(self.nucon, action)

        # Advance sim (or sleep)
        uncertainty = None
        if self.simulator:
            uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
        else:
            sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            time.sleep(self.seconds_per_step / sim_speed)
        self._total_steps += 1

        obs = self._get_obs_dict(sim_uncertainty=uncertainty)
        info = {'achieved_goal': obs['achieved_goal'], 'desired_goal': obs['desired_goal'],
                'obs': obs['observation']}
        reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], info))
        terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators)
        truncated = False
        return obs, reward, terminated, truncated, info
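The threshold behaviour described in the commit message (linear penalty above `uncertainty_penalty_start`, truncation at or above `uncertainty_abort`) can be sketched as a standalone helper. `shape_with_uncertainty` is a hypothetical name for illustration, not part of nucon:

```python
def shape_with_uncertainty(reward, uncertainty,
                           penalty_start=0.3, abort=0.7, penalty_scale=2.0):
    # Hypothetical helper mirroring the commit message's thresholds.
    # uncertainty is None when no kNN model is active -> pass through.
    if uncertainty is None:
        return reward, False
    # Linear penalty above penalty_start, scaled by penalty_scale.
    if uncertainty > penalty_start:
        reward -= penalty_scale * (uncertainty - penalty_start)
    # At or above abort, flag the trajectory as truncated (not terminated:
    # the episode ends for an external reason, not task success/failure).
    return reward, uncertainty >= abort
```

Truncation rather than termination matters for bootstrapping: SAC still bootstraps the value of the final state on truncation, so aborting an OOD rollout does not teach the agent that OOD states are terminal.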
    def render(self):
@@ -364,6 +421,10 @@ class NuconGoalEnv(gym.Env):
        pass

# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
def register_nucon_envs():
    gym.register(
        id='Nucon-max_power-v0',
@@ -378,9 +439,11 @@ def register_nucon_envs():
    gym.register(
        id='Nucon-safe_max_power-v0',
        entry_point='nucon.rl:NuconEnv',
        kwargs={'seconds_per_step': 5,
                'objectives': [Parameterized_Objectives['temp_above'](min_temp=310),
                               Parameterized_Objectives['temp_below'](max_temp=365),
                               'max_power'],
                'objective_weights': [1, 10, 1/100_000]}
    )

    # Goal-conditioned: target total generator output (train with HER)
    gym.register(
        id='Nucon-goal_power-v0',
        entry_point='nucon.rl:NuconGoalEnv',
@@ -390,7 +453,6 @@ def register_nucon_envs():
            'seconds_per_step': 5,
        }
    )

    # Goal-conditioned: target core temperature (train with HER)
    gym.register(
        id='Nucon-goal_temp-v0',
        entry_point='nucon.rl:NuconGoalEnv',


@@ -215,9 +215,16 @@ class NuconSimulator:
    def set_allow_all_writes(self, allow: bool) -> None:
        self.allow_all_writes = allow

    def update(self, time_step: float, return_uncertainty: bool = False):
        """Advance the simulator by time_step game-seconds.

        If return_uncertainty=True and a kNN model is loaded, returns the GP
        posterior std for this step (0 = on known data, ~1 = OOD).
        Returns None when using an NN model or when not requested.
        """
        uncertainty = self._update_reactor_state(time_step, return_uncertainty=return_uncertainty)
        self.time += time_step
        return uncertainty
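The "0 = on known data, ~1 = OOD" behaviour can be illustrated with a toy proxy: the posterior std of a noise-free GP conditioned on the single nearest neighbour under an RBF kernel with unit prior variance. This is an illustration of the idea only, not the project's `forward_with_uncertainty`:

```python
import numpy as np

def knn_gp_std(x, train_X, length_scale=1.0):
    # Toy kNN-GP std proxy: for a noise-free GP with k(x, x) = 1 conditioned
    # on one training point, posterior variance is 1 - k(x, x*)^2.
    # Exactly 0.0 on a training point, approaching 1.0 far from all of them.
    d2 = np.sum((np.asarray(train_X) - np.asarray(x)) ** 2, axis=1)
    k = np.exp(-d2 / (2.0 * length_scale ** 2))  # RBF similarity to each point
    return float(np.sqrt(1.0 - k.max() ** 2))    # condition on the nearest

train = np.array([[0.0, 0.0], [1.0, 0.0]])
knn_gp_std([0.0, 0.0], train)    # 0.0: on known data
knn_gp_std([50.0, 50.0], train)  # ~1.0: out of distribution
```

The monotone mapping from distance-to-data to std is what makes fixed thresholds like 0.3 (penalty) and 0.7 (abort) meaningful across states.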
    def set_model(self, model) -> None:
        """Set a pre-loaded ReactorDynamicsModel or ReactorKNNModel directly."""
@@ -249,7 +256,7 @@ class NuconSimulator:
            print(f"Error loading model: {str(e)}")
            self.model = None

    def _update_reactor_state(self, time_step: float, return_uncertainty: bool = False):
        if not self.model:
            raise ValueError("Model not set. Please load a model using load_model() or set_model().")
@@ -263,10 +270,13 @@ class NuconSimulator:
                value = 0.0  # fallback for params not initialised in sim state
            state[param_id] = value

        # Forward pass: same interface for both NN and kNN
        uncertainty = None
        if isinstance(self.model, ReactorDynamicsModel):
            with torch.no_grad():
                next_state = self.model.forward(state, time_step)
        elif return_uncertainty:
            next_state, uncertainty = self.model.forward_with_uncertainty(state, time_step)
        else:
            next_state = self.model.forward(state, time_step)
@@ -277,6 +287,8 @@ class NuconSimulator:
            except (ValueError, KeyError):
                pass  # ignore params that can't be set (type mismatch, unknown)
        return uncertainty

    def set_state(self, state: OperatingState) -> None:
        self._sample_parameters_from_state(state)