From f4d45d3cfd44458d0ccd0688f989f974daa3e1be Mon Sep 17 00:00:00 2001 From: Dominik Roth Date: Thu, 12 Mar 2026 18:51:13 +0100 Subject: [PATCH] feat: NuconGoalEnv, composable uncertainty helpers, kNN-GP naming - Add NuconGoalEnv for goal-conditioned HER training (SAC + HER) - Add UncertaintyPenalty and UncertaintyAbort composable callables; SIM_UNCERTAINTY injected into obs dict when simulator is active - Fix rl.py: str-typed params crash, missing Enum import, write-only params in action space, broken step() iteration order - Remove uncertainty state from sim (return value from update() instead) - Rename kNN -> kNN-GP throughout README; add model selection note Co-Authored-By: Claude Sonnet 4.6 --- README.md | 18 ++- nucon/rl.py | 446 ++++++++++++++++++++++++++++------------------------ 2 files changed, 255 insertions(+), 209 deletions(-) diff --git a/README.md b/README.md index 3151cff..3bece63 100644 --- a/README.md +++ b/README.md @@ -300,8 +300,8 @@ To address the challenge of unknown game dynamics, NuCon provides tools for coll - **Data Collection**: Gathers state transitions from human play or automated agents. `time_delta` is specified in game-time seconds; wall-clock sleep is automatically adjusted for `GAME_SIM_SPEED` so collected deltas are uniform regardless of simulation speed. - **Automatic param filtering**: Junk params (GAME_VERSION, TIME, ALARMS_ACTIVE, …) and params from uninstalled subsystems (returns `None`) are automatically excluded from model inputs/outputs. -- **Two model backends**: Neural network (NN) or k-Nearest Neighbours with GP interpolation (kNN). -- **Uncertainty estimation**: The kNN backend returns a GP posterior standard deviation alongside each prediction; 0 means the query lies on known data, ~1 means it is out of distribution. +- **Two model backends**: Neural network (NN) or a local Gaussian Process approximated via k-Nearest Neighbours (kNN-GP). 
+- **Uncertainty estimation**: The kNN-GP backend returns a GP posterior standard deviation alongside each prediction; 0 means the query lies on known data, ~1 means it is out of distribution. - **Dataset management**: Tools for saving, loading, merging, and pruning datasets. ### Additional Dependencies @@ -310,12 +310,16 @@ To address the challenge of unknown game dynamics, NuCon provides tools for coll pip install -e '.[model]' ``` +### Model selection + +**kNN-GP** (the `ReactorKNNModel` backend) is a local Gaussian Process: it finds the `k` nearest neighbours in the training set, fits an RBF kernel on them, and returns a prediction plus a GP posterior std as uncertainty. It works well from a few hundred samples and requires no training. **NN** needs input normalisation and several thousand samples to generalise; use it once you have a large dataset. For initial experiments, start with kNN-GP (`k=10`). + ### Usage ```python from nucon.model import NuconModelLearner -# --- Data collection (model_type not needed here) --- +# --- Data collection --- learner = NuconModelLearner( time_delta=10.0, # 10 game-seconds per step (wall sleep auto-scales with sim speed) include_valve_states=False, # set True to include all 53 valve positions as model inputs @@ -333,13 +337,13 @@ nn_learner.train_model(batch_size=32, num_epochs=50) # creates NN model on firs nn_learner.drop_well_fitted(error_threshold=1.0) nn_learner.save_model('reactor_nn.pth') -# --- kNN + GP backend --- +# --- kNN-GP backend --- knn_learner = NuconModelLearner(dataset_path='reactor_dataset.pkl') # Drop near-duplicate samples before fitting (keeps diverse coverage). # A sample is dropped only if BOTH its input state AND output transition # are within the given distances of an already-kept sample. 
knn_learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
-knn_learner.fit_knn(k=10) # creates kNN model on first call
+knn_learner.fit_knn(k=10) # creates kNN-GP model on first call
 
 # Point prediction
 state = knn_learner._get_state()
@@ -399,7 +403,7 @@ The recommended end-to-end workflow for training an RL operator is an iterative
 
-**Step 1 — Human dataset collection**: Run `NuconModelLearner.collect_data()` during your play session. Cover a wide range of states: startup from cold, ramping power, individual rod bank adjustments. Diversity in the dataset directly determines simulator accuracy. See [Model Learning](#model-learning-work-in-progress) for collection details.
+**Step 1 — Human dataset collection**: Run `NuconModelLearner.collect_data()` during your play session. Cover a wide range of states: startup from cold, ramping power, individual rod bank adjustments. Diversity in the dataset directly determines simulator accuracy. See [Model Learning](#model-learning) for collection details.
 
-**Step 2 — Initial model fitting**: Fit a kNN model (instant) or NN (better extrapolation with larger datasets) using `fit_knn()` or `train_model()`. Prune near-duplicate samples with `drop_redundant()` before fitting. See [Model Learning](#model-learning-work-in-progress).
+**Step 2 — Initial model fitting**: Fit a kNN-GP model (instant) or NN (better extrapolation with larger datasets) using `fit_knn()` or `train_model()`. Prune near-duplicate samples with `drop_redundant()` before fitting. See [Model Learning](#model-learning).
 
-**Step 3 — Train RL in simulator**: Load the fitted model into `NuconSimulator`, then train a `NuconGoalEnv` policy with SAC + HER. The simulator runs far faster than the real game, allowing many trajectories in reasonable time. Use `uncertainty_penalty_start` and `uncertainty_abort` on the env to discourage the policy from wandering into regions the model hasn't seen: a linear penalty kicks in above the soft threshold, and the episode is truncated at the hard threshold. This keeps training within the reliable part of the model's knowledge. See [NuconGoalEnv + HER Usage](#nucongoalenv--her-usage).
+**Step 3 — Train RL in simulator**: Load the fitted model into `NuconSimulator`, then train a `NuconGoalEnv` policy with SAC + HER. The simulator runs far faster than the real game, allowing many trajectories in reasonable time. Pass `UncertaintyPenalty(...)` as an objective (or fold it into `reward_fn`) and `UncertaintyAbort(...)` as a terminator to discourage the policy from wandering into regions the model hasn't seen: a quadratic (or linear) penalty kicks in above the soft `start` threshold, and the episode ends at the hard abort `threshold`. This keeps training within the reliable part of the model's knowledge. See [NuconGoalEnv + HER Usage](#nucongoalenv--her-usage).
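
The soft-penalty / hard-abort mechanism described in Step 3 can be sketched stand-alone. The snippet below is a minimal replica of the logic of the `UncertaintyPenalty` and `UncertaintyAbort` helpers added to `nucon/rl.py` in this patch (lowercase names and threshold values here are illustrative, not part of the library API):

```python
# Stand-alone replica of the composable uncertainty helpers from nucon/rl.py.
# Both return callables over the obs dict; the env injects SIM_UNCERTAINTY
# into obs whenever a simulator is active.

def uncertainty_penalty(start=0.3, scale=1.0, mode='l2'):
    """Negative reward once uncertainty exceeds the soft `start` threshold."""
    def excess(obs):
        return max(0.0, obs.get('SIM_UNCERTAINTY', 0.0) - start)
    if mode == 'l2':
        return lambda obs: -scale * excess(obs) ** 2
    if mode == 'linear':
        return lambda obs: -scale * excess(obs)
    raise ValueError(f"Unknown mode '{mode}'. Use 'l2' or 'linear'.")

def uncertainty_abort(threshold=0.7):
    """Fires (returns 1.0) once uncertainty reaches the hard threshold."""
    return lambda obs: 1.0 if obs.get('SIM_UNCERTAINTY', 0.0) >= threshold else 0.0

penalty = uncertainty_penalty(start=0.3, scale=2.0, mode='linear')
abort = uncertainty_abort(threshold=0.7)

print(penalty({'SIM_UNCERTAINTY': 0.2}))            # in-distribution: no penalty
print(round(penalty({'SIM_UNCERTAINTY': 0.5}), 3))  # soft region: -0.4
print(abort({'SIM_UNCERTAINTY': 0.8}))              # past hard threshold: 1.0
```

In the actual env these would be passed as `objectives=[..., UncertaintyPenalty(start=0.3, scale=2.0)]` and `terminators=[UncertaintyAbort(threshold=0.7)]`.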
@@ -407,7 +411,7 @@ The recommended end-to-end workflow for training an RL operator is an iterative **Step 5 — Refit model on expanded data**: Merge new data into the original dataset with `merge_datasets()`, prune with `drop_redundant()`, and refit. Then return to Step 3 with the improved model. Each iteration the simulator gets more accurate and the policy improves. -Stop when the policy performs well in the real game and kNN uncertainty stays low throughout an episode, indicating the policy stays within the known data distribution. +Stop when the policy performs well in the real game and kNN-GP uncertainty stays low throughout an episode, indicating the policy stays within the known data distribution. ## Testing diff --git a/nucon/rl.py b/nucon/rl.py index f3115c2..868ccb6 100644 --- a/nucon/rl.py +++ b/nucon/rl.py @@ -1,167 +1,83 @@ +import inspect import gymnasium as gym from gymnasium import spaces import numpy as np import time -from typing import Dict, Any +from typing import Dict, Any, Callable, List, Optional from enum import Enum from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus + +# --------------------------------------------------------------------------- +# Reward / objective helpers +# --------------------------------------------------------------------------- + Objectives = { - "null": lambda obs: 0, - "max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"], + "null": lambda obs: 0, + "max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"], "episode_time": lambda obs: obs["EPISODE_TIME"], } Parameterized_Objectives = { "target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2), - "target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2), - "temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2), - "temp_above": lambda 
min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2), - "constant": lambda constant: lambda obs: constant, + "target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2), + "temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2), + "temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2), + "constant": lambda constant: lambda obs: constant, } -class NuconEnv(gym.Env): - metadata = {'render_modes': ['human']} - def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5, objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0, - uncertainty_penalty_start: float = None, uncertainty_abort: float = None, uncertainty_penalty_scale: float = 1.0): - super().__init__() +def UncertaintyPenalty(start: float = 0.3, scale: float = 1.0, mode: str = 'l2') -> Callable: + """Objective that penalises high simulator uncertainty. - self.render_mode = render_mode - self.seconds_per_step = seconds_per_step - if objective_weights is None: - objective_weights = [1.0 for objective in objectives] - self.objective_weights = objective_weights - self.terminate_above = terminate_above - self.simulator = simulator - self.uncertainty_penalty_start = uncertainty_penalty_start - self.uncertainty_abort = uncertainty_abort - self.uncertainty_penalty_scale = uncertainty_penalty_scale + Returns a callable ``(obs) -> float`` suitable for use as an objective or + terminator in NuconEnv / NuconGoalEnv. Works because ``SIM_UNCERTAINTY`` + is injected into the obs dict whenever a simulator is active. - if nucon is None: - if simulator: - nucon = Nucon(port=simulator.port) - else: - nucon = Nucon() - self.nucon = nucon + Args: + start: uncertainty level at which the penalty starts (default 0.3). + scale: penalty coefficient. + mode: ``'l2'`` (quadratic, default) or ``'linear'``. 
- # Define observation space - obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)} - for param_id, param in self.nucon.get_all_readable().items(): - sp = _build_param_space(param) - if sp is not None: - obs_spaces[param_id] = sp - self.observation_space = spaces.Dict(obs_spaces) + Example:: - # Define action space (only controllable, non-cheat, readable-back params) - action_spaces = {} - for param_id, param in self.nucon.get_all_writable().items(): - if not param.is_readable or param.is_cheat: - continue # write-only (VALVE_OPEN/CLOSE, SCRAM, etc.) and cheat params excluded - sp = _build_param_space(param) - if sp is not None: - action_spaces[param_id] = sp - self.action_space = spaces.Dict(action_spaces) + env = NuconEnv( + objectives=['max_power', UncertaintyPenalty(start=0.3, scale=2.0)], + objective_weights=[1.0, 1.0], + simulator=simulator, + ) + """ + excess = lambda obs: max(0.0, obs.get('SIM_UNCERTAINTY', 0.0) - start) + if mode == 'l2': + return lambda obs: -scale * excess(obs) ** 2 + elif mode == 'linear': + return lambda obs: -scale * excess(obs) + else: + raise ValueError(f"Unknown mode '{mode}'. Use 'l2' or 'linear'.") - self.objectives = [] - self.terminators = [] - for objective in objectives: - if objective in Objectives: - self.objectives.append(Objectives[objective]) - elif callable(objective): - self.objectives.append(objective) - else: - raise ValueError(f"Unsupported objective: {objective}") +def UncertaintyAbort(threshold: float = 0.7) -> Callable: + """Terminator that aborts the episode when simulator uncertainty is too high. - for terminator in terminators: - if terminator in Objectives: - self.terminators.append(Objectives[terminator]) - elif callable(terminator): - self.terminators.append(terminator) - else: - raise ValueError(f"Unsupported terminator: {terminator}") + Returns a callable ``(obs) -> float`` for use as a *terminator*. 
When
+    the GP posterior std exceeds ``threshold`` the episode is terminated
+    (``terminated=True``).
-    def _get_obs(self):
-        obs = {}
-        for param_id, param in self.nucon.get_all_readable().items():
-            if param.param_type == str or param_id not in self.observation_space.spaces:
-                continue
-            value = self.nucon.get(param_id)
-            if isinstance(value, Enum):
-                value = value.value
-            obs[param_id] = value
-        obs["EPISODE_TIME"] = self._total_steps * self.seconds_per_step
-        return obs
+    Example::
-    def _get_info(self):
-        info = {'objectives': {}, 'objectives_weighted': {}}
-        for objective, weight in zip(self.objectives, self.objective_weights):
-            obj = objective(self._get_obs())
-            info['objectives'][objective.__name__] = obj
-            info['objectives_weighted'][objective.__name__] = obj * weight
-        return info
-
-    def reset(self, seed=None, options=None):
-        super().reset(seed=seed)
+        env = NuconEnv(
+            objectives=['max_power'],
+            terminators=[UncertaintyAbort(threshold=0.7)],
+            terminate_above=0,
+            simulator=simulator,
+        )
+    """
+    return lambda obs: 1.0 if obs.get('SIM_UNCERTAINTY', 0.0) >= threshold else 0.0
-        self._total_steps = 0
-        observation = self._get_obs()
-        info = self._get_info()
-
-        return observation, info
-
-    def step(self, action):
-        # Apply the action to the Nucon system
-        for param_id, value in action.items():
-            param = self.nucon._parameters[param_id]
-            if issubclass(param.param_type, Enum):
-                value = param.param_type(int(np.asarray(value).flat[0]))
-            else:
-                value = param.param_type(np.asarray(value).flat[0])
-            if param.min_val is not None and param.max_val is not None:
-                value = np.clip(value, param.min_val, param.max_val)
-            self.nucon.set(param, value)
-
-        observation = self._get_obs()
-        terminated = np.sum([terminator(observation) for terminator in self.terminators]) > self.terminate_above
-        truncated = False
-        info = self._get_info()
-        reward = sum(obj for obj in info['objectives_weighted'].values())
-
-        self._total_steps += 1
-        if self.simulator:
-
needs_uncertainty = self.uncertainty_penalty_start is not None or self.uncertainty_abort is not None - uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=needs_uncertainty) - if uncertainty is not None: - if self.uncertainty_abort is not None and uncertainty >= self.uncertainty_abort: - truncated = True - if self.uncertainty_penalty_start is not None and uncertainty > self.uncertainty_penalty_start: - reward -= self.uncertainty_penalty_scale * (uncertainty - self.uncertainty_penalty_start) - else: - sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0 - time.sleep(self.seconds_per_step / sim_speed) - return observation, reward, terminated, truncated, info - - def render(self): - if self.render_mode == "human": - pass - - def close(self): - pass - - def _flatten_action(self, action): - return np.concatenate([v.flatten() for v in action.values()]) - - def _unflatten_action(self, flat_action): - return {k: v.reshape(1, -1) for k, v in self.action_space.items()} - - def _flatten_observation(self, observation): - return np.concatenate([v.flatten() for v in observation.values()]) - - def _unflatten_observation(self, flat_observation): - return {k: v.reshape(1, -1) for k, v in self.observation_space.items()} +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- def _build_param_space(param): """Return a gymnasium Box for a single NuconParameter, or None if unsupported.""" @@ -180,30 +96,174 @@ def _build_param_space(param): return None +def _apply_action(nucon, action): + for param_id, value in action.items(): + param = nucon._parameters[param_id] + if issubclass(param.param_type, Enum): + value = param.param_type(int(np.asarray(value).flat[0])) + else: + value = param.param_type(np.asarray(value).flat[0]) + if param.min_val is not None and param.max_val is not None: + value = np.clip(value, param.min_val, param.max_val) + 
nucon.set(param, value) + + +# --------------------------------------------------------------------------- +# NuconEnv +# --------------------------------------------------------------------------- + +class NuconEnv(gym.Env): + metadata = {'render_modes': ['human']} + + def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5, + objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0): + super().__init__() + + self.render_mode = render_mode + self.seconds_per_step = seconds_per_step + if objective_weights is None: + objective_weights = [1.0 for _ in objectives] + self.objective_weights = objective_weights + self.terminate_above = terminate_above + self.simulator = simulator + + if nucon is None: + nucon = Nucon(port=simulator.port) if simulator else Nucon() + self.nucon = nucon + + # Observation space — SIM_UNCERTAINTY included when a simulator is present + obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)} + if simulator is not None: + obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32) + for param_id, param in self.nucon.get_all_readable().items(): + sp = _build_param_space(param) + if sp is not None: + obs_spaces[param_id] = sp + self.observation_space = spaces.Dict(obs_spaces) + + # Action space + action_spaces = {} + for param_id, param in self.nucon.get_all_writable().items(): + if not param.is_readable or param.is_cheat: + continue + sp = _build_param_space(param) + if sp is not None: + action_spaces[param_id] = sp + self.action_space = spaces.Dict(action_spaces) + + self.objectives = [] + self.terminators = [] + for objective in objectives: + if objective in Objectives: + self.objectives.append(Objectives[objective]) + elif callable(objective): + self.objectives.append(objective) + else: + raise ValueError(f"Unsupported objective: {objective}") + for terminator in terminators: + if terminator in Objectives: + 
self.terminators.append(Objectives[terminator]) + elif callable(terminator): + self.terminators.append(terminator) + else: + raise ValueError(f"Unsupported terminator: {terminator}") + + def _get_obs(self, sim_uncertainty=None): + obs = {} + for param_id, param in self.nucon.get_all_readable().items(): + if param.param_type == str or param_id not in self.observation_space.spaces: + continue + value = self.nucon.get(param_id) + if isinstance(value, Enum): + value = value.value + obs[param_id] = value + obs['EPISODE_TIME'] = self._total_steps * self.seconds_per_step + if 'SIM_UNCERTAINTY' in self.observation_space.spaces: + obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0 + return obs + + def _get_info(self, obs): + info = {'objectives': {}, 'objectives_weighted': {}} + for objective, weight in zip(self.objectives, self.objective_weights): + obj = objective(obs) + name = getattr(objective, '__name__', repr(objective)) + info['objectives'][name] = obj + info['objectives_weighted'][name] = obj * weight + return info + + def reset(self, seed=None, options=None): + super().reset(seed=seed) + self._total_steps = 0 + observation = self._get_obs() + return observation, self._get_info(observation) + + def step(self, action): + _apply_action(self.nucon, action) + + # Advance sim (or sleep) — get uncertainty for obs injection + truncated = False + uncertainty = None + if self.simulator: + uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True) + else: + sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0 + time.sleep(self.seconds_per_step / sim_speed) + + self._total_steps += 1 + observation = self._get_obs(sim_uncertainty=uncertainty) + info = self._get_info(observation) + reward = sum(obj for obj in info['objectives_weighted'].values()) + terminated = np.sum([t(observation) for t in self.terminators]) > self.terminate_above + return observation, reward, terminated, truncated, info + + def render(self): + pass + + def 
close(self): + pass + + def _flatten_observation(self, observation): + return np.concatenate([np.asarray(v).flatten() for v in observation.values()]) + + +# --------------------------------------------------------------------------- +# NuconGoalEnv +# --------------------------------------------------------------------------- + class NuconGoalEnv(gym.Env): """ Goal-conditioned reactor environment compatible with SB3 HER (Hindsight Experience Replay). - The observation is a Dict with three keys as required by GoalEnv / HER: - - 'observation': all readable non-goal, non-str params (same encoding as NuconEnv) + Observation is a Dict with three keys: + - 'observation': all readable non-goal, non-str params + SIM_UNCERTAINTY (when sim active) - 'achieved_goal': current values of goal_params, normalised to [0, 1] within goal_range - 'desired_goal': target values sampled each episode, normalised to [0, 1] - Reward defaults to negative L2 distance in the normalised goal space (dense). - Pass ``tolerance`` for a sparse {0, -1} reward (0 = within tolerance). + ``SIM_UNCERTAINTY`` in 'observation' lets reward_fn / terminators reference uncertainty directly. + + reward_fn signature: ``(achieved, desired)`` or ``(achieved, desired, obs)`` — the 3-arg form + receives the full observation dict (including SIM_UNCERTAINTY) for uncertainty-aware shaping. 
Usage with SB3 HER:: from stable_baselines3 import SAC from stable_baselines3.common.buffers import HerReplayBuffer + from nucon.rl import NuconGoalEnv, UncertaintyPenalty, UncertaintyAbort env = NuconGoalEnv( goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'], goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)}, + tolerance=0.05, simulator=simulator, + # uncertainty-aware reward: penalise OOD, abort if too far out + reward_fn=lambda ag, dg, obs: ( + -(np.linalg.norm(ag - dg) ** 2) + - 2.0 * max(0, obs.get('SIM_UNCERTAINTY', 0) - 0.3) ** 2 + ), + terminators=[UncertaintyAbort(threshold=0.7)], ) model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer) - model.learn(total_timesteps=200_000) + model.learn(total_timesteps=500_000) """ metadata = {'render_modes': ['human']} @@ -220,9 +280,6 @@ class NuconGoalEnv(gym.Env): seconds_per_step=5, terminators=None, terminate_above=0, - uncertainty_penalty_start: float = None, - uncertainty_abort: float = None, - uncertainty_penalty_scale: float = 1.0, ): super().__init__() @@ -238,14 +295,12 @@ class NuconGoalEnv(gym.Env): self.nucon = nucon all_readable = self.nucon.get_all_readable() - - # Validate goal params and build per-param range arrays for pid in self.goal_params: if pid not in all_readable: raise ValueError(f"Goal param '{pid}' is not a readable parameter") goal_range = goal_range or {} - self._goal_low = np.array([ + self._goal_low = np.array([ goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[0] for pid in self.goal_params ], dtype=np.float32) @@ -254,13 +309,21 @@ class NuconGoalEnv(gym.Env): for pid in self.goal_params ], dtype=np.float32) self._goal_range = self._goal_high - self._goal_low - self._goal_range[self._goal_range == 0] = 1.0 # avoid div-by-zero + self._goal_range[self._goal_range == 0] = 1.0 - self._reward_fn = reward_fn # callable(achieved_norm, desired_norm) -> float, or None + # 
Detect reward_fn arity for backward compat (2-arg vs 3-arg) + self._reward_fn = reward_fn + if reward_fn is not None: + n_args = len(inspect.signature(reward_fn).parameters) + self._reward_fn_wants_obs = n_args >= 3 + else: + self._reward_fn_wants_obs = False - # Observation subspace: all readable non-str non-goal params + # Observation subspace goal_set = set(self.goal_params) obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)} + if simulator is not None: + obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32) for param_id, param in all_readable.items(): if param_id in goal_set: continue @@ -275,7 +338,7 @@ class NuconGoalEnv(gym.Env): 'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32), }) - # Action space: readable-back, non-cheat writable params + # Action space action_spaces = {} for param_id, param in self.nucon.get_all_writable().items(): if not param.is_readable or param.is_cheat: @@ -285,26 +348,16 @@ class NuconGoalEnv(gym.Env): action_spaces[param_id] = sp self.action_space = spaces.Dict(action_spaces) - # Terminators self._terminators = terminators or [] - self.uncertainty_penalty_start = uncertainty_penalty_start - self.uncertainty_abort = uncertainty_abort - self.uncertainty_penalty_scale = uncertainty_penalty_scale - self._desired_goal = np.zeros(n_goals, dtype=np.float32) self._total_steps = 0 - # ------------------------------------------------------------------ - # GoalEnv interface - # ------------------------------------------------------------------ - def compute_reward(self, achieved_goal, desired_goal, info): - """ - Dense: negative L2 in normalised goal space (each dim in [0,1]). - Sparse when tolerance is set: 0 if within tolerance, -1 otherwise. - Custom reward_fn overrides both. 
- """ + """Dense negative L2, sparse with tolerance, or custom reward_fn.""" + obs = info.get('obs', {}) if isinstance(info, dict) else {} if self._reward_fn is not None: + if self._reward_fn_wants_obs: + return self._reward_fn(achieved_goal, desired_goal, obs) return self._reward_fn(achieved_goal, desired_goal) dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1) if self.tolerance is not None: @@ -312,13 +365,13 @@ class NuconGoalEnv(gym.Env): return -dist def _read_goal_values(self): - raw = np.array([ - self.nucon.get(pid) or 0.0 for pid in self.goal_params - ], dtype=np.float32) + raw = np.array([self.nucon.get(pid) or 0.0 for pid in self.goal_params], dtype=np.float32) return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0) - def _get_obs_dict(self): + def _get_obs_dict(self, sim_uncertainty=None): obs = {'EPISODE_TIME': float(self._total_steps * self.seconds_per_step)} + if 'SIM_UNCERTAINTY' in self.observation_space['observation'].spaces: + obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0 goal_set = set(self.goal_params) for param_id, param in self.nucon.get_all_readable().items(): if param_id in goal_set or param_id not in self.observation_space['observation'].spaces: @@ -337,44 +390,28 @@ class NuconGoalEnv(gym.Env): def reset(self, seed=None, options=None): super().reset(seed=seed) self._total_steps = 0 - - # Sample a new goal uniformly from the goal range rng = np.random.default_rng(seed) self._desired_goal = rng.uniform(0.0, 1.0, size=len(self.goal_params)).astype(np.float32) - - obs = self._get_obs_dict() - return obs, {} + return self._get_obs_dict(), {} def step(self, action): - for param_id, value in action.items(): - param = self.nucon._parameters[param_id] - if issubclass(param.param_type, Enum): - value = param.param_type(int(np.asarray(value).flat[0])) - else: - value = param.param_type(np.asarray(value).flat[0]) - if param.min_val is not None and param.max_val is not None: - value = 
np.clip(value, param.min_val, param.max_val) - self.nucon.set(param, value) + _apply_action(self.nucon, action) - obs = self._get_obs_dict() - reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], {})) - terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators) - truncated = False - info = {'achieved_goal': obs['achieved_goal'], 'desired_goal': obs['desired_goal']} - - self._total_steps += 1 + # Advance sim (or sleep) + uncertainty = None if self.simulator: - needs_uncertainty = self.uncertainty_penalty_start is not None or self.uncertainty_abort is not None - uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=needs_uncertainty) - if uncertainty is not None: - if self.uncertainty_abort is not None and uncertainty >= self.uncertainty_abort: - truncated = True - if self.uncertainty_penalty_start is not None and uncertainty > self.uncertainty_penalty_start: - reward -= self.uncertainty_penalty_scale * (uncertainty - self.uncertainty_penalty_start) + uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True) else: sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0 time.sleep(self.seconds_per_step / sim_speed) + self._total_steps += 1 + obs = self._get_obs_dict(sim_uncertainty=uncertainty) + info = {'achieved_goal': obs['achieved_goal'], 'desired_goal': obs['desired_goal'], + 'obs': obs['observation']} + reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], info)) + terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators) + truncated = False return obs, reward, terminated, truncated, info def render(self): @@ -384,6 +421,10 @@ class NuconGoalEnv(gym.Env): pass +# --------------------------------------------------------------------------- +# Registration +# --------------------------------------------------------------------------- + def register_nucon_envs(): gym.register( id='Nucon-max_power-v0', @@ -398,9 
+439,11 @@ def register_nucon_envs(): gym.register( id='Nucon-safe_max_power-v0', entry_point='nucon.rl:NuconEnv', - kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]} + kwargs={'seconds_per_step': 5, + 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), + Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], + 'objective_weights': [1, 10, 1/100_000]} ) - # Goal-conditioned: target total generator output (train with HER) gym.register( id='Nucon-goal_power-v0', entry_point='nucon.rl:NuconGoalEnv', @@ -410,7 +453,6 @@ def register_nucon_envs(): 'seconds_per_step': 5, } ) - # Goal-conditioned: target core temperature (train with HER) gym.register( id='Nucon-goal_temp-v0', entry_point='nucon.rl:NuconGoalEnv', @@ -421,4 +463,4 @@ def register_nucon_envs(): } ) -register_nucon_envs() \ No newline at end of file +register_nucon_envs()
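
As a closing illustration, the goal normalisation and default reward that `NuconGoalEnv` introduces reduce to a few lines of NumPy. A minimal stand-alone sketch (mirroring `_read_goal_values` and `compute_reward` from the patch; the generator bounds are borrowed from the docstring example and the `tolerance` value is illustrative):

```python
import numpy as np

# Stand-alone sketch of NuconGoalEnv's goal handling.
goal_low = np.array([0.0, 0.0, 0.0], dtype=np.float32)        # GENERATOR_*_KW lower bounds
goal_high = np.array([1200.0, 1200.0, 1200.0], dtype=np.float32)
goal_range = goal_high - goal_low
goal_range[goal_range == 0] = 1.0  # avoid div-by-zero, as in the patch

def normalise(raw):
    # Raw param values -> [0, 1] goal space (clipped), like _read_goal_values
    return np.clip((raw - goal_low) / goal_range, 0.0, 1.0)

def compute_reward(achieved, desired, tolerance=None):
    # Dense: negative L2 distance in normalised goal space.
    # Sparse (with tolerance): 0 if within tolerance, -1 otherwise.
    dist = np.linalg.norm(achieved - desired, axis=-1)
    if tolerance is not None:
        return -(dist > tolerance).astype(np.float32)
    return -dist

achieved = normalise(np.array([600.0, 600.0, 600.0]))    # half power on each generator
desired = normalise(np.array([1200.0, 1200.0, 1200.0]))  # full-power goal
print(compute_reward(achieved, desired))                 # dense, approx -0.866
print(compute_reward(achieved, desired, tolerance=0.05)) # sparse, -1.0
```

HER relabels `desired_goal` with later `achieved_goal` values, so keeping both in the same normalised [0, 1] space is what makes the relabelled rewards comparable across episodes.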