"""Gymnasium environments for the Nucon reactor-control interface.

Exposes two environments:

* :class:`NuconEnv`     -- flat environment whose reward is a weighted sum of
  objective functions evaluated on the raw observation dict.
* :class:`NuconGoalEnv` -- goal-conditioned environment (Dict observation with
  ``observation`` / ``achieved_goal`` / ``desired_goal`` keys) compatible with
  SB3's Hindsight Experience Replay.

Importing this module registers a handful of ready-made environment ids with
gymnasium (see :func:`register_nucon_envs`).
"""

import inspect
import time
from enum import Enum
from typing import Dict, Any, Callable, List, Optional

import gymnasium as gym
import numpy as np
from gymnasium import spaces

from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus

# ---------------------------------------------------------------------------
# Reward / objective helpers
# ---------------------------------------------------------------------------

# Named objectives: each maps an observation dict to a scalar reward term.
Objectives = {
    "null": lambda obs: 0,
    "max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"],
    "episode_time": lambda obs: obs["EPISODE_TIME"],
}


def _uncertainty_penalty(start=0.3, scale=1.0, mode='l2'):
    """Build an objective that penalises simulator uncertainty above ``start``.

    Args:
        start: uncertainty level below which no penalty applies.
        scale: multiplier on the penalty.
        mode:  ``'l2'`` for a quadratic penalty, ``'linear'`` for proportional.

    Returns:
        An ``obs -> float`` callable returning a non-positive penalty.

    Raises:
        ValueError: if ``mode`` is neither ``'l2'`` nor ``'linear'``.
    """
    excess = lambda obs: max(0.0, obs.get('SIM_UNCERTAINTY', 0.0) - start)
    if mode == 'l2':
        return lambda obs: -scale * excess(obs) ** 2
    elif mode == 'linear':
        return lambda obs: -scale * excess(obs)
    else:
        raise ValueError(f"Unknown mode '{mode}'. Use 'l2' or 'linear'.")


def _uncertainty_abort(threshold=0.7):
    """Build a terminator that fires (returns 1.0) once SIM_UNCERTAINTY >= ``threshold``."""
    return lambda obs: 1.0 if obs.get('SIM_UNCERTAINTY', 0.0) >= threshold else 0.0


# Objective factories: call with parameters to obtain an ``obs -> float``.
Parameterized_Objectives = {
    "target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2),
    "target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2),
    "temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2),
    "temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2),
    "constant": lambda constant: lambda obs: constant,
    "uncertainty_penalty": _uncertainty_penalty,  # (start, scale, mode) -> (obs) -> float
}

Parameterized_Terminators = {
    "uncertainty_abort": _uncertainty_abort,  # (threshold,) -> (obs) -> float
}

# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _build_param_space(param):
    """Return a gymnasium Box for a single NuconParameter, or None if unsupported.

    All spaces are float32 Boxes of shape (1,) so Dict spaces stay homogeneous:
    numeric params use their declared bounds (±inf when absent), bools map to
    [0, 1], and Enums to [0, len(enum) - 1].  ``str`` parameters (and anything
    else unrecognised) yield None.
    """
    if param.param_type == float or param.param_type == int:
        # Explicit None checks: a legitimate bound of 0 is falsy, so the old
        # ``min_val or -np.inf`` idiom silently replaced it with infinity.
        lo = param.min_val if param.min_val is not None else -np.inf
        hi = param.max_val if param.max_val is not None else np.inf
        return spaces.Box(low=lo, high=hi, shape=(1,), dtype=np.float32)
    elif param.param_type == bool:
        return spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
    elif param.param_type == str:
        return None
    elif isinstance(param.param_type, type) and issubclass(param.param_type, Enum):
        return spaces.Box(low=0, high=len(param.param_type) - 1, shape=(1,), dtype=np.float32)
    return None


def _apply_action(nucon, action):
    """Write an action dict onto the controller.

    Each value is coerced to the parameter's declared type (Enum members from
    their integer index), clipped to [min_val, max_val] when both bounds are
    declared, and sent via ``nucon.set``.
    """
    for param_id, value in action.items():
        param = nucon._parameters[param_id]
        if issubclass(param.param_type, Enum):
            value = param.param_type(int(np.asarray(value).flat[0]))
        else:
            value = param.param_type(np.asarray(value).flat[0])
        if param.min_val is not None and param.max_val is not None:
            value = np.clip(value, param.min_val, param.max_val)
        nucon.set(param, value)


# ---------------------------------------------------------------------------
# NuconEnv
# ---------------------------------------------------------------------------


class NuconEnv(gym.Env):
    """Flat Nucon environment: reward is a weighted sum of objective functions.

    Args:
        nucon: existing Nucon handle; created automatically when None.
        simulator: optional simulator; when present, steps advance it and the
            observation gains a ``SIM_UNCERTAINTY`` key.
        render_mode: gymnasium render mode (only 'human' declared; no-op).
        seconds_per_step: simulated seconds advanced per environment step.
        objectives: names from ``Objectives`` and/or callables ``obs -> float``
            (default: the single 'null' objective).
        terminators: names from ``Objectives`` and/or callables; the episode
            terminates when their summed value exceeds ``terminate_above``.
        objective_weights: per-objective weights (default: all 1.0).
        terminate_above: threshold for the summed terminator values.
    """

    metadata = {'render_modes': ['human']}

    def __init__(self, nucon=None, simulator=None, render_mode=None,
                 seconds_per_step=5, objectives=None, terminators=None,
                 objective_weights=None, terminate_above=0):
        super().__init__()
        # None sentinels instead of mutable list defaults.
        objectives = ['null'] if objectives is None else objectives
        terminators = ['null'] if terminators is None else terminators

        self.render_mode = render_mode
        self.seconds_per_step = seconds_per_step
        if objective_weights is None:
            objective_weights = [1.0 for _ in objectives]
        self.objective_weights = objective_weights
        self.terminate_above = terminate_above
        self.simulator = simulator
        if nucon is None:
            nucon = Nucon(port=simulator.port) if simulator else Nucon()
        self.nucon = nucon

        # Observation space -- SIM_UNCERTAINTY included when a simulator is present
        obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
        if simulator is not None:
            obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
        for param_id, param in self.nucon.get_all_readable().items():
            sp = _build_param_space(param)
            if sp is not None:
                obs_spaces[param_id] = sp
        self.observation_space = spaces.Dict(obs_spaces)

        # Action space -- writable, readable, non-cheat parameters only
        action_spaces = {}
        for param_id, param in self.nucon.get_all_writable().items():
            if not param.is_readable or param.is_cheat:
                continue
            sp = _build_param_space(param)
            if sp is not None:
                action_spaces[param_id] = sp
        self.action_space = spaces.Dict(action_spaces)

        self.objectives = self._resolve(objectives, 'objective')
        self.terminators = self._resolve(terminators, 'terminator')

    @staticmethod
    def _resolve(entries, label):
        """Map objective/terminator specs (names in ``Objectives`` or callables) to callables."""
        resolved = []
        for entry in entries:
            if entry in Objectives:
                resolved.append(Objectives[entry])
            elif callable(entry):
                resolved.append(entry)
            else:
                raise ValueError(f"Unsupported {label}: {entry}")
        return resolved

    def _get_obs(self, sim_uncertainty=None):
        """Read every observable parameter into a flat dict (Enum values as ints)."""
        obs = {}
        for param_id, param in self.nucon.get_all_readable().items():
            if param.param_type == str or param_id not in self.observation_space.spaces:
                continue
            value = self.nucon.get(param_id)
            if isinstance(value, Enum):
                value = value.value
            obs[param_id] = value
        obs['EPISODE_TIME'] = self._total_steps * self.seconds_per_step
        if 'SIM_UNCERTAINTY' in self.observation_space.spaces:
            obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0
        return obs

    def _get_info(self, obs):
        """Evaluate all objectives, reporting raw and weighted values.

        Anonymous (lambda) objectives all share the name '<lambda>'; duplicates
        get the position index suffixed so no entry silently overwrites another.
        """
        info = {'objectives': {}, 'objectives_weighted': {}}
        for idx, (objective, weight) in enumerate(zip(self.objectives, self.objective_weights)):
            value = objective(obs)
            name = getattr(objective, '__name__', repr(objective))
            if name in info['objectives']:
                name = f"{name}_{idx}"
            info['objectives'][name] = value
            info['objectives_weighted'][name] = value * weight
        return info

    def reset(self, seed=None, options=None):
        """Reset step counter and return the initial (observation, info) pair."""
        super().reset(seed=seed)
        self._total_steps = 0
        observation = self._get_obs()
        return observation, self._get_info(observation)

    def step(self, action):
        """Apply the action, advance time, and return the gymnasium 5-tuple."""
        _apply_action(self.nucon, action)

        # Advance the simulator (or wall-clock sleep against the live game),
        # capturing uncertainty for observation injection when simulated.
        truncated = False
        uncertainty = None
        if self.simulator:
            uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
        else:
            sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            time.sleep(self.seconds_per_step / sim_speed)

        self._total_steps += 1
        observation = self._get_obs(sim_uncertainty=uncertainty)
        info = self._get_info(observation)
        reward = sum(info['objectives_weighted'].values())
        # Cast to plain bool: ``np.sum(...) > x`` is a numpy scalar otherwise,
        # which violates the gymnasium step contract.
        terminated = bool(np.sum([t(observation) for t in self.terminators]) > self.terminate_above)
        return observation, reward, terminated, truncated, info

    def render(self):
        pass

    def close(self):
        pass

    def _flatten_observation(self, observation):
        """Concatenate all observation values into a single 1-D array."""
        return np.concatenate([np.asarray(v).flatten() for v in observation.values()])


# ---------------------------------------------------------------------------
# NuconGoalEnv
# ---------------------------------------------------------------------------


class NuconGoalEnv(gym.Env):
    """
    Goal-conditioned reactor environment compatible with SB3 HER
    (Hindsight Experience Replay).

    Observation is a Dict with three keys:
      - 'observation':   all readable non-goal, non-str params
                         + SIM_UNCERTAINTY (when sim active)
      - 'achieved_goal': current values of goal_params, normalised to [0, 1]
                         within goal_range
      - 'desired_goal':  target values sampled each episode, normalised to [0, 1]

    ``SIM_UNCERTAINTY`` in 'observation' lets reward_fn / terminators reference
    uncertainty directly.

    reward_fn signature: ``(achieved, desired)`` or ``(achieved, desired, obs)``
    -- the 3-arg form receives the full observation dict (including
    SIM_UNCERTAINTY) for uncertainty-aware shaping.

    Usage with SB3 HER::

        from stable_baselines3 import SAC
        from stable_baselines3.common.buffers import HerReplayBuffer
        from nucon.rl import NuconGoalEnv, Parameterized_Terminators

        env = NuconGoalEnv(
            goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
            goal_range={'GENERATOR_0_KW': (0, 1200),
                        'GENERATOR_1_KW': (0, 1200),
                        'GENERATOR_2_KW': (0, 1200)},
            tolerance=0.05,
            simulator=simulator,
            # uncertainty-aware reward: penalise OOD, abort if too far out
            reward_fn=lambda ag, dg, obs: (
                -(np.linalg.norm(ag - dg) ** 2)
                - 2.0 * max(0, obs.get('SIM_UNCERTAINTY', 0) - 0.3) ** 2
            ),
            terminators=[Parameterized_Terminators['uncertainty_abort'](threshold=0.7)],
        )
        model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer)
        model.learn(total_timesteps=500_000)
    """

    metadata = {'render_modes': ['human']}

    def __init__(
        self,
        goal_params,
        goal_range=None,
        reward_fn=None,
        tolerance=None,
        nucon=None,
        simulator=None,
        render_mode=None,
        seconds_per_step=5,
        terminators=None,
        terminate_above=0,
        additional_objectives=None,
        additional_objective_weights=None,
    ):
        super().__init__()
        self.render_mode = render_mode
        self.seconds_per_step = seconds_per_step
        self.terminate_above = terminate_above
        self.simulator = simulator
        self.goal_params = list(goal_params)
        self.tolerance = tolerance
        if nucon is None:
            nucon = Nucon(port=simulator.port) if simulator else Nucon()
        self.nucon = nucon

        all_readable = self.nucon.get_all_readable()
        for pid in self.goal_params:
            if pid not in all_readable:
                raise ValueError(f"Goal param '{pid}' is not a readable parameter")

        # Goal normalisation bounds.  Explicit None checks: a declared bound of
        # 0 is falsy, so ``min_val or 0.0`` / ``max_val or 1.0`` would discard it.
        goal_range = goal_range or {}

        def _bounds(pid):
            """(low, high) for one goal param: explicit range or declared limits."""
            if pid in goal_range:
                return goal_range[pid]
            p = all_readable[pid]
            lo = p.min_val if p.min_val is not None else 0.0
            hi = p.max_val if p.max_val is not None else 1.0
            return lo, hi

        self._goal_low = np.array([_bounds(pid)[0] for pid in self.goal_params], dtype=np.float32)
        self._goal_high = np.array([_bounds(pid)[1] for pid in self.goal_params], dtype=np.float32)
        self._goal_range = self._goal_high - self._goal_low
        self._goal_range[self._goal_range == 0] = 1.0  # avoid divide-by-zero on degenerate ranges

        # Detect reward_fn arity for backward compat (2-arg vs 3-arg)
        self._reward_fn = reward_fn
        if reward_fn is not None:
            n_args = len(inspect.signature(reward_fn).parameters)
            self._reward_fn_wants_obs = n_args >= 3
        else:
            self._reward_fn_wants_obs = False

        # Observation subspace: all readable non-goal params (+ bookkeeping keys)
        goal_set = set(self.goal_params)
        obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
        if simulator is not None:
            obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
        for param_id, param in all_readable.items():
            if param_id in goal_set:
                continue
            sp = _build_param_space(param)
            if sp is not None:
                obs_spaces[param_id] = sp

        n_goals = len(self.goal_params)
        self.observation_space = spaces.Dict({
            'observation': spaces.Dict(obs_spaces),
            'achieved_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
            'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
        })

        # Action space -- writable, readable, non-cheat parameters only
        action_spaces = {}
        for param_id, param in self.nucon.get_all_writable().items():
            if not param.is_readable or param.is_cheat:
                continue
            sp = _build_param_space(param)
            if sp is not None:
                action_spaces[param_id] = sp
        self.action_space = spaces.Dict(action_spaces)

        self._terminators = terminators or []
        _objs = additional_objectives or []
        self._objectives = [Objectives[o] if isinstance(o, str) else o for o in _objs]
        self._objective_weights = additional_objective_weights or [1.0] * len(self._objectives)

        self._desired_goal = np.zeros(n_goals, dtype=np.float32)
        self._total_steps = 0

    def compute_reward(self, achieved_goal, desired_goal, info):
        """Dense negative L2, sparse with tolerance, or custom reward_fn.

        HER calls this with possibly batched goals; the built-in paths use
        ``axis=-1`` so they vectorise over a leading batch dimension.  ``info``
        may be a dict carrying the full observation under 'obs' (consumed by
        3-arg reward functions for uncertainty-aware shaping).
        """
        obs = info.get('obs', {}) if isinstance(info, dict) else {}
        if self._reward_fn is not None:
            if self._reward_fn_wants_obs:
                return self._reward_fn(achieved_goal, desired_goal, obs)
            return self._reward_fn(achieved_goal, desired_goal)
        dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
        if self.tolerance is not None:
            # Sparse: 0 within tolerance, -1 otherwise.
            return (dist <= self.tolerance).astype(np.float32) - 1.0
        return -dist

    def _read_goal_values(self):
        """Read goal params and normalise them into [0, 1] within goal_range."""
        raw_values = []
        for pid in self.goal_params:
            value = self.nucon.get(pid)
            raw_values.append(0.0 if value is None else value)
        raw = np.array(raw_values, dtype=np.float32)
        return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0)

    def _get_obs_dict(self, sim_uncertainty=None):
        """Assemble the HER-style dict observation."""
        obs = {'EPISODE_TIME': float(self._total_steps * self.seconds_per_step)}
        if 'SIM_UNCERTAINTY' in self.observation_space['observation'].spaces:
            obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0
        goal_set = set(self.goal_params)
        for param_id, param in self.nucon.get_all_readable().items():
            if param_id in goal_set or param_id not in self.observation_space['observation'].spaces:
                continue
            value = self.nucon.get(param_id)
            if isinstance(value, Enum):
                value = value.value
            obs[param_id] = value
        achieved = self._read_goal_values()
        return {
            'observation': obs,
            'achieved_goal': achieved,
            'desired_goal': self._desired_goal.copy(),
        }

    def reset(self, seed=None, options=None):
        """Reset bookkeeping and sample a fresh desired goal in [0, 1]^n.

        Samples from gymnasium's ``self.np_random`` (seeded by
        ``super().reset``) so seeded episode sequences are reproducible.
        """
        super().reset(seed=seed)
        self._total_steps = 0
        self._desired_goal = self.np_random.uniform(
            0.0, 1.0, size=len(self.goal_params)
        ).astype(np.float32)
        return self._get_obs_dict(), {}

    def step(self, action):
        """Apply the action, advance time, and return the gymnasium 5-tuple."""
        _apply_action(self.nucon, action)

        # Advance the simulator (or sleep against the live game)
        uncertainty = None
        if self.simulator:
            uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
        else:
            sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            time.sleep(self.seconds_per_step / sim_speed)

        self._total_steps += 1
        obs = self._get_obs_dict(sim_uncertainty=uncertainty)
        info = {
            'achieved_goal': obs['achieved_goal'],
            'desired_goal': obs['desired_goal'],
            'obs': obs['observation'],
        }
        reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], info))
        reward += sum(w * o(obs['observation']) for o, w in zip(self._objectives, self._objective_weights))
        terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators)
        truncated = False
        return obs, reward, terminated, truncated, info

    def render(self):
        pass

    def close(self):
        pass


# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------


def register_nucon_envs():
    """Register the canned Nucon environment ids with gymnasium."""
    gym.register(
        id='Nucon-max_power-v0',
        entry_point='nucon.rl:NuconEnv',
        kwargs={'seconds_per_step': 5, 'objectives': ['max_power']}
    )
    gym.register(
        id='Nucon-target_temperature_350-v0',
        entry_point='nucon.rl:NuconEnv',
        kwargs={'seconds_per_step': 5,
                'objectives': [Parameterized_Objectives['target_temperature'](goal_temp=350)]}
    )
    gym.register(
        id='Nucon-safe_max_power-v0',
        entry_point='nucon.rl:NuconEnv',
        kwargs={'seconds_per_step': 5,
                'objectives': [Parameterized_Objectives['temp_above'](min_temp=310),
                               Parameterized_Objectives['temp_below'](max_temp=365),
                               'max_power'],
                'objective_weights': [1, 10, 1/100_000]}
    )
    gym.register(
        id='Nucon-goal_power-v0',
        entry_point='nucon.rl:NuconGoalEnv',
        kwargs={
            'goal_params': ['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
            'goal_range': {'GENERATOR_0_KW': (0.0, 1200.0),
                           'GENERATOR_1_KW': (0.0, 1200.0),
                           'GENERATOR_2_KW': (0.0, 1200.0)},
            'seconds_per_step': 5,
        }
    )
    gym.register(
        id='Nucon-goal_temp-v0',
        entry_point='nucon.rl:NuconGoalEnv',
        kwargs={
            'goal_params': ['CORE_TEMP'],
            'goal_range': {'CORE_TEMP': (280.0, 380.0)},
            'seconds_per_step': 5,
        }
    )


register_nucon_envs()