"""Gymnasium environments (plain and goal-conditioned) on top of the Nucon API."""

import inspect
import logging
import time
from enum import Enum
from typing import Dict, Any, Callable, List, Optional

import gymnasium as gym
from gymnasium import spaces
import numpy as np

from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Reward / objective helpers
# ---------------------------------------------------------------------------

# Named objectives: each maps a named observation dict to a scalar.
Objectives = {
    "null": lambda obs: 0,
    "max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"],
    "episode_time": lambda obs: obs["EPISODE_TIME"],
}


def _uncertainty_penalty(start=0.3, scale=1.0, mode='l2'):
    """Build an objective penalising ``SIM_UNCERTAINTY`` above ``start``.

    Args:
        start: uncertainty level below which no penalty is applied.
        scale: multiplier applied to the penalty.
        mode: ``'l2'`` (squared excess) or ``'linear'`` (excess).

    Returns:
        A callable ``(obs) -> float`` returning a non-positive penalty. A missing
        ``SIM_UNCERTAINTY`` key is treated as 0.0.

    Raises:
        ValueError: if ``mode`` is neither ``'l2'`` nor ``'linear'``.
    """
    excess = lambda obs: max(0.0, obs.get('SIM_UNCERTAINTY', 0.0) - start)
    if mode == 'l2':
        return lambda obs: -scale * excess(obs) ** 2
    elif mode == 'linear':
        return lambda obs: -scale * excess(obs)
    else:
        raise ValueError(f"Unknown mode '{mode}'. Use 'l2' or 'linear'.")


def _uncertainty_abort(threshold=0.7):
    """Build a terminator returning 1.0 once ``SIM_UNCERTAINTY`` >= ``threshold``, else 0.0."""
    return lambda obs: 1.0 if obs.get('SIM_UNCERTAINTY', 0.0) >= threshold else 0.0


# Objective factories: call with the parameter(s) to obtain an ``(obs) -> float`` objective.
Parameterized_Objectives = {
    "target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2),
    "target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2),
    "temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2),
    "temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2),
    "constant": lambda constant: lambda obs: constant,
    "uncertainty_penalty": _uncertainty_penalty,  # (start, scale, mode) -> (obs) -> float
}

# Terminator factories: call with the parameter(s) to obtain an ``(obs) -> float`` terminator.
Parameterized_Terminators = {
    "uncertainty_abort": _uncertainty_abort,  # (threshold,) -> (obs) -> float
}


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _build_flat_action_space(nucon, obs_param_set=None, delta_action_scale=None):
    """Return ``(Box, ordered_param_ids, param_lows, param_ranges)``.

    Only writable params that are readable, not cheats, have finite
    ``min_val``/``max_val`` and a supported type are included. If
    ``obs_param_set`` is given, params outside it are skipped as well.

    If delta_action_scale is set, the action space is [-1, 1]^n and actions are
    treated as normalised deltas:
    actual_delta = action * delta_action_scale * (max - min).
    Otherwise the action space spans [min_val, max_val] per param (absolute values).

    ``param_lows`` and ``param_ranges`` are float32 arrays aligned with
    ``ordered_param_ids`` (lower bound and ``high - low`` per param).
    """
    params = []
    lows, highs, ranges = [], [], []
    for param_id, param in nucon.get_all_writable().items():
        if not param.is_readable or param.is_cheat:
            continue
        if obs_param_set is not None and param_id not in obs_param_set:
            continue
        if param.min_val is None or param.max_val is None:
            continue  # SAC requires finite action bounds
        sp = _build_param_space(param)
        if sp is None:
            continue
        params.append(param_id)
        lows.append(sp.low[0])
        highs.append(sp.high[0])
        ranges.append(sp.high[0] - sp.low[0])

    if delta_action_scale is not None:
        n = len(params)
        box = spaces.Box(low=-np.ones(n, dtype=np.float32),
                         high=np.ones(n, dtype=np.float32),
                         dtype=np.float32)
    else:
        box = spaces.Box(low=np.array(lows, dtype=np.float32),
                         high=np.array(highs, dtype=np.float32),
                         dtype=np.float32)
    return box, params, np.array(lows, dtype=np.float32), np.array(ranges, dtype=np.float32)


def _unflatten_action(flat_action, param_ids):
    """Map a flat action vector to ``{param_id: float}`` using the order of ``param_ids``."""
    return {pid: float(flat_action[i]) for i, pid in enumerate(param_ids)}


def _build_param_space(param):
    """Return a gymnasium Box for a single NuconParameter, or None if unsupported."""
    if param.param_type in (float, int):
        lo = param.min_val if param.min_val is not None else -np.inf
        hi = param.max_val if param.max_val is not None else np.inf
        return spaces.Box(low=lo, high=hi, shape=(1,), dtype=np.float32)
    elif param.param_type == bool:
        return spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
    elif param.param_type == str:
        return None
    elif issubclass(param.param_type, Enum):
        return spaces.Box(low=0, high=len(param.param_type) - 1, shape=(1,), dtype=np.float32)
    return None


def _apply_action(nucon, action):
    """Convert each action value to its parameter's type and write it via ``nucon.set``.

    Args:
        nucon: client exposing ``_parameters`` and ``set(param, value)``.
        action: mapping ``param_id -> value`` (scalar or array-like; the first
            element is used).
    """
    for param_id, value in action.items():
        param = nucon._parameters[param_id]
        v = float(np.asarray(value).flat[0])
        if param.param_type == bool:
            value = v >= 0.5  # [0,1] space: above midpoint -> True
        elif issubclass(param.param_type, Enum):
            value = param.param_type(int(v))
        else:
            value = param.param_type(v)
            # Range clipping only makes sense for numeric params.
            if param.min_val is not None and param.max_val is not None:
                value = param.param_type(np.clip(value, param.min_val, param.max_val))
        nucon.set(param, value)


# ---------------------------------------------------------------------------
# NuconEnv
# ---------------------------------------------------------------------------

class NuconEnv(gym.Env):
    """Reactor environment with a Dict observation space and a flat Box action space.

    The reward is the weighted sum of the configured objectives; the episode
    terminates when the sum of the terminator outputs exceeds ``terminate_above``.
    """

    metadata = {'render_modes': ['human']}

    def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5,
                 objectives=('null',), terminators=('null',), objective_weights=None,
                 terminate_above=0):
        """Set up spaces and resolve objectives/terminators.

        Args:
            nucon: Nucon client; created from ``simulator.port`` (or defaults) if None.
            simulator: optional simulator advanced in ``step`` instead of sleeping.
            render_mode: stored; ``render`` is a no-op.
            seconds_per_step: simulated seconds per environment step.
            objectives: names from ``Objectives`` and/or callables ``(obs) -> float``.
            terminators: names from ``Objectives`` and/or callables ``(obs) -> float``.
            objective_weights: one weight per objective; defaults to all 1.0.
            terminate_above: threshold on the summed terminator outputs.

        Raises:
            ValueError: for an objective/terminator that is neither a known name
                nor callable.
        """
        super().__init__()

        self.render_mode = render_mode
        self.seconds_per_step = seconds_per_step
        if objective_weights is None:
            objective_weights = [1.0 for _ in objectives]
        self.objective_weights = objective_weights
        self.terminate_above = terminate_above
        self.simulator = simulator

        if nucon is None:
            nucon = Nucon(port=simulator.port) if simulator else Nucon()
        self.nucon = nucon

        # Observation space — SIM_UNCERTAINTY included when a simulator is present
        obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
        if simulator is not None:
            obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
        for param_id, param in self.nucon.get_all_readable().items():
            sp = _build_param_space(param)
            if sp is not None:
                obs_spaces[param_id] = sp
        self.observation_space = spaces.Dict(obs_spaces)

        self.action_space, self._action_params, self._action_lows, self._action_ranges = \
            _build_flat_action_space(self.nucon)

        self.objectives = []
        self.terminators = []

        for objective in objectives:
            # Names are looked up first; the isinstance guard keeps unhashable
            # callables from raising inside the dict membership test.
            if isinstance(objective, str) and objective in Objectives:
                self.objectives.append(Objectives[objective])
            elif callable(objective):
                self.objectives.append(objective)
            else:
                raise ValueError(f"Unsupported objective: {objective}")

        for terminator in terminators:
            if isinstance(terminator, str) and terminator in Objectives:
                self.terminators.append(Objectives[terminator])
            elif callable(terminator):
                self.terminators.append(terminator)
            else:
                raise ValueError(f"Unsupported terminator: {terminator}")

    def _get_obs(self, sim_uncertainty=None):
        """Read all observable params into a named dict (Enum values unwrapped)."""
        obs = {}
        for param_id, param in self.nucon.get_all_readable().items():
            if param.param_type == str or param_id not in self.observation_space.spaces:
                continue
            value = self.nucon.get(param_id)
            if isinstance(value, Enum):
                value = value.value
            obs[param_id] = value
        obs['EPISODE_TIME'] = self._total_steps * self.seconds_per_step
        if 'SIM_UNCERTAINTY' in self.observation_space.spaces:
            obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0
        return obs

    def _get_info(self, obs):
        """Evaluate every objective on ``obs``.

        Returns a dict with ``'objectives'`` (raw values) and
        ``'objectives_weighted'`` (value * weight), keyed by objective name.
        Objectives that share a name (e.g. several lambdas, all named
        ``'<lambda>'``) get a ``#<index>`` suffix so none of them overwrites
        another — the reward in ``step`` is summed from this dict.
        """
        info = {'objectives': {}, 'objectives_weighted': {}}
        for index, (objective, weight) in enumerate(zip(self.objectives, self.objective_weights)):
            obj = objective(obs)
            name = getattr(objective, '__name__', repr(objective))
            key = name
            if key in info['objectives']:
                key = f"{name}#{index}"
            info['objectives'][key] = obj
            info['objectives_weighted'][key] = obj * weight
        return info

    def reset(self, seed=None, options=None):
        """Reset the step counter and return ``(observation, info)``."""
        super().reset(seed=seed)
        self._total_steps = 0
        observation = self._get_obs()
        return observation, self._get_info(observation)

    def step(self, action):
        """Apply ``action``, advance time, and return the gymnasium 5-tuple."""
        _apply_action(self.nucon, _unflatten_action(action, self._action_params))

        # Advance sim (or sleep) — get uncertainty for obs injection
        truncated = False
        uncertainty = None
        if self.simulator:
            uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
        else:
            sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            time.sleep(self.seconds_per_step / sim_speed)

        self._total_steps += 1
        observation = self._get_obs(sim_uncertainty=uncertainty)
        info = self._get_info(observation)
        reward = sum(obj for obj in info['objectives_weighted'].values())
        # Cast so callers get a plain bool rather than numpy.bool_.
        terminated = bool(np.sum([t(observation) for t in self.terminators]) > self.terminate_above)
        return observation, reward, terminated, truncated, info

    def render(self):
        pass

    def close(self):
        pass

    def _flatten_observation(self, observation):
        """Concatenate all observation values into one flat array (dict order)."""
        return np.concatenate([np.asarray(v).flatten() for v in observation.values()])


# ---------------------------------------------------------------------------
# NuconGoalEnv
# ---------------------------------------------------------------------------

class NuconGoalEnv(gym.Env):
    """
    Goal-conditioned reactor environment compatible with SB3 HER (Hindsight Experience Replay).

    Observation is a Dict with three keys:
      - 'observation':   flat vector of the selected non-goal params
      - 'achieved_goal': current values of goal_params, normalised to [0, 1] within goal_range
      - 'desired_goal':  target values sampled each episode, normalised to [0, 1]

    ``SIM_UNCERTAINTY`` is NOT part of the 'observation' vector; when a simulator is
    attached it is added to the named observation dict that is handed to
    terminators, additional objectives and 3-arg reward functions
    (also exposed as ``info['obs_named']``).

    reward_fn signature: ``(achieved, desired)`` or ``(achieved, desired, obs)`` — the 3-arg form
    receives the named observation dict (including SIM_UNCERTAINTY) for uncertainty-aware shaping.

    Usage with SB3 HER::

        from stable_baselines3 import SAC
        from stable_baselines3.common.buffers import HerReplayBuffer
        from nucon.rl import NuconGoalEnv, Parameterized_Terminators

        env = NuconGoalEnv(
            goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
            goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
            tolerance=0.05,
            simulator=simulator,
            # uncertainty-aware reward: penalise OOD, abort if too far out
            reward_fn=lambda ag, dg, obs: (
                -(np.linalg.norm(ag - dg, axis=-1) ** 2)
                - 2.0 * max(0, obs.get('SIM_UNCERTAINTY', 0) - 0.3) ** 2
            ),
            terminators=[Parameterized_Terminators['uncertainty_abort'](threshold=0.7)],
        )
        model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer)
        model.learn(total_timesteps=500_000)
    """

    metadata = {'render_modes': ['human']}

    def __init__(
        self,
        goal_params,
        goal_range=None,
        reward_fn=None,
        tolerance=None,
        nucon=None,
        simulator=None,
        render_mode=None,
        seconds_per_step=5,
        terminators=None,
        terminate_above=0,
        additional_objectives=None,
        additional_objective_weights=None,
        obs_params=None,
        init_states=None,
        delta_action_scale=None,
    ):
        """Configure goal normalisation, observation/action spaces and reward shaping.

        Args:
            goal_params: ids of readable params that form the goal vector.
            goal_range: optional ``{param_id: (low, high)}``; falls back to the
                param's ``min_val``/``max_val`` (0.0/1.0 when those are None).
            reward_fn: optional custom reward, 2-arg or 3-arg (see class docstring).
            tolerance: if set (and no reward_fn), use a sparse 0/-1 reward.
            nucon: Nucon client; created from ``simulator.port`` (or defaults) if None.
            simulator: optional simulator used for in-process reads/writes.
            render_mode: stored; ``render`` is a no-op.
            seconds_per_step: simulated seconds per environment step.
            terminators: callables ``(named_obs) -> float``.
            terminate_above: an episode terminates when any terminator exceeds this.
            additional_objectives: names from ``Objectives`` and/or callables,
                added to the goal reward.
            additional_objective_weights: weights for the additional objectives.
            obs_params: explicit list of observation param ids (overrides defaults).
            init_states: list of state dicts, one of which is sampled on reset
                (only applied when a simulator is attached).
            delta_action_scale: if set, actions are normalised deltas in [-1, 1].

        Raises:
            ValueError: if a goal param is not a readable parameter.
        """
        super().__init__()

        self.render_mode = render_mode
        self.seconds_per_step = seconds_per_step
        self._delta_action_scale = delta_action_scale
        self.terminate_above = terminate_above
        self.simulator = simulator
        self.goal_params = list(goal_params)
        self.tolerance = tolerance

        if nucon is None:
            nucon = Nucon(port=simulator.port) if simulator else Nucon()
        self.nucon = nucon

        all_readable = self.nucon.get_all_readable()
        for pid in self.goal_params:
            if pid not in all_readable:
                raise ValueError(f"Goal param '{pid}' is not a readable parameter")

        goal_range = goal_range or {}
        goal_lows, goal_highs = [], []
        for pid in self.goal_params:
            if pid in goal_range:
                bounds = goal_range[pid]
            else:
                param = all_readable[pid]
                # Explicit None checks: a legitimate bound of 0 must not be
                # replaced by the fallback.
                bounds = (
                    param.min_val if param.min_val is not None else 0.0,
                    param.max_val if param.max_val is not None else 1.0,
                )
            goal_lows.append(bounds[0])
            goal_highs.append(bounds[1])
        self._goal_low = np.array(goal_lows, dtype=np.float32)
        self._goal_high = np.array(goal_highs, dtype=np.float32)
        self._goal_range = self._goal_high - self._goal_low
        self._goal_range[self._goal_range == 0] = 1.0  # avoid division by zero

        # Detect reward_fn arity for backward compat (2-arg vs 3-arg)
        self._reward_fn = reward_fn
        if reward_fn is not None:
            n_args = len(inspect.signature(reward_fn).parameters)
            self._reward_fn_wants_obs = n_args >= 3
        else:
            self._reward_fn_wants_obs = False

        # Observation params: model.input_params defines the canonical list — the same set is
        # used whether training in sim or deploying to the real game (the game simply has more
        # params available; we query only the subset we care about).
        # Explicit obs_params overrides everything (use when deploying to real game without sim).
        # SB3 HER requires observation to be a flat Box, not a nested Dict.
        goal_set = set(self.goal_params)
        self._obs_with_uncertainty = simulator is not None

        if obs_params is not None:
            base_params = [p for p in obs_params if p not in goal_set]
        elif simulator is not None and hasattr(simulator, 'model') and simulator.model is not None:
            base_params = [p for p in simulator.model.input_params
                           if p not in goal_set
                           and p in all_readable
                           and _build_param_space(all_readable[p]) is not None]
        else:
            base_params = [p for p, param in all_readable.items()
                           if p not in goal_set and _build_param_space(param) is not None]

        # SIM_UNCERTAINTY is not in _obs_params — it's not available at deployment on the real game
        self._obs_params = base_params

        n_goals = len(self.goal_params)
        self.observation_space = spaces.Dict({
            'observation': spaces.Box(low=-np.inf, high=np.inf,
                                      shape=(len(self._obs_params),), dtype=np.float32),
            'achieved_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
            'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
        })

        # Action space: writable params within the obs param set (flat Box for SB3 compatibility).
        self.action_space, self._action_params, self._action_lows, self._action_ranges = \
            _build_flat_action_space(self.nucon, set(base_params), delta_action_scale)

        self._terminators = terminators or []
        _objs = additional_objectives or []
        self._objectives = [Objectives[o] if isinstance(o, str) else o for o in _objs]
        self._objective_weights = additional_objective_weights or [1.0] * len(self._objectives)
        self._init_states = init_states  # list of state dicts to sample on reset
        self._desired_goal = np.zeros(n_goals, dtype=np.float32)
        self._total_steps = 0

    def compute_reward(self, achieved_goal, desired_goal, info):
        """Dense negative L2, sparse with tolerance, or custom reward_fn.

        ``info`` may be a dict carrying ``'obs_named'``; anything else (e.g. a
        batch of infos) results in an empty named-observation dict being passed
        to a 3-arg ``reward_fn``.
        """
        obs_named = info.get('obs_named', {}) if isinstance(info, dict) else {}
        if self._reward_fn is not None:
            if self._reward_fn_wants_obs:
                return self._reward_fn(achieved_goal, desired_goal, obs_named)
            return self._reward_fn(achieved_goal, desired_goal)
        dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
        if self.tolerance is not None:
            return (dist <= self.tolerance).astype(np.float32) - 1.0
        return -dist

    def _read_goal_values(self):
        """Read goal params via the Nucon client and normalise them to [0, 1]."""
        raw = np.array([self.nucon.get(pid) or 0.0 for pid in self.goal_params], dtype=np.float32)
        return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0)

    def _read_obs(self, sim_uncertainty=None):
        """Return (gym_obs_dict, reward_obs_dict).

        When a simulator is attached, reads directly from the simulator (no HTTP).
        Otherwise falls back to a single batch HTTP request.
        """
        def _to_float(v):
            if v is None:
                return 0.0
            return float(v.value if isinstance(v, Enum) else v)

        if self.simulator is not None:
            # Direct in-process read — no HTTP overhead
            def _get(pid):
                return _to_float(self.simulator.get(pid))
        else:
            raw = self.nucon._batch_query(self._obs_params + self.goal_params)
            all_params = self.nucon.get_all_readable()

            def _get(pid):
                try:
                    v = self.nucon._parse_value(all_params[pid], raw.get(pid, '0'))
                    return _to_float(v)
                except Exception:
                    # Best-effort: an unparsable or missing value reads as 0.0.
                    return 0.0

        reward_obs = {}
        if self._obs_with_uncertainty:
            reward_obs['SIM_UNCERTAINTY'] = float(sim_uncertainty) if sim_uncertainty is not None else 0.0
        for pid in self._obs_params:
            reward_obs[pid] = _get(pid)

        obs_vec = np.array([reward_obs[p] for p in self._obs_params], dtype=np.float32)
        goal_raw = np.array([_get(p) for p in self.goal_params], dtype=np.float32)
        achieved = np.clip((goal_raw - self._goal_low) / self._goal_range, 0.0, 1.0)

        gym_obs = {'observation': obs_vec,
                   'achieved_goal': achieved,
                   'desired_goal': self._desired_goal.copy()}
        return gym_obs, reward_obs

    def reset(self, seed=None, options=None):
        """Sample a new desired goal (and optionally an initial state); return ``(obs, {})``.

        Randomness comes from ``self.np_random`` so that seeding the first reset
        makes the whole sequence of subsequent unseeded resets reproducible.
        """
        super().reset(seed=seed)
        self._total_steps = 0
        rng = self.np_random
        self._desired_goal = rng.uniform(0.0, 1.0, size=len(self.goal_params)).astype(np.float32)
        # Truthiness check also skips an empty list, which cannot be sampled from.
        if self._init_states and self.simulator is not None:
            state = self._init_states[rng.integers(len(self._init_states))]
            for k, v in state.items():
                try:
                    self.simulator.set(k, v, force=True)
                except Exception:
                    # Best-effort: keys the simulator rejects are skipped.
                    logger.debug("Could not restore init-state value %r", k, exc_info=True)
        gym_obs, _ = self._read_obs()
        return gym_obs, {}

    def step(self, action):
        """Apply ``action``, advance time, and return the gymnasium 5-tuple.

        ``info`` contains ``'achieved_goal'``, ``'desired_goal'`` and
        ``'obs_named'`` (the named observation dict used for reward shaping).
        """
        flat = np.asarray(action, dtype=np.float32)

        if self._delta_action_scale is not None:
            # Compute absolute values from deltas, reading current state directly if possible
            absolute = {}
            for i, pid in enumerate(self._action_params):
                param = self.nucon._parameters[pid]
                if param.param_type == bool:
                    absolute[pid] = 1.0 if flat[i] > 0 else 0.0
                else:
                    if self.simulator is not None:
                        v = self.simulator.get(pid)
                        current = float(v.value if isinstance(v, Enum) else v) if v is not None else 0.0
                    else:
                        # fallback; batch read not worth it for actions alone
                        # NOTE(review): without a simulator the delta is applied
                        # relative to 0.0, not the live value — confirm intended.
                        current = 0.0
                    delta = float(flat[i]) * self._delta_action_scale * self._action_ranges[i]
                    absolute[pid] = float(np.clip(current + delta,
                                                  self._action_lows[i],
                                                  self._action_lows[i] + self._action_ranges[i]))
        else:
            absolute = _unflatten_action(flat, self._action_params)

        if self.simulator is not None:
            # Write directly to sim — skip HTTP entirely
            for pid, val in absolute.items():
                try:
                    self.simulator.set(pid, val, force=True)
                except Exception:
                    # Best-effort: a rejected write must not abort the step.
                    logger.debug("Simulator rejected action write for %r", pid, exc_info=True)
        else:
            _apply_action(self.nucon, absolute)

        if self.simulator:
            uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
        else:
            sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            time.sleep(self.seconds_per_step / sim_speed)
            uncertainty = None

        self._total_steps += 1
        gym_obs, reward_obs = self._read_obs(sim_uncertainty=uncertainty)
        info = {'achieved_goal': gym_obs['achieved_goal'],
                'desired_goal': gym_obs['desired_goal'],
                'obs_named': reward_obs}
        reward = float(self.compute_reward(gym_obs['achieved_goal'], gym_obs['desired_goal'], info))
        reward += sum(w * o(reward_obs) for o, w in zip(self._objectives, self._objective_weights))
        terminated = any(t(reward_obs) > self.terminate_above for t in self._terminators)
        return gym_obs, reward, terminated, False, info

    def render(self):
        pass

    def close(self):
        pass


# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------

def register_nucon_envs():
    """Register the predefined Nucon environments with gymnasium."""
    gym.register(
        id='Nucon-max_power-v0',
        entry_point='nucon.rl:NuconEnv',
        kwargs={'seconds_per_step': 5, 'objectives': ['max_power']}
    )
    gym.register(
        id='Nucon-target_temperature_350-v0',
        entry_point='nucon.rl:NuconEnv',
        kwargs={'seconds_per_step': 5,
                'objectives': [Parameterized_Objectives['target_temperature'](goal_temp=350)]}
    )
    gym.register(
        id='Nucon-safe_max_power-v0',
        entry_point='nucon.rl:NuconEnv',
        kwargs={'seconds_per_step': 5,
                'objectives': [Parameterized_Objectives['temp_above'](min_temp=310),
                               Parameterized_Objectives['temp_below'](max_temp=365),
                               'max_power'],
                'objective_weights': [1, 10, 1/100_000]}
    )
    gym.register(
        id='Nucon-goal_power-v0',
        entry_point='nucon.rl:NuconGoalEnv',
        kwargs={
            'goal_params': ['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
            'goal_range': {'GENERATOR_0_KW': (0.0, 1200.0),
                           'GENERATOR_1_KW': (0.0, 1200.0),
                           'GENERATOR_2_KW': (0.0, 1200.0)},
            'seconds_per_step': 5,
        }
    )
    gym.register(
        id='Nucon-goal_temp-v0',
        entry_point='nucon.rl:NuconGoalEnv',
        kwargs={
            'goal_params': ['CORE_TEMP'],
            'goal_range': {'CORE_TEMP': (280.0, 380.0)},
            'seconds_per_step': 5,
        }
    )


register_nucon_envs()