From 3dfe1aa673f9a414d8941adc9744a9a66be2e919 Mon Sep 17 00:00:00 2001 From: Dominik Roth Date: Thu, 12 Mar 2026 19:16:07 +0100 Subject: [PATCH] fix: flat Box action space, SB3/HER compatibility, sim uninitialized param defaults rl.py: - Action space is now a flat Box (SAC/PPO require this, not Dict) - _build_flat_action_space + _unflatten_action helpers shared by both envs - Params with undefined bounds excluded from action space (SAC needs finite bounds) - Fix _build_param_space: use `is not None` check instead of falsy `or` (0 is valid min_val) - NuconGoalEnv obs params default to simulator.model.input_params when sim provided; obs_params kwarg overrides for real-game deployment with same param set - SIM_UNCERTAINTY kept out of policy obs vector (not available at deployment); available in reward_obs passed to objectives/terminators/reward_fn - _read_obs returns (gym_obs, reward_obs) cleanly instead of smuggling via dict - NuconGoalEnv additional_objectives wired into step() sim.py: - Uninitialized params return type-default (0/False/first-enum) instead of "None" - Enum params serialised as integer value, not repr string README.md: - Fix HerReplayBuffer import path (sb3 2.x: her.her_replay_buffer) - Remove non-existent simulator.run() call - Fix broken anchor links, remove "work in progress" from intro Co-Authored-By: Claude Sonnet 4.6 --- README.md | 13 ++--- nucon/rl.py | 144 +++++++++++++++++++++++++++++---------------------- nucon/sim.py | 8 +++ 3 files changed, 96 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index f955c52..beda89c 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ NuCon (Nucleares Controller) is a Python library designed to interface with and control parameters in [Nucleares](https://store.steampowered.com/app/1428420/Nucleares/), a nuclear reactor simulation game. It provides a robust, type-safe foundation for reading and writing game parameters, allowing users to easily create their own automations and control systems. -NuCon further provides a work in progress implementation of a reinforcement learning environment for training control policies and a simulator based on model learning. +NuCon further provides a reinforcement learning environment for training control policies and a simulator based on model learning. > [!NOTE] > NuCon is compatible with Nucleares v2.2.25.213. The game exposes a rich set of writable parameters including individual rod bank positions (`ROD_BANK_POS_{0-8}_ORDERED`), pump speeds, MSCV and turbine bypass setpoints, and various switches. Core chemistry parameters (e.g. Xenon concentration) are still read-only. Development on the advanced features (Reinforcement / Model Learning) is ongoing. @@ -198,7 +198,7 @@ HER works by relabelling past trajectories with the goal that was *actually achi ```python from nucon.rl import NuconGoalEnv, Parameterized_Objectives, Parameterized_Terminators from stable_baselines3 import SAC -from stable_baselines3.common.buffers import HerReplayBuffer +from stable_baselines3.her.her_replay_buffer import HerReplayBuffer env = NuconGoalEnv( @@ -284,10 +284,7 @@ simulator.load_model('path/to/model.pth') # Set initial state (optional) simulator.set_state(OperatingState.NOMINAL) -# Run the simulator, will start the web server -simulator.run() - -# Access via nucon by using the simulator's port +# The web server starts automatically in __init__; access via nucon using the simulator's port nucon = Nucon(port=simulator.port) # Or use the simulator with NuconEnv @@ -407,11 +404,11 @@ The recommended end-to-end workflow for training an RL operator is an iterative └─────────────────────┘ ``` -**Step 1 — Human dataset collection**: Run `NuconModelLearner.collect_data()` during your play session. Cover a wide range of states: startup from cold, ramping power, individual rod bank adjustments. Diversity in the dataset directly determines simulator accuracy. See [Model Learning](#model-learning-work-in-progress) for collection details. +**Step 1 — Human dataset collection**: Run `NuconModelLearner.collect_data()` during your play session. Cover a wide range of states: startup from cold, ramping power, individual rod bank adjustments. Diversity in the dataset directly determines simulator accuracy. See [Model Learning](#model-learning) for collection details. **Step 2 — Initial model fitting**: Fit a kNN-GP model (instant) or NN (better extrapolation with larger datasets) using `fit_knn()` or `train_model()`. Prune near-duplicate samples with `drop_redundant()` before fitting. See [Model Learning](#model-learning). -**Step 3 — Train RL in simulator**: Load the fitted model into `NuconSimulator`, then train a `NuconGoalEnv` policy with SAC + HER. The simulator runs far faster than the real game, allowing many trajectories in reasonable time. Pass `UncertaintyPenalty` and `UncertaintyAbort` as objectives/terminators to discourage the policy from wandering into regions the model hasn't seen; `SIM_UNCERTAINTY` is automatically injected into the obs dict when a simulator is active. See [NuconGoalEnv + HER Usage](#nucongoalenv--her-usage). +**Step 3 — Train RL in simulator**: Load the fitted model into `NuconSimulator`, then train a `NuconGoalEnv` policy with SAC + HER. The simulator runs far faster than the real game, allowing many trajectories in reasonable time. Pass `Parameterized_Objectives['uncertainty_penalty']` and `Parameterized_Terminators['uncertainty_abort']` as additional objectives/terminators to discourage the policy from wandering into regions the model hasn't seen; `SIM_UNCERTAINTY` is automatically injected into the obs dict when a simulator is active. See [NuconGoalEnv + HER Usage](#nucongoalenv--her-usage). **Step 4 — Eval in game + collect new data**: Run the trained policy against the real game. This validates simulator accuracy and simultaneously collects new data from states the policy visits, which may be regions the original dataset missed. Run a second `NuconModelLearner` in a background thread to collect concurrently. diff --git a/nucon/rl.py b/nucon/rl.py index 2fbf22d..0576801 100644 --- a/nucon/rl.py +++ b/nucon/rl.py @@ -49,11 +49,38 @@ Parameterized_Terminators = { # Internal helpers # --------------------------------------------------------------------------- +def _build_flat_action_space(nucon, obs_param_set=None): + """Return (Box, ordered_param_ids) for all writable, readable, non-cheat params. + + If obs_param_set is provided, only include params in that set. + """ + params = [] + lows, highs = [], [] + for param_id, param in nucon.get_all_writable().items(): + if not param.is_readable or param.is_cheat: + continue + if obs_param_set is not None and param_id not in obs_param_set: + continue + if param.min_val is None or param.max_val is None: + continue # SAC requires finite action bounds + sp = _build_param_space(param) + if sp is None: + continue + params.append(param_id) + lows.append(sp.low[0]) + highs.append(sp.high[0]) + box = spaces.Box(low=np.array(lows, dtype=np.float32), + high=np.array(highs, dtype=np.float32), dtype=np.float32) + return box, params + + +def _unflatten_action(flat_action, param_ids): + return {pid: float(flat_action[i]) for i, pid in enumerate(param_ids)} + + def _build_param_space(param): """Return a gymnasium Box for a single NuconParameter, or None if unsupported.""" - if param.param_type == float: - return spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32) - elif param.param_type == int: + if param.param_type in (float, int): lo = param.min_val if param.min_val is not None else -np.inf hi = param.max_val if param.max_val is not None else np.inf return spaces.Box(low=lo, high=hi, shape=(1,), dtype=np.float32) @@ -111,15 +138,7 @@ class NuconEnv(gym.Env): obs_spaces[param_id] = sp self.observation_space = spaces.Dict(obs_spaces) - # Action space - action_spaces = {} - for param_id, param in self.nucon.get_all_writable().items(): - if not param.is_readable or param.is_cheat: - continue - sp = _build_param_space(param) - if sp is not None: - action_spaces[param_id] = sp - self.action_space = spaces.Dict(action_spaces) + self.action_space, self._action_params = _build_flat_action_space(self.nucon) self.objectives = [] self.terminators = [] @@ -168,7 +187,7 @@ class NuconEnv(gym.Env): return observation, self._get_info(observation) def step(self, action): - _apply_action(self.nucon, action) + _apply_action(self.nucon, _unflatten_action(action, self._action_params)) # Advance sim (or sleep) — get uncertainty for obs injection truncated = False @@ -252,6 +271,7 @@ class NuconGoalEnv(gym.Env): terminate_above=0, additional_objectives=None, additional_objective_weights=None, + obs_params=None, ): super().__init__() @@ -291,34 +311,35 @@ class NuconGoalEnv(gym.Env): else: self._reward_fn_wants_obs = False - # Observation subspace + # Observation params: model.input_params defines the canonical list — the same set is + # used whether training in sim or deploying to the real game (the game simply has more + # params available; we query only the subset we care about). + # Explicit obs_params overrides everything (use when deploying to real game without sim). + # SB3 HER requires observation to be a flat Box, not a nested Dict. goal_set = set(self.goal_params) - obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)} - if simulator is not None: - obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32) - for param_id, param in all_readable.items(): - if param_id in goal_set: - continue - sp = _build_param_space(param) - if sp is not None: - obs_spaces[param_id] = sp + self._obs_with_uncertainty = simulator is not None + if obs_params is not None: + base_params = [p for p in obs_params if p not in goal_set] + elif simulator is not None and hasattr(simulator, 'model') and simulator.model is not None: + base_params = [p for p in simulator.model.input_params + if p not in goal_set and p in all_readable + and _build_param_space(all_readable[p]) is not None] + else: + base_params = [p for p, param in all_readable.items() + if p not in goal_set and _build_param_space(param) is not None] + # SIM_UNCERTAINTY is not in _obs_params — it's not available at deployment on the real game + self._obs_params = base_params n_goals = len(self.goal_params) self.observation_space = spaces.Dict({ - 'observation': spaces.Dict(obs_spaces), + 'observation': spaces.Box(low=-np.inf, high=np.inf, + shape=(len(self._obs_params),), dtype=np.float32), 'achieved_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32), 'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32), }) - # Action space - action_spaces = {} - for param_id, param in self.nucon.get_all_writable().items(): - if not param.is_readable or param.is_cheat: - continue - sp = _build_param_space(param) - if sp is not None: - action_spaces[param_id] = sp - self.action_space = spaces.Dict(action_spaces) + # Action space: writable params within the obs param set (flat Box for SB3 compatibility). + self.action_space, self._action_params = _build_flat_action_space(self.nucon, set(base_params)) self._terminators = terminators or [] _objs = additional_objectives or [] @@ -329,10 +350,10 @@ class NuconGoalEnv(gym.Env): def compute_reward(self, achieved_goal, desired_goal, info): """Dense negative L2, sparse with tolerance, or custom reward_fn.""" - obs = info.get('obs', {}) if isinstance(info, dict) else {} + obs_named = info.get('obs_named', {}) if isinstance(info, dict) else {} if self._reward_fn is not None: if self._reward_fn_wants_obs: - return self._reward_fn(achieved_goal, desired_goal, obs) + return self._reward_fn(achieved_goal, desired_goal, obs_named) return self._reward_fn(achieved_goal, desired_goal) dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1) if self.tolerance is not None: @@ -343,52 +364,53 @@ class NuconGoalEnv(gym.Env): raw = np.array([self.nucon.get(pid) or 0.0 for pid in self.goal_params], dtype=np.float32) return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0) - def _get_obs_dict(self, sim_uncertainty=None): - obs = {'EPISODE_TIME': float(self._total_steps * self.seconds_per_step)} - if 'SIM_UNCERTAINTY' in self.observation_space['observation'].spaces: - obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0 - goal_set = set(self.goal_params) - for param_id, param in self.nucon.get_all_readable().items(): - if param_id in goal_set or param_id not in self.observation_space['observation'].spaces: - continue + def _read_obs(self, sim_uncertainty=None): + """Return (gym_obs_dict, reward_obs_dict). + + gym_obs_dict — flat Box observation for the policy (no SIM_UNCERTAINTY). + reward_obs_dict — same values plus SIM_UNCERTAINTY for objectives/terminators/reward_fn. + """ + reward_obs = {} + if self._obs_with_uncertainty: + reward_obs['SIM_UNCERTAINTY'] = float(sim_uncertainty) if sim_uncertainty is not None else 0.0 + for param_id in self._obs_params: value = self.nucon.get(param_id) if isinstance(value, Enum): value = value.value - obs[param_id] = value + reward_obs[param_id] = float(value) if value is not None else 0.0 + + obs_vec = np.array([reward_obs[p] for p in self._obs_params], dtype=np.float32) achieved = self._read_goal_values() - return { - 'observation': obs, - 'achieved_goal': achieved, - 'desired_goal': self._desired_goal.copy(), - } + gym_obs = {'observation': obs_vec, 'achieved_goal': achieved, + 'desired_goal': self._desired_goal.copy()} + return gym_obs, reward_obs def reset(self, seed=None, options=None): super().reset(seed=seed) self._total_steps = 0 rng = np.random.default_rng(seed) self._desired_goal = rng.uniform(0.0, 1.0, size=len(self.goal_params)).astype(np.float32) - return self._get_obs_dict(), {} + gym_obs, _ = self._read_obs() + return gym_obs, {} def step(self, action): - _apply_action(self.nucon, action) + _apply_action(self.nucon, _unflatten_action(action, self._action_params)) - # Advance sim (or sleep) - uncertainty = None if self.simulator: uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True) else: sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0 time.sleep(self.seconds_per_step / sim_speed) + uncertainty = None self._total_steps += 1 - obs = self._get_obs_dict(sim_uncertainty=uncertainty) - info = {'achieved_goal': obs['achieved_goal'], 'desired_goal': obs['desired_goal'], - 'obs': obs['observation']} - reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], info)) - reward += sum(w * o(obs['observation']) for o, w in zip(self._objectives, self._objective_weights)) - terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators) - truncated = False - return obs, reward, terminated, truncated, info + gym_obs, reward_obs = self._read_obs(sim_uncertainty=uncertainty) + info = {'achieved_goal': gym_obs['achieved_goal'], 'desired_goal': gym_obs['desired_goal'], + 'obs_named': reward_obs} + reward = float(self.compute_reward(gym_obs['achieved_goal'], gym_obs['desired_goal'], info)) + reward += sum(w * o(reward_obs) for o, w in zip(self._objectives, self._objective_weights)) + terminated = any(t(reward_obs) > self.terminate_above for t in self._terminators) + return gym_obs, reward, terminated, False, info def render(self): pass diff --git a/nucon/sim.py b/nucon/sim.py index cd64221..444d5a2 100644 --- a/nucon/sim.py +++ b/nucon/sim.py @@ -330,6 +330,14 @@ class NuconSimulator: try: value = self.get(variable) + if value is None: + param = self._nucon[variable] + if param.enum_type is not None: + value = next(iter(param.enum_type)).value # first enum member's int value + else: + value = param.param_type() # int()->0, float()->0.0, bool()->False + if isinstance(value, Enum): + value = value.value return str(value), 200 except (KeyError, AttributeError): return jsonify({"error": f"Unknown variable: {variable}"}), 404