diff --git a/README.md b/README.md index 000b65a..bf93189 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ NuCon (Nucleares Controller) is a Python library designed to interface with and NuCon further provides a work in progress implementation of a reinforcement learning environment for training control policies and a simulator based on model learning. > [!NOTE] -> Nucleares only exposes RODS_POS_ORDERED as writable parameter, and no parameters about core chemistry e.g. Xenon concentration. While NuCon is already usable, it's capabilities are still very limited based on these restrictions. The capabilites are supposed to be extended in future updates to Nucleares, development on the advanced features (Reinforcement / Model Learning) are paused till then. +> NuCon is compatible with Nucleares v2.2.25.213. The game exposes a rich set of writable parameters including individual rod bank positions (`ROD_BANK_POS_{0-8}_ORDERED`), pump speeds, MSCV and turbine bypass setpoints, and various switches. Core chemistry parameters (e.g. Xenon concentration) are still read-only. Development on the advanced features (Reinforcement / Model Learning) is ongoing. ## Features diff --git a/nucon/model.py b/nucon/model.py index c8d1ca7..43f060e 100644 --- a/nucon/model.py +++ b/nucon/model.py @@ -360,14 +360,22 @@ class NuconModelLearner: def save_model(self, path): if isinstance(self.model, ReactorDynamicsModel): - torch.save(self.model.state_dict(), path) + torch.save({ + 'state_dict': self.model.state_dict(), + 'input_params': self.model.input_params, + 'output_params': self.model.output_params, + }, path) else: with open(path, 'wb') as f: pickle.dump(self.model, f) def load_model(self, path): if isinstance(self.model, ReactorDynamicsModel): - self.model.load_state_dict(torch.load(path)) + checkpoint = torch.load(path, weights_only=False) + if isinstance(checkpoint, dict) and 'state_dict' in checkpoint: + self.model.load_state_dict(checkpoint['state_dict']) + else: + self.model.load_state_dict(checkpoint) else: with open(path, 'rb') as f: self.model = pickle.load(f) diff --git a/nucon/rl.py b/nucon/rl.py index 7ea350d..a44f4f4 100644 --- a/nucon/rl.py +++ b/nucon/rl.py @@ -3,6 +3,7 @@ from gymnasium import spaces import numpy as np import time from typing import Dict, Any +from enum import Enum from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus Objectives = { @@ -43,39 +44,19 @@ class NuconEnv(gym.Env): # Define observation space obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)} for param_id, param in self.nucon.get_all_readable().items(): - if param.param_type == float: - obs_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32) - elif param.param_type == int: - if param.min_val is not None and param.max_val is not None: - obs_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32) - else: - obs_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32) - elif param.param_type == bool: - obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32) - elif issubclass(param.param_type, Enum): - obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32) - else: - raise ValueError(f"Unsupported observation parameter type: {param.param_type}") - + sp = _build_param_space(param) + if sp is not None: + obs_spaces[param_id] = sp self.observation_space = spaces.Dict(obs_spaces) - # Define action space + # Define action space (only controllable, non-cheat, readable-back params) action_spaces = {} for param_id, param in self.nucon.get_all_writable().items(): - if param.param_type == float: - action_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32) - elif param.param_type == int: - if param.min_val is not None and param.max_val is not None: - action_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32) - else: - action_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32) - elif param.param_type == bool: - action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32) - elif issubclass(param.param_type, Enum): - action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32) - else: - raise ValueError(f"Unsupported action parameter type: {param.param_type}") - + if not param.is_readable or param.is_cheat: + continue # write-only (VALVE_OPEN/CLOSE, SCRAM, etc.) and cheat params excluded + sp = _build_param_space(param) + if sp is not None: + action_spaces[param_id] = sp self.action_space = spaces.Dict(action_spaces) self.objectives = [] @@ -100,6 +81,8 @@ class NuconEnv(gym.Env): def _get_obs(self): obs = {} for param_id, param in self.nucon.get_all_readable().items(): + if param.param_type == str or param_id not in self.observation_space.spaces: + continue value = self.nucon.get(param_id) if isinstance(value, Enum): value = value.value @@ -127,9 +110,11 @@ class NuconEnv(gym.Env): def step(self, action): # Apply the action to the Nucon system for param_id, value in action.items(): - param = next(p for p in self.nucon if p.id == param_id) + param = self.nucon._parameters[param_id] if issubclass(param.param_type, Enum): - value = param.param_type(value) + value = param.param_type(int(np.asarray(value).flat[0])) + else: + value = param.param_type(np.asarray(value).flat[0]) if param.min_val is not None and param.max_val is not None: value = np.clip(value, param.min_val, param.max_val) self.nucon.set(param, value) @@ -144,7 +129,10 @@ class NuconEnv(gym.Env): if self.simulator: self.simulator.update(self.seconds_per_step) else: - time.sleep(self.seconds_per_step) + # Sleep to let the game advance seconds_per_step game-seconds, + # accounting for the game's simulation speed multiplier. + sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0 + time.sleep(self.seconds_per_step / sim_speed) return observation, reward, terminated, truncated, info def render(self): @@ -167,6 +155,215 @@ class NuconEnv(gym.Env): return {k: v.reshape(1, -1) for k, v in self.observation_space.items()} +def _build_param_space(param): + """Return a gymnasium Box for a single NuconParameter, or None if unsupported.""" + if param.param_type == float: + return spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32) + elif param.param_type == int: + lo = param.min_val if param.min_val is not None else -np.inf + hi = param.max_val if param.max_val is not None else np.inf + return spaces.Box(low=lo, high=hi, shape=(1,), dtype=np.float32) + elif param.param_type == bool: + return spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32) + elif param.param_type == str: + return None + elif issubclass(param.param_type, Enum): + return spaces.Box(low=0, high=len(param.param_type) - 1, shape=(1,), dtype=np.float32) + return None + + +class NuconGoalEnv(gym.Env): + """ + Goal-conditioned reactor environment compatible with SB3 HER (Hindsight Experience Replay). + + The observation is a Dict with three keys as required by GoalEnv / HER: + - 'observation': all readable non-goal, non-str params (same encoding as NuconEnv) + - 'achieved_goal': current values of goal_params, normalised to [0, 1] within goal_range + - 'desired_goal': target values sampled each episode, normalised to [0, 1] + + Reward defaults to negative L2 distance in the normalised goal space (dense). + Pass ``tolerance`` for a sparse {0, -1} reward (0 = within tolerance). + + Usage with SB3 HER:: + + from stable_baselines3 import SAC + from stable_baselines3.common.buffers import HerReplayBuffer + + env = NuconGoalEnv( + goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'], + goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)}, + simulator=simulator, + ) + model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer) + model.learn(total_timesteps=200_000) + """ + + metadata = {'render_modes': ['human']} + + def __init__( + self, + goal_params, + goal_range=None, + reward_fn=None, + tolerance=None, + nucon=None, + simulator=None, + render_mode=None, + seconds_per_step=5, + terminators=None, + terminate_above=0, + ): + super().__init__() + + self.render_mode = render_mode + self.seconds_per_step = seconds_per_step + self.terminate_above = terminate_above + self.simulator = simulator + self.goal_params = list(goal_params) + self.tolerance = tolerance + + if nucon is None: + nucon = Nucon(port=simulator.port) if simulator else Nucon() + self.nucon = nucon + + all_readable = self.nucon.get_all_readable() + + # Validate goal params and build per-param range arrays + for pid in self.goal_params: + if pid not in all_readable: + raise ValueError(f"Goal param '{pid}' is not a readable parameter") + + goal_range = goal_range or {} + self._goal_low = np.array([ + goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[0] + for pid in self.goal_params + ], dtype=np.float32) + self._goal_high = np.array([ + goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[1] + for pid in self.goal_params + ], dtype=np.float32) + self._goal_range = self._goal_high - self._goal_low + self._goal_range[self._goal_range == 0] = 1.0 # avoid div-by-zero + + self._reward_fn = reward_fn # callable(achieved_norm, desired_norm) -> float, or None + + # Observation subspace: all readable non-str non-goal params + goal_set = set(self.goal_params) + obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)} + for param_id, param in all_readable.items(): + if param_id in goal_set: + continue + sp = _build_param_space(param) + if sp is not None: + obs_spaces[param_id] = sp + + n_goals = len(self.goal_params) + self.observation_space = spaces.Dict({ + 'observation': spaces.Dict(obs_spaces), + 'achieved_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32), + 'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32), + }) + + # Action space: readable-back, non-cheat writable params + action_spaces = {} + for param_id, param in self.nucon.get_all_writable().items(): + if not param.is_readable or param.is_cheat: + continue + sp = _build_param_space(param) + if sp is not None: + action_spaces[param_id] = sp + self.action_space = spaces.Dict(action_spaces) + + # Terminators + self._terminators = terminators or [] + + self._desired_goal = np.zeros(n_goals, dtype=np.float32) + self._total_steps = 0 + + # ------------------------------------------------------------------ + # GoalEnv interface + # ------------------------------------------------------------------ + + def compute_reward(self, achieved_goal, desired_goal, info): + """ + Dense: negative L2 in normalised goal space (each dim in [0,1]). + Sparse when tolerance is set: 0 if within tolerance, -1 otherwise. + Custom reward_fn overrides both. + """ + if self._reward_fn is not None: + return self._reward_fn(achieved_goal, desired_goal) + dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1) + if self.tolerance is not None: + return (dist <= self.tolerance).astype(np.float32) - 1.0 + return -dist + + def _read_goal_values(self): + raw = np.array([ + self.nucon.get(pid) or 0.0 for pid in self.goal_params + ], dtype=np.float32) + return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0) + + def _get_obs_dict(self): + obs = {'EPISODE_TIME': float(self._total_steps * self.seconds_per_step)} + goal_set = set(self.goal_params) + for param_id, param in self.nucon.get_all_readable().items(): + if param_id in goal_set or param_id not in self.observation_space['observation'].spaces: + continue + value = self.nucon.get(param_id) + if isinstance(value, Enum): + value = value.value + obs[param_id] = value + achieved = self._read_goal_values() + return { + 'observation': obs, + 'achieved_goal': achieved, + 'desired_goal': self._desired_goal.copy(), + } + + def reset(self, seed=None, options=None): + super().reset(seed=seed) + self._total_steps = 0 + + # Sample a new goal uniformly from the goal range + rng = np.random.default_rng(seed) + self._desired_goal = rng.uniform(0.0, 1.0, size=len(self.goal_params)).astype(np.float32) + + obs = self._get_obs_dict() + return obs, {} + + def step(self, action): + for param_id, value in action.items(): + param = self.nucon._parameters[param_id] + if issubclass(param.param_type, Enum): + value = param.param_type(int(np.asarray(value).flat[0])) + else: + value = param.param_type(np.asarray(value).flat[0]) + if param.min_val is not None and param.max_val is not None: + value = np.clip(value, param.min_val, param.max_val) + self.nucon.set(param, value) + + obs = self._get_obs_dict() + reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], {})) + terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators) + truncated = False + info = {'achieved_goal': obs['achieved_goal'], 'desired_goal': obs['desired_goal']} + + self._total_steps += 1 + if self.simulator: + self.simulator.update(self.seconds_per_step) + else: + sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0 + time.sleep(self.seconds_per_step / sim_speed) + + return obs, reward, terminated, truncated, info + + def render(self): + pass + + def close(self): + pass + + def register_nucon_envs(): gym.register( id='Nucon-max_power-v0', @@ -183,5 +380,25 @@ def register_nucon_envs(): entry_point='nucon.rl:NuconEnv', kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]} ) + # Goal-conditioned: target total generator output (train with HER) + gym.register( + id='Nucon-goal_power-v0', + entry_point='nucon.rl:NuconGoalEnv', + kwargs={ + 'goal_params': ['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'], + 'goal_range': {'GENERATOR_0_KW': (0.0, 1200.0), 'GENERATOR_1_KW': (0.0, 1200.0), 'GENERATOR_2_KW': (0.0, 1200.0)}, + 'seconds_per_step': 5, + } + ) + # Goal-conditioned: target core temperature (train with HER) + gym.register( + id='Nucon-goal_temp-v0', + entry_point='nucon.rl:NuconGoalEnv', + kwargs={ + 'goal_params': ['CORE_TEMP'], + 'goal_range': {'CORE_TEMP': (280.0, 380.0)}, + 'seconds_per_step': 5, + } + ) register_nucon_envs() \ No newline at end of file diff --git a/nucon/sim.py b/nucon/sim.py index 871b03a..784b94b 100644 --- a/nucon/sim.py +++ b/nucon/sim.py @@ -5,7 +5,8 @@ from flask import Flask, request, jsonify from nucon import Nucon, ParameterEnum, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus import threading import torch -from nucon.model import ReactorDynamicsModel +from nucon.model import ReactorDynamicsModel, ReactorKNNModel +import pickle class OperatingState(Enum): # Tuple indicates a range of values, while list indicates a set of possible values @@ -165,6 +166,8 @@ class NuconSimulator: def __init__(self, host: str = 'localhost', port: int = 8786): self._nucon = Nucon() self.parameters = self.Parameters(self._nucon) + self.host = host + self.port = port self.time = 0.0 self.allow_all_writes = False self.set_state(OperatingState.OFFLINE) @@ -216,34 +219,63 @@ class NuconSimulator: self._update_reactor_state(time_step) self.time += time_step + def set_model(self, model) -> None: + """Set a pre-loaded ReactorDynamicsModel or ReactorKNNModel directly.""" + self.model = model + if isinstance(model, ReactorDynamicsModel): + self.model.eval() + def load_model(self, model_path: str) -> None: + """Load a model from a file. .pkl → ReactorKNNModel, otherwise → ReactorDynamicsModel (torch).""" try: - self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params) - self.model.load_state_dict(torch.load(model_path)) - self.model.eval() # Set the model to evaluation mode - print(f"Model loaded successfully from {model_path}") + if model_path.endswith('.pkl'): + with open(model_path, 'rb') as f: + self.model = pickle.load(f) + print(f"kNN model loaded from {model_path}") + else: + # Reconstruct shell from the saved state dict; input/output params + # are stored inside the checkpoint. + checkpoint = torch.load(model_path, weights_only=False) + if isinstance(checkpoint, dict) and 'input_params' in checkpoint: + self.model = ReactorDynamicsModel(checkpoint['input_params'], checkpoint['output_params']) + self.model.load_state_dict(checkpoint['state_dict']) + else: + # Legacy: plain state dict — fall back using sim readable/non-writable lists + self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params) + self.model.load_state_dict(checkpoint) + self.model.eval() + print(f"NN model loaded from {model_path}") except Exception as e: print(f"Error loading model: {str(e)}") self.model = None def _update_reactor_state(self, time_step: float) -> None: if not self.model: - raise ValueError("Model not set. Please load a model using load_model() method.") + raise ValueError("Model not set. Please load a model using load_model() or set_model().") + # Build state dict using only the params the model knows about state = {} - for param in self.readable_params: - value = self.get(param) + for param_id in self.model.input_params: + value = getattr(self.parameters, param_id, None) if isinstance(value, Enum): value = value.value - state[param] = value + if value is None: + value = 0.0 # fallback for params not initialised in sim state + state[param_id] = value - # Use the model to predict the next state - with torch.no_grad(): - next_state = self.model(state, time_step) + # Forward pass — same interface for both NN and kNN + if isinstance(self.model, ReactorDynamicsModel): + with torch.no_grad(): + next_state = self.model.forward(state, time_step) + else: + next_state = self.model.forward(state, time_step) - # Update the simulator's state - for param, value in next_state.items(): - self.set(param, value) + # Update only the output params the model predicts + for param_id, value in next_state.items(): + try: + self.set(param_id, value, force=True) + except (ValueError, KeyError): + pass # ignore params that can't be set (type mismatch, unknown) def set_state(self, state: OperatingState) -> None: self._sample_parameters_from_state(state)