import inspect import gymnasium as gym from gymnasium import spaces import numpy as np import time from typing import Dict, Any, Callable, List, Optional from enum import Enum from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus # --------------------------------------------------------------------------- # Reward / objective helpers # --------------------------------------------------------------------------- Objectives = { "null": lambda obs: 0, "max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"], "episode_time": lambda obs: obs["EPISODE_TIME"], } def _uncertainty_penalty(start=0.3, scale=1.0, mode='l2'): excess = lambda obs: max(0.0, obs.get('SIM_UNCERTAINTY', 0.0) - start) if mode == 'l2': return lambda obs: -scale * excess(obs) ** 2 elif mode == 'linear': return lambda obs: -scale * excess(obs) else: raise ValueError(f"Unknown mode '{mode}'. Use 'l2' or 'linear'.") def _uncertainty_abort(threshold=0.7): return lambda obs: 1.0 if obs.get('SIM_UNCERTAINTY', 0.0) >= threshold else 0.0 Parameterized_Objectives = { "target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2), "target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2), "temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2), "temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2), "constant": lambda constant: lambda obs: constant, "uncertainty_penalty": _uncertainty_penalty, # (start, scale, mode) -> (obs) -> float } Parameterized_Terminators = { "uncertainty_abort": _uncertainty_abort, # (threshold,) -> (obs) -> float } # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _build_flat_action_space(nucon, obs_param_set=None): """Return (Box, ordered_param_ids) for all writable, readable, non-cheat params. If obs_param_set is provided, only include params in that set. """ params = [] lows, highs = [], [] for param_id, param in nucon.get_all_writable().items(): if not param.is_readable or param.is_cheat: continue if obs_param_set is not None and param_id not in obs_param_set: continue if param.min_val is None or param.max_val is None: continue # SAC requires finite action bounds sp = _build_param_space(param) if sp is None: continue params.append(param_id) lows.append(sp.low[0]) highs.append(sp.high[0]) box = spaces.Box(low=np.array(lows, dtype=np.float32), high=np.array(highs, dtype=np.float32), dtype=np.float32) return box, params def _unflatten_action(flat_action, param_ids): return {pid: float(flat_action[i]) for i, pid in enumerate(param_ids)} def _build_param_space(param): """Return a gymnasium Box for a single NuconParameter, or None if unsupported.""" if param.param_type in (float, int): lo = param.min_val if param.min_val is not None else -np.inf hi = param.max_val if param.max_val is not None else np.inf return spaces.Box(low=lo, high=hi, shape=(1,), dtype=np.float32) elif param.param_type == bool: return spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32) elif param.param_type == str: return None elif issubclass(param.param_type, Enum): return spaces.Box(low=0, high=len(param.param_type) - 1, shape=(1,), dtype=np.float32) return None def _apply_action(nucon, action): for param_id, value in action.items(): param = nucon._parameters[param_id] if issubclass(param.param_type, Enum): value = param.param_type(int(np.asarray(value).flat[0])) else: value = param.param_type(np.asarray(value).flat[0]) if param.min_val is not None and param.max_val is not None: value = np.clip(value, param.min_val, param.max_val) nucon.set(param, value) # --------------------------------------------------------------------------- # NuconEnv # --------------------------------------------------------------------------- class NuconEnv(gym.Env): metadata = {'render_modes': ['human']} def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5, objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0): super().__init__() self.render_mode = render_mode self.seconds_per_step = seconds_per_step if objective_weights is None: objective_weights = [1.0 for _ in objectives] self.objective_weights = objective_weights self.terminate_above = terminate_above self.simulator = simulator if nucon is None: nucon = Nucon(port=simulator.port) if simulator else Nucon() self.nucon = nucon # Observation space — SIM_UNCERTAINTY included when a simulator is present obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)} if simulator is not None: obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32) for param_id, param in self.nucon.get_all_readable().items(): sp = _build_param_space(param) if sp is not None: obs_spaces[param_id] = sp self.observation_space = spaces.Dict(obs_spaces) self.action_space, self._action_params = _build_flat_action_space(self.nucon) self.objectives = [] self.terminators = [] for objective in objectives: if objective in Objectives: self.objectives.append(Objectives[objective]) elif callable(objective): self.objectives.append(objective) else: raise ValueError(f"Unsupported objective: {objective}") for terminator in terminators: if terminator in Objectives: self.terminators.append(Objectives[terminator]) elif callable(terminator): self.terminators.append(terminator) else: raise ValueError(f"Unsupported terminator: {terminator}") def _get_obs(self, sim_uncertainty=None): obs = {} for param_id, param in self.nucon.get_all_readable().items(): if param.param_type == str or param_id not in self.observation_space.spaces: continue value = self.nucon.get(param_id) if isinstance(value, Enum): value = value.value obs[param_id] = value obs['EPISODE_TIME'] = self._total_steps * self.seconds_per_step if 'SIM_UNCERTAINTY' in self.observation_space.spaces: obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0 return obs def _get_info(self, obs): info = {'objectives': {}, 'objectives_weighted': {}} for objective, weight in zip(self.objectives, self.objective_weights): obj = objective(obs) name = getattr(objective, '__name__', repr(objective)) info['objectives'][name] = obj info['objectives_weighted'][name] = obj * weight return info def reset(self, seed=None, options=None): super().reset(seed=seed) self._total_steps = 0 observation = self._get_obs() return observation, self._get_info(observation) def step(self, action): _apply_action(self.nucon, _unflatten_action(action, self._action_params)) # Advance sim (or sleep) — get uncertainty for obs injection truncated = False uncertainty = None if self.simulator: uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True) else: sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0 time.sleep(self.seconds_per_step / sim_speed) self._total_steps += 1 observation = self._get_obs(sim_uncertainty=uncertainty) info = self._get_info(observation) reward = sum(obj for obj in info['objectives_weighted'].values()) terminated = np.sum([t(observation) for t in self.terminators]) > self.terminate_above return observation, reward, terminated, truncated, info def render(self): pass def close(self): pass def _flatten_observation(self, observation): return np.concatenate([np.asarray(v).flatten() for v in observation.values()]) # --------------------------------------------------------------------------- # NuconGoalEnv # --------------------------------------------------------------------------- class NuconGoalEnv(gym.Env): """ Goal-conditioned reactor environment compatible with SB3 HER (Hindsight Experience Replay). Observation is a Dict with three keys: - 'observation': all readable non-goal, non-str params + SIM_UNCERTAINTY (when sim active) - 'achieved_goal': current values of goal_params, normalised to [0, 1] within goal_range - 'desired_goal': target values sampled each episode, normalised to [0, 1] ``SIM_UNCERTAINTY`` in 'observation' lets reward_fn / terminators reference uncertainty directly. reward_fn signature: ``(achieved, desired)`` or ``(achieved, desired, obs)`` — the 3-arg form receives the full observation dict (including SIM_UNCERTAINTY) for uncertainty-aware shaping. Usage with SB3 HER:: from stable_baselines3 import SAC from stable_baselines3.common.buffers import HerReplayBuffer from nucon.rl import NuconGoalEnv, UncertaintyPenalty, UncertaintyAbort env = NuconGoalEnv( goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'], goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)}, tolerance=0.05, simulator=simulator, # uncertainty-aware reward: penalise OOD, abort if too far out reward_fn=lambda ag, dg, obs: ( -(np.linalg.norm(ag - dg) ** 2) - 2.0 * max(0, obs.get('SIM_UNCERTAINTY', 0) - 0.3) ** 2 ), terminators=[UncertaintyAbort(threshold=0.7)], ) model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer) model.learn(total_timesteps=500_000) """ metadata = {'render_modes': ['human']} def __init__( self, goal_params, goal_range=None, reward_fn=None, tolerance=None, nucon=None, simulator=None, render_mode=None, seconds_per_step=5, terminators=None, terminate_above=0, additional_objectives=None, additional_objective_weights=None, obs_params=None, ): super().__init__() self.render_mode = render_mode self.seconds_per_step = seconds_per_step self.terminate_above = terminate_above self.simulator = simulator self.goal_params = list(goal_params) self.tolerance = tolerance if nucon is None: nucon = Nucon(port=simulator.port) if simulator else Nucon() self.nucon = nucon all_readable = self.nucon.get_all_readable() for pid in self.goal_params: if pid not in all_readable: raise ValueError(f"Goal param '{pid}' is not a readable parameter") goal_range = goal_range or {} self._goal_low = np.array([ goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[0] for pid in self.goal_params ], dtype=np.float32) self._goal_high = np.array([ goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[1] for pid in self.goal_params ], dtype=np.float32) self._goal_range = self._goal_high - self._goal_low self._goal_range[self._goal_range == 0] = 1.0 # Detect reward_fn arity for backward compat (2-arg vs 3-arg) self._reward_fn = reward_fn if reward_fn is not None: n_args = len(inspect.signature(reward_fn).parameters) self._reward_fn_wants_obs = n_args >= 3 else: self._reward_fn_wants_obs = False # Observation params: model.input_params defines the canonical list — the same set is # used whether training in sim or deploying to the real game (the game simply has more # params available; we query only the subset we care about). # Explicit obs_params overrides everything (use when deploying to real game without sim). # SB3 HER requires observation to be a flat Box, not a nested Dict. goal_set = set(self.goal_params) self._obs_with_uncertainty = simulator is not None if obs_params is not None: base_params = [p for p in obs_params if p not in goal_set] elif simulator is not None and hasattr(simulator, 'model') and simulator.model is not None: base_params = [p for p in simulator.model.input_params if p not in goal_set and p in all_readable and _build_param_space(all_readable[p]) is not None] else: base_params = [p for p, param in all_readable.items() if p not in goal_set and _build_param_space(param) is not None] # SIM_UNCERTAINTY is not in _obs_params — it's not available at deployment on the real game self._obs_params = base_params n_goals = len(self.goal_params) self.observation_space = spaces.Dict({ 'observation': spaces.Box(low=-np.inf, high=np.inf, shape=(len(self._obs_params),), dtype=np.float32), 'achieved_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32), 'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32), }) # Action space: writable params within the obs param set (flat Box for SB3 compatibility). self.action_space, self._action_params = _build_flat_action_space(self.nucon, set(base_params)) self._terminators = terminators or [] _objs = additional_objectives or [] self._objectives = [Objectives[o] if isinstance(o, str) else o for o in _objs] self._objective_weights = additional_objective_weights or [1.0] * len(self._objectives) self._desired_goal = np.zeros(n_goals, dtype=np.float32) self._total_steps = 0 def compute_reward(self, achieved_goal, desired_goal, info): """Dense negative L2, sparse with tolerance, or custom reward_fn.""" obs_named = info.get('obs_named', {}) if isinstance(info, dict) else {} if self._reward_fn is not None: if self._reward_fn_wants_obs: return self._reward_fn(achieved_goal, desired_goal, obs_named) return self._reward_fn(achieved_goal, desired_goal) dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1) if self.tolerance is not None: return (dist <= self.tolerance).astype(np.float32) - 1.0 return -dist def _read_goal_values(self): raw = np.array([self.nucon.get(pid) or 0.0 for pid in self.goal_params], dtype=np.float32) return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0) def _read_obs(self, sim_uncertainty=None): """Return (gym_obs_dict, reward_obs_dict). gym_obs_dict — flat Box observation for the policy (no SIM_UNCERTAINTY). reward_obs_dict — same values plus SIM_UNCERTAINTY for objectives/terminators/reward_fn. """ reward_obs = {} if self._obs_with_uncertainty: reward_obs['SIM_UNCERTAINTY'] = float(sim_uncertainty) if sim_uncertainty is not None else 0.0 for param_id in self._obs_params: value = self.nucon.get(param_id) if isinstance(value, Enum): value = value.value reward_obs[param_id] = float(value) if value is not None else 0.0 obs_vec = np.array([reward_obs[p] for p in self._obs_params], dtype=np.float32) achieved = self._read_goal_values() gym_obs = {'observation': obs_vec, 'achieved_goal': achieved, 'desired_goal': self._desired_goal.copy()} return gym_obs, reward_obs def reset(self, seed=None, options=None): super().reset(seed=seed) self._total_steps = 0 rng = np.random.default_rng(seed) self._desired_goal = rng.uniform(0.0, 1.0, size=len(self.goal_params)).astype(np.float32) gym_obs, _ = self._read_obs() return gym_obs, {} def step(self, action): _apply_action(self.nucon, _unflatten_action(action, self._action_params)) if self.simulator: uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True) else: sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0 time.sleep(self.seconds_per_step / sim_speed) uncertainty = None self._total_steps += 1 gym_obs, reward_obs = self._read_obs(sim_uncertainty=uncertainty) info = {'achieved_goal': gym_obs['achieved_goal'], 'desired_goal': gym_obs['desired_goal'], 'obs_named': reward_obs} reward = float(self.compute_reward(gym_obs['achieved_goal'], gym_obs['desired_goal'], info)) reward += sum(w * o(reward_obs) for o, w in zip(self._objectives, self._objective_weights)) terminated = any(t(reward_obs) > self.terminate_above for t in self._terminators) return gym_obs, reward, terminated, False, info def render(self): pass def close(self): pass # --------------------------------------------------------------------------- # Registration # --------------------------------------------------------------------------- def register_nucon_envs(): gym.register( id='Nucon-max_power-v0', entry_point='nucon.rl:NuconEnv', kwargs={'seconds_per_step': 5, 'objectives': ['max_power']} ) gym.register( id='Nucon-target_temperature_350-v0', entry_point='nucon.rl:NuconEnv', kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['target_temperature'](goal_temp=350)]} ) gym.register( id='Nucon-safe_max_power-v0', entry_point='nucon.rl:NuconEnv', kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]} ) gym.register( id='Nucon-goal_power-v0', entry_point='nucon.rl:NuconGoalEnv', kwargs={ 'goal_params': ['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'], 'goal_range': {'GENERATOR_0_KW': (0.0, 1200.0), 'GENERATOR_1_KW': (0.0, 1200.0), 'GENERATOR_2_KW': (0.0, 1200.0)}, 'seconds_per_step': 5, } ) gym.register( id='Nucon-goal_temp-v0', entry_point='nucon.rl:NuconGoalEnv', kwargs={ 'goal_params': ['CORE_TEMP'], 'goal_range': {'CORE_TEMP': (280.0, 380.0)}, 'seconds_per_step': 5, } ) register_nucon_envs()