Fix rl/sim blockers and add NuconGoalEnv for HER training

rl.py:
- Add missing `from enum import Enum`
- Skip str-typed params in obs/action space construction (was crashing)
- Guard action space: exclude write-only (is_readable=False) and cheat params
- Fix step() param lookup (no longer iterates Nucon, uses _parameters dict directly)
- Correct sim-speed time dilation in real-game sleep
- Extract _build_param_space() helper shared by NuconEnv and NuconGoalEnv
- Add NuconGoalEnv: goal-conditioned env with normalised achieved/desired goal
  vectors, compatible with SB3 HerReplayBuffer; goals sampled per episode
- Register Nucon-goal_power-v0 and Nucon-goal_temp-v0 presets
- Enum obs/action space now scalar index (not one-hot)

sim.py:
- Store self.port and self.host on NuconSimulator
- Add set_model() to accept a pre-loaded model directly
- load_model() detects type by extension (.pkl → kNN, else → NN torch)
  and reads new checkpoint format with embedded input/output param lists
- _update_reactor_state() uses model.input_params (not all readable params),
  calls .forward() directly for both NN and kNN, guards torch.no_grad per type
- Import ReactorKNNModel and pickle

model.py:
- save_model() embeds input_params/output_params in NN checkpoint dict
- load_model() handles new checkpoint format (state_dict key) with fallback

README.md:
- Update note: RODS_POS_ORDERED is no longer the only writable param;
  game v2.2.25.213 exposes rod banks, pumps, MSCVs, switches and more

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dominik Moritz Roth 2026-03-12 17:37:16 +01:00
parent 7fcc809852
commit 0dab7a6cec
4 changed files with 307 additions and 50 deletions

View File

@ -9,7 +9,7 @@ NuCon (Nucleares Controller) is a Python library designed to interface with and
NuCon further provides a work in progress implementation of a reinforcement learning environment for training control policies and a simulator based on model learning. NuCon further provides a work in progress implementation of a reinforcement learning environment for training control policies and a simulator based on model learning.
> [!NOTE] > [!NOTE]
> Nucleares only exposes RODS_POS_ORDERED as writable parameter, and no parameters about core chemistry e.g. Xenon concentration. While NuCon is already usable, it's capabilities are still very limited based on these restrictions. The capabilites are supposed to be extended in future updates to Nucleares, development on the advanced features (Reinforcement / Model Learning) are paused till then. > NuCon is compatible with Nucleares v2.2.25.213. The game exposes a rich set of writable parameters including individual rod bank positions (`ROD_BANK_POS_{0-8}_ORDERED`), pump speeds, MSCV and turbine bypass setpoints, and various switches. Core chemistry parameters (e.g. Xenon concentration) are still read-only. Development on the advanced features (Reinforcement / Model Learning) is ongoing.
## Features ## Features

View File

@ -360,14 +360,22 @@ class NuconModelLearner:
def save_model(self, path): def save_model(self, path):
if isinstance(self.model, ReactorDynamicsModel): if isinstance(self.model, ReactorDynamicsModel):
torch.save(self.model.state_dict(), path) torch.save({
'state_dict': self.model.state_dict(),
'input_params': self.model.input_params,
'output_params': self.model.output_params,
}, path)
else: else:
with open(path, 'wb') as f: with open(path, 'wb') as f:
pickle.dump(self.model, f) pickle.dump(self.model, f)
def load_model(self, path): def load_model(self, path):
if isinstance(self.model, ReactorDynamicsModel): if isinstance(self.model, ReactorDynamicsModel):
self.model.load_state_dict(torch.load(path)) checkpoint = torch.load(path, weights_only=False)
if isinstance(checkpoint, dict) and 'state_dict' in checkpoint:
self.model.load_state_dict(checkpoint['state_dict'])
else:
self.model.load_state_dict(checkpoint)
else: else:
with open(path, 'rb') as f: with open(path, 'rb') as f:
self.model = pickle.load(f) self.model = pickle.load(f)

View File

@ -3,6 +3,7 @@ from gymnasium import spaces
import numpy as np import numpy as np
import time import time
from typing import Dict, Any from typing import Dict, Any
from enum import Enum
from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus
Objectives = { Objectives = {
@ -43,39 +44,19 @@ class NuconEnv(gym.Env):
# Define observation space # Define observation space
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)} obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
for param_id, param in self.nucon.get_all_readable().items(): for param_id, param in self.nucon.get_all_readable().items():
if param.param_type == float: sp = _build_param_space(param)
obs_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32) if sp is not None:
elif param.param_type == int: obs_spaces[param_id] = sp
if param.min_val is not None and param.max_val is not None:
obs_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32)
else:
obs_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == bool:
obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
elif issubclass(param.param_type, Enum):
obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
else:
raise ValueError(f"Unsupported observation parameter type: {param.param_type}")
self.observation_space = spaces.Dict(obs_spaces) self.observation_space = spaces.Dict(obs_spaces)
# Define action space # Define action space (only controllable, non-cheat, readable-back params)
action_spaces = {} action_spaces = {}
for param_id, param in self.nucon.get_all_writable().items(): for param_id, param in self.nucon.get_all_writable().items():
if param.param_type == float: if not param.is_readable or param.is_cheat:
action_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32) continue # write-only (VALVE_OPEN/CLOSE, SCRAM, etc.) and cheat params excluded
elif param.param_type == int: sp = _build_param_space(param)
if param.min_val is not None and param.max_val is not None: if sp is not None:
action_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32) action_spaces[param_id] = sp
else:
action_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == bool:
action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
elif issubclass(param.param_type, Enum):
action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
else:
raise ValueError(f"Unsupported action parameter type: {param.param_type}")
self.action_space = spaces.Dict(action_spaces) self.action_space = spaces.Dict(action_spaces)
self.objectives = [] self.objectives = []
@ -100,6 +81,8 @@ class NuconEnv(gym.Env):
def _get_obs(self): def _get_obs(self):
obs = {} obs = {}
for param_id, param in self.nucon.get_all_readable().items(): for param_id, param in self.nucon.get_all_readable().items():
if param.param_type == str or param_id not in self.observation_space.spaces:
continue
value = self.nucon.get(param_id) value = self.nucon.get(param_id)
if isinstance(value, Enum): if isinstance(value, Enum):
value = value.value value = value.value
@ -127,9 +110,11 @@ class NuconEnv(gym.Env):
def step(self, action): def step(self, action):
# Apply the action to the Nucon system # Apply the action to the Nucon system
for param_id, value in action.items(): for param_id, value in action.items():
param = next(p for p in self.nucon if p.id == param_id) param = self.nucon._parameters[param_id]
if issubclass(param.param_type, Enum): if issubclass(param.param_type, Enum):
value = param.param_type(value) value = param.param_type(int(np.asarray(value).flat[0]))
else:
value = param.param_type(np.asarray(value).flat[0])
if param.min_val is not None and param.max_val is not None: if param.min_val is not None and param.max_val is not None:
value = np.clip(value, param.min_val, param.max_val) value = np.clip(value, param.min_val, param.max_val)
self.nucon.set(param, value) self.nucon.set(param, value)
@ -144,7 +129,10 @@ class NuconEnv(gym.Env):
if self.simulator: if self.simulator:
self.simulator.update(self.seconds_per_step) self.simulator.update(self.seconds_per_step)
else: else:
time.sleep(self.seconds_per_step) # Sleep to let the game advance seconds_per_step game-seconds,
# accounting for the game's simulation speed multiplier.
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
time.sleep(self.seconds_per_step / sim_speed)
return observation, reward, terminated, truncated, info return observation, reward, terminated, truncated, info
def render(self): def render(self):
@ -167,6 +155,215 @@ class NuconEnv(gym.Env):
return {k: v.reshape(1, -1) for k, v in self.observation_space.items()} return {k: v.reshape(1, -1) for k, v in self.observation_space.items()}
def _build_param_space(param):
    """Return a gymnasium Box for a single NuconParameter, or None if unsupported.

    Every supported parameter is encoded as a float32 Box of shape ``(1,)``:

    - ``float`` / ``int``: bounded by ``param.min_val`` / ``param.max_val``;
      a bound that is ``None`` is left open (``-inf`` / ``+inf``).
    - ``bool``: bounded to ``[0, 1]``.
    - ``Enum`` subclasses: bounded to ``[0, len(enum) - 1]``.
      NOTE(review): this assumes the enum's values are the contiguous
      integers 0..n-1 -- confirm against the enum definitions in ``nucon``.
    - ``str`` and anything else: ``None`` (caller is expected to skip it).
    """
    ptype = param.param_type

    if ptype in (float, int):
        # Compare against None explicitly: a bound of 0 is a real bound and
        # must not be replaced by +/-inf (which `min_val or -np.inf` would do).
        low = -np.inf if param.min_val is None else param.min_val
        high = np.inf if param.max_val is None else param.max_val
        return spaces.Box(low=low, high=high, shape=(1,), dtype=np.float32)

    if ptype == bool:
        return spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)

    if ptype == str:
        return None

    # isinstance guard keeps issubclass() from raising TypeError when
    # param_type is not a class; such params are simply unsupported.
    if isinstance(ptype, type) and issubclass(ptype, Enum):
        return spaces.Box(low=0, high=len(ptype) - 1, shape=(1,), dtype=np.float32)

    return None
class NuconGoalEnv(gym.Env):
    """
    Goal-conditioned reactor environment compatible with SB3 HER (Hindsight Experience Replay).

    The observation is a Dict with three keys as required by GoalEnv / HER:

    - 'observation': all readable non-goal, non-str params (same encoding as NuconEnv)
    - 'achieved_goal': current values of goal_params, normalised to [0, 1] within goal_range
    - 'desired_goal': target values sampled each episode, normalised to [0, 1]

    Reward defaults to negative L2 distance in the normalised goal space (dense).
    Pass ``tolerance`` for a sparse {0, -1} reward (0 = within tolerance).

    NOTE(review): 'observation' is itself a ``spaces.Dict``. Stable-Baselines3
    documents that nested Dict observation spaces are not supported -- confirm
    whether a flattening wrapper is needed before training with SB3.

    Usage with SB3 HER::

        from stable_baselines3 import SAC
        from stable_baselines3.common.buffers import HerReplayBuffer

        env = NuconGoalEnv(
            goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
            goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
            simulator=simulator,
        )
        model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer)
        model.learn(total_timesteps=200_000)
    """

    metadata = {'render_modes': ['human']}

    def __init__(
        self,
        goal_params,
        goal_range=None,
        reward_fn=None,
        tolerance=None,
        nucon=None,
        simulator=None,
        render_mode=None,
        seconds_per_step=5,
        terminators=None,
        terminate_above=0,
    ):
        """
        Args:
            goal_params: iterable of readable parameter ids that form the goal vector.
            goal_range: optional ``{param_id: (low, high)}`` used to normalise goals.
                Params missing from it fall back to the parameter's own
                ``min_val`` / ``max_val`` (0.0 / 1.0 when those are ``None``).
            reward_fn: optional ``callable(achieved_norm, desired_norm) -> float``
                that replaces the built-in reward.
            tolerance: if set, switches the built-in reward to sparse {0, -1}.
            nucon: Nucon client to use; created automatically when ``None``
                (pointing at ``simulator.port`` if a simulator is given).
            simulator: optional simulator advanced via ``update()`` on each step.
            render_mode: stored on the instance; ``render()`` is a no-op.
            seconds_per_step: game-seconds that one ``step()`` advances.
            terminators: callables taking the 'observation' dict; the episode
                terminates when any returns a value above ``terminate_above``.
            terminate_above: threshold applied to terminator outputs.

        Raises:
            ValueError: if a goal param is not a readable parameter.
        """
        super().__init__()

        self.render_mode = render_mode
        self.seconds_per_step = seconds_per_step
        self.terminate_above = terminate_above
        self.simulator = simulator
        self.goal_params = list(goal_params)
        self.tolerance = tolerance

        if nucon is None:
            nucon = Nucon(port=simulator.port) if simulator else Nucon()
        self.nucon = nucon

        all_readable = self.nucon.get_all_readable()

        # Validate goal params and build per-param range arrays
        for pid in self.goal_params:
            if pid not in all_readable:
                raise ValueError(f"Goal param '{pid}' is not a readable parameter")

        goal_range = goal_range or {}
        bounds = [
            self._resolve_goal_bounds(pid, all_readable[pid], goal_range)
            for pid in self.goal_params
        ]
        self._goal_low = np.array([lo for lo, _ in bounds], dtype=np.float32)
        self._goal_high = np.array([hi for _, hi in bounds], dtype=np.float32)
        self._goal_range = self._goal_high - self._goal_low
        self._goal_range[self._goal_range == 0] = 1.0  # avoid div-by-zero

        self._reward_fn = reward_fn  # callable(achieved_norm, desired_norm) -> float, or None

        # Observation subspace: all readable non-str non-goal params
        goal_set = set(self.goal_params)
        obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
        for param_id, param in all_readable.items():
            if param_id in goal_set:
                continue
            sp = _build_param_space(param)
            if sp is not None:
                obs_spaces[param_id] = sp

        n_goals = len(self.goal_params)
        self.observation_space = spaces.Dict({
            'observation': spaces.Dict(obs_spaces),
            'achieved_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
            'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
        })

        # Action space: readable-back, non-cheat writable params
        action_spaces = {}
        for param_id, param in self.nucon.get_all_writable().items():
            if not param.is_readable or param.is_cheat:
                continue
            sp = _build_param_space(param)
            if sp is not None:
                action_spaces[param_id] = sp
        self.action_space = spaces.Dict(action_spaces)

        # Terminators
        self._terminators = terminators or []

        self._desired_goal = np.zeros(n_goals, dtype=np.float32)
        self._total_steps = 0

    @staticmethod
    def _resolve_goal_bounds(pid, param, goal_range):
        """Return ``(low, high)`` for one goal param: explicit range first, else param limits."""
        if pid in goal_range:
            return goal_range[pid]
        # Explicit None checks so that a real bound of 0 is not mistaken for
        # "no bound" (which `max_val or 1.0` would do).
        low = 0.0 if param.min_val is None else param.min_val
        high = 1.0 if param.max_val is None else param.max_val
        return low, high

    # ------------------------------------------------------------------
    # GoalEnv interface
    # ------------------------------------------------------------------

    def compute_reward(self, achieved_goal, desired_goal, info):
        """
        Dense: negative L2 in normalised goal space (each dim in [0,1]).
        Sparse when tolerance is set: 0 if within tolerance, -1 otherwise.
        Custom reward_fn overrides both.

        The norm is taken over the last axis, so batched goal arrays yield
        one reward per row.
        """
        if self._reward_fn is not None:
            return self._reward_fn(achieved_goal, desired_goal)
        dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
        if self.tolerance is not None:
            return (dist <= self.tolerance).astype(np.float32) - 1.0
        return -dist

    def _read_goal_values(self):
        """Read the goal params from Nucon and normalise them into [0, 1]."""
        raw = np.array([
            self.nucon.get(pid) or 0.0 for pid in self.goal_params  # None -> 0.0
        ], dtype=np.float32)
        return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0)

    def _get_obs_dict(self):
        """Assemble the {'observation', 'achieved_goal', 'desired_goal'} dict."""
        obs = {'EPISODE_TIME': float(self._total_steps * self.seconds_per_step)}
        goal_set = set(self.goal_params)
        obs_keys = self.observation_space['observation'].spaces
        for param_id in self.nucon.get_all_readable():
            if param_id in goal_set or param_id not in obs_keys:
                continue
            value = self.nucon.get(param_id)
            if isinstance(value, Enum):
                value = value.value
            # NOTE(review): values are stored as raw scalars while the declared
            # space is Box(shape=(1,)) -- confirm consumers accept this.
            obs[param_id] = value
        achieved = self._read_goal_values()
        return {
            'observation': obs,
            'achieved_goal': achieved,
            'desired_goal': self._desired_goal.copy(),
        }

    def reset(self, seed=None, options=None):
        """Start a new episode and sample a fresh normalised goal.

        Returns:
            ``(obs, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self._total_steps = 0

        # Sample a new goal uniformly from the goal range.
        # Draw from the env-owned generator (re-seeded by super().reset when a
        # seed is given) rather than a fresh default_rng(seed): with seed=None
        # the latter is unseeded on every episode, so a run seeded once at the
        # first reset would not be reproducible.
        self._desired_goal = self.np_random.uniform(
            0.0, 1.0, size=len(self.goal_params)
        ).astype(np.float32)

        obs = self._get_obs_dict()
        return obs, {}

    def _apply_action(self, action):
        """Write every entry of the action dict to Nucon, cast to the param's type."""
        for param_id, value in action.items():
            param = self.nucon._parameters[param_id]
            scalar = np.asarray(value).flat[0]
            if issubclass(param.param_type, Enum):
                value = param.param_type(int(scalar))
            else:
                # NOTE(review): for bool params any non-zero action becomes
                # True -- confirm whether a 0.5 threshold is intended.
                value = param.param_type(scalar)
            if param.min_val is not None and param.max_val is not None:
                value = np.clip(value, param.min_val, param.max_val)
            self.nucon.set(param, value)

    def step(self, action):
        """Apply ``action``, compute the reward, then advance the game/simulator.

        The observation and reward are taken right after the action is written
        and before time is advanced by ``seconds_per_step``.

        Returns:
            ``(obs, reward, terminated, truncated, info)``; ``truncated`` is
            always False.
        """
        self._apply_action(action)

        obs = self._get_obs_dict()
        reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], {}))
        terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators)
        truncated = False
        info = {'achieved_goal': obs['achieved_goal'], 'desired_goal': obs['desired_goal']}

        self._total_steps += 1
        if self.simulator:
            self.simulator.update(self.seconds_per_step)
        else:
            # Real game: scale the wall-clock sleep by the game's speed
            # multiplier so that seconds_per_step game-seconds elapse.
            sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            time.sleep(self.seconds_per_step / sim_speed)

        return obs, reward, terminated, truncated, info

    def render(self):
        """No-op; rendering is not implemented."""
        pass

    def close(self):
        """No-op; the environment holds no resources that need releasing here."""
        pass
def register_nucon_envs(): def register_nucon_envs():
gym.register( gym.register(
id='Nucon-max_power-v0', id='Nucon-max_power-v0',
@ -183,5 +380,25 @@ def register_nucon_envs():
entry_point='nucon.rl:NuconEnv', entry_point='nucon.rl:NuconEnv',
kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]} kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]}
) )
# Goal-conditioned: target total generator output (train with HER)
gym.register(
id='Nucon-goal_power-v0',
entry_point='nucon.rl:NuconGoalEnv',
kwargs={
'goal_params': ['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
'goal_range': {'GENERATOR_0_KW': (0.0, 1200.0), 'GENERATOR_1_KW': (0.0, 1200.0), 'GENERATOR_2_KW': (0.0, 1200.0)},
'seconds_per_step': 5,
}
)
# Goal-conditioned: target core temperature (train with HER)
gym.register(
id='Nucon-goal_temp-v0',
entry_point='nucon.rl:NuconGoalEnv',
kwargs={
'goal_params': ['CORE_TEMP'],
'goal_range': {'CORE_TEMP': (280.0, 380.0)},
'seconds_per_step': 5,
}
)
register_nucon_envs() register_nucon_envs()

View File

@ -5,7 +5,8 @@ from flask import Flask, request, jsonify
from nucon import Nucon, ParameterEnum, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus from nucon import Nucon, ParameterEnum, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
import threading import threading
import torch import torch
from nucon.model import ReactorDynamicsModel from nucon.model import ReactorDynamicsModel, ReactorKNNModel
import pickle
class OperatingState(Enum): class OperatingState(Enum):
# Tuple indicates a range of values, while list indicates a set of possible values # Tuple indicates a range of values, while list indicates a set of possible values
@ -165,6 +166,8 @@ class NuconSimulator:
def __init__(self, host: str = 'localhost', port: int = 8786): def __init__(self, host: str = 'localhost', port: int = 8786):
self._nucon = Nucon() self._nucon = Nucon()
self.parameters = self.Parameters(self._nucon) self.parameters = self.Parameters(self._nucon)
self.host = host
self.port = port
self.time = 0.0 self.time = 0.0
self.allow_all_writes = False self.allow_all_writes = False
self.set_state(OperatingState.OFFLINE) self.set_state(OperatingState.OFFLINE)
@ -216,34 +219,63 @@ class NuconSimulator:
self._update_reactor_state(time_step) self._update_reactor_state(time_step)
self.time += time_step self.time += time_step
def set_model(self, model) -> None:
    """Install an already-loaded ReactorDynamicsModel or ReactorKNNModel.

    The model is stored on the simulator as-is. A ReactorDynamicsModel is
    additionally switched to evaluation mode; any other model type is left
    untouched.
    """
    self.model = model
    is_torch_model = isinstance(model, ReactorDynamicsModel)
    if is_torch_model:
        self.model.eval()
def load_model(self, model_path: str) -> None: def load_model(self, model_path: str) -> None:
"""Load a model from a file. .pkl → ReactorKNNModel, otherwise → ReactorDynamicsModel (torch)."""
try: try:
if model_path.endswith('.pkl'):
with open(model_path, 'rb') as f:
self.model = pickle.load(f)
print(f"kNN model loaded from {model_path}")
else:
# Reconstruct shell from the saved state dict; input/output params
# are stored inside the checkpoint.
checkpoint = torch.load(model_path, weights_only=False)
if isinstance(checkpoint, dict) and 'input_params' in checkpoint:
self.model = ReactorDynamicsModel(checkpoint['input_params'], checkpoint['output_params'])
self.model.load_state_dict(checkpoint['state_dict'])
else:
# Legacy: plain state dict — fall back using sim readable/non-writable lists
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params) self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
self.model.load_state_dict(torch.load(model_path)) self.model.load_state_dict(checkpoint)
self.model.eval() # Set the model to evaluation mode self.model.eval()
print(f"Model loaded successfully from {model_path}") print(f"NN model loaded from {model_path}")
except Exception as e: except Exception as e:
print(f"Error loading model: {str(e)}") print(f"Error loading model: {str(e)}")
self.model = None self.model = None
def _update_reactor_state(self, time_step: float) -> None: def _update_reactor_state(self, time_step: float) -> None:
if not self.model: if not self.model:
raise ValueError("Model not set. Please load a model using load_model() method.") raise ValueError("Model not set. Please load a model using load_model() or set_model().")
# Build state dict using only the params the model knows about
state = {} state = {}
for param in self.readable_params: for param_id in self.model.input_params:
value = self.get(param) value = getattr(self.parameters, param_id, None)
if isinstance(value, Enum): if isinstance(value, Enum):
value = value.value value = value.value
state[param] = value if value is None:
value = 0.0 # fallback for params not initialised in sim state
state[param_id] = value
# Use the model to predict the next state # Forward pass — same interface for both NN and kNN
if isinstance(self.model, ReactorDynamicsModel):
with torch.no_grad(): with torch.no_grad():
next_state = self.model(state, time_step) next_state = self.model.forward(state, time_step)
else:
next_state = self.model.forward(state, time_step)
# Update the simulator's state # Update only the output params the model predicts
for param, value in next_state.items(): for param_id, value in next_state.items():
self.set(param, value) try:
self.set(param_id, value, force=True)
except (ValueError, KeyError):
pass # ignore params that can't be set (type mismatch, unknown)
def set_state(self, state: OperatingState) -> None: def set_state(self, state: OperatingState) -> None:
self._sample_parameters_from_state(state) self._sample_parameters_from_state(state)