feat: NuconGoalEnv, composable uncertainty helpers, kNN-GP naming

- Add NuconGoalEnv for goal-conditioned HER training (SAC + HER)
- Add UncertaintyPenalty and UncertaintyAbort composable callables;
  SIM_UNCERTAINTY injected into obs dict when simulator is active
- Fix rl.py: str-typed params crash, missing Enum import, write-only
  params in action space, broken step() iteration order
- Remove uncertainty state from sim (return value from update() instead)
- Rename kNN -> kNN-GP throughout README; add model selection note

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dominik Moritz Roth 2026-03-12 18:51:13 +01:00
parent 1b93699501
commit f4d45d3cfd
2 changed files with 255 additions and 209 deletions

View File

@ -300,8 +300,8 @@ To address the challenge of unknown game dynamics, NuCon provides tools for coll
- **Data Collection**: Gathers state transitions from human play or automated agents. `time_delta` is specified in game-time seconds; wall-clock sleep is automatically adjusted for `GAME_SIM_SPEED` so collected deltas are uniform regardless of simulation speed.
- **Automatic param filtering**: Junk params (GAME_VERSION, TIME, ALARMS_ACTIVE, …) and params from uninstalled subsystems (returns `None`) are automatically excluded from model inputs/outputs.
- **Two model backends**: Neural network (NN) or k-Nearest Neighbours with GP interpolation (kNN).
- **Uncertainty estimation**: The kNN backend returns a GP posterior standard deviation alongside each prediction; 0 means the query lies on known data, ~1 means it is out of distribution.
- **Two model backends**: Neural network (NN) or a local Gaussian Process approximated via k-Nearest Neighbours (kNN-GP).
- **Uncertainty estimation**: The kNN-GP backend returns a GP posterior standard deviation alongside each prediction; 0 means the query lies on known data, ~1 means it is out of distribution.
- **Dataset management**: Tools for saving, loading, merging, and pruning datasets.
### Additional Dependencies
@ -310,12 +310,16 @@ To address the challenge of unknown game dynamics, NuCon provides tools for coll
pip install -e '.[model]'
```
### Model selection
**kNN-GP** (the `ReactorKNNModel` backend) is a local Gaussian Process: it finds the `k` nearest neighbours in the training set, fits an RBF kernel on them, and returns a prediction plus a GP posterior std as uncertainty. It works well from a few hundred samples and requires no training. **NN** needs input normalisation and several thousand samples to generalise; use it once you have a large dataset. For initial experiments, start with kNN-GP (`k=10`).
### Usage
```python
from nucon.model import NuconModelLearner
# --- Data collection (model_type not needed here) ---
# --- Data collection ---
learner = NuconModelLearner(
time_delta=10.0, # 10 game-seconds per step (wall sleep auto-scales with sim speed)
include_valve_states=False, # set True to include all 53 valve positions as model inputs
@ -333,13 +337,13 @@ nn_learner.train_model(batch_size=32, num_epochs=50) # creates NN model on firs
nn_learner.drop_well_fitted(error_threshold=1.0)
nn_learner.save_model('reactor_nn.pth')
# --- kNN + GP backend ---
# --- kNN-GP backend ---
knn_learner = NuconModelLearner(dataset_path='reactor_dataset.pkl')
# Drop near-duplicate samples before fitting (keeps diverse coverage).
# A sample is dropped only if BOTH its input state AND output transition
# are within the given distances of an already-kept sample.
knn_learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
knn_learner.fit_knn(k=10) # creates kNN model on first call
knn_learner.fit_knn(k=10) # creates kNN-GP model on first call
# Point prediction
state = knn_learner._get_state()
@ -399,7 +403,7 @@ The recommended end-to-end workflow for training an RL operator is an iterative
**Step 1 — Human dataset collection**: Run `NuconModelLearner.collect_data()` during your play session. Cover a wide range of states: startup from cold, ramping power, individual rod bank adjustments. Diversity in the dataset directly determines simulator accuracy. See [Model Learning](#model-learning-work-in-progress) for collection details.
**Step 2 — Initial model fitting**: Fit a kNN model (instant) or NN (better extrapolation with larger datasets) using `fit_knn()` or `train_model()`. Prune near-duplicate samples with `drop_redundant()` before fitting. See [Model Learning](#model-learning-work-in-progress).
**Step 2 — Initial model fitting**: Fit a kNN-GP model (instant) or NN (better extrapolation with larger datasets) using `fit_knn()` or `train_model()`. Prune near-duplicate samples with `drop_redundant()` before fitting. See [Model Learning](#model-learning).
**Step 3 — Train RL in simulator**: Load the fitted model into `NuconSimulator`, then train a `NuconGoalEnv` policy with SAC + HER. The simulator runs far faster than the real game, allowing many trajectories in reasonable time. Use `uncertainty_penalty_start` and `uncertainty_abort` on the env to discourage the policy from wandering into regions the model hasn't seen: a linear penalty kicks in above the soft threshold, and the episode is truncated at the hard threshold. This keeps training within the reliable part of the model's knowledge. See [NuconGoalEnv + HER Usage](#nucongoalenv--her-usage).
@ -407,7 +411,7 @@ The recommended end-to-end workflow for training an RL operator is an iterative
**Step 5 — Refit model on expanded data**: Merge new data into the original dataset with `merge_datasets()`, prune with `drop_redundant()`, and refit. Then return to Step 3 with the improved model. Each iteration the simulator gets more accurate and the policy improves.
Stop when the policy performs well in the real game and kNN uncertainty stays low throughout an episode, indicating the policy stays within the known data distribution.
Stop when the policy performs well in the real game and kNN-GP uncertainty stays low throughout an episode, indicating the policy stays within the known data distribution.
## Testing

View File

@ -1,167 +1,83 @@
import inspect
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import time
from typing import Dict, Any
from typing import Dict, Any, Callable, List, Optional
from enum import Enum
from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus
# ---------------------------------------------------------------------------
# Reward / objective helpers
# ---------------------------------------------------------------------------
Objectives = {
"null": lambda obs: 0,
"max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"],
"null": lambda obs: 0,
"max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"],
"episode_time": lambda obs: obs["EPISODE_TIME"],
}
Parameterized_Objectives = {
"target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2),
"target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2),
"temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2),
"temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2),
"constant": lambda constant: lambda obs: constant,
"target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2),
"temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2),
"temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2),
"constant": lambda constant: lambda obs: constant,
}
class NuconEnv(gym.Env):
metadata = {'render_modes': ['human']}
def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5, objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0,
uncertainty_penalty_start: float = None, uncertainty_abort: float = None, uncertainty_penalty_scale: float = 1.0):
super().__init__()
def UncertaintyPenalty(start: float = 0.3, scale: float = 1.0, mode: str = 'l2') -> Callable:
"""Objective that penalises high simulator uncertainty.
self.render_mode = render_mode
self.seconds_per_step = seconds_per_step
if objective_weights is None:
objective_weights = [1.0 for objective in objectives]
self.objective_weights = objective_weights
self.terminate_above = terminate_above
self.simulator = simulator
self.uncertainty_penalty_start = uncertainty_penalty_start
self.uncertainty_abort = uncertainty_abort
self.uncertainty_penalty_scale = uncertainty_penalty_scale
Returns a callable ``(obs) -> float`` suitable for use as an objective or
terminator in NuconEnv / NuconGoalEnv. Works because ``SIM_UNCERTAINTY``
is injected into the obs dict whenever a simulator is active.
if nucon is None:
if simulator:
nucon = Nucon(port=simulator.port)
else:
nucon = Nucon()
self.nucon = nucon
Args:
start: uncertainty level at which the penalty starts (default 0.3).
scale: penalty coefficient.
mode: ``'l2'`` (quadratic, default) or ``'linear'``.
# Define observation space
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
for param_id, param in self.nucon.get_all_readable().items():
sp = _build_param_space(param)
if sp is not None:
obs_spaces[param_id] = sp
self.observation_space = spaces.Dict(obs_spaces)
Example::
# Define action space (only controllable, non-cheat, readable-back params)
action_spaces = {}
for param_id, param in self.nucon.get_all_writable().items():
if not param.is_readable or param.is_cheat:
continue # write-only (VALVE_OPEN/CLOSE, SCRAM, etc.) and cheat params excluded
sp = _build_param_space(param)
if sp is not None:
action_spaces[param_id] = sp
self.action_space = spaces.Dict(action_spaces)
env = NuconEnv(
objectives=['max_power', UncertaintyPenalty(start=0.3, scale=2.0)],
objective_weights=[1.0, 1.0],
simulator=simulator,
)
"""
excess = lambda obs: max(0.0, obs.get('SIM_UNCERTAINTY', 0.0) - start)
if mode == 'l2':
return lambda obs: -scale * excess(obs) ** 2
elif mode == 'linear':
return lambda obs: -scale * excess(obs)
else:
raise ValueError(f"Unknown mode '{mode}'. Use 'l2' or 'linear'.")
self.objectives = []
self.terminators = []
for objective in objectives:
if objective in Objectives:
self.objectives.append(Objectives[objective])
elif callable(objective):
self.objectives.append(objective)
else:
raise ValueError(f"Unsupported objective: {objective}")
def UncertaintyAbort(threshold: float = 0.7) -> Callable:
"""Terminator that aborts the episode when simulator uncertainty is too high.
for terminator in terminators:
if terminator in Objectives:
self.terminators.append(Objectives[terminator])
elif callable(terminator):
self.terminators.append(terminator)
else:
raise ValueError(f"Unsupported terminator: {terminator}")
Returns a callable ``(obs) -> float`` for use as a *terminator*. When
the GP posterior std exceeds ``threshold`` the episode is truncated
(``terminated=True``).
def _get_obs(self):
obs = {}
for param_id, param in self.nucon.get_all_readable().items():
if param.param_type == str or param_id not in self.observation_space.spaces:
continue
value = self.nucon.get(param_id)
if isinstance(value, Enum):
value = value.value
obs[param_id] = value
obs["EPISODE_TIME"] = self._total_steps * self.seconds_per_step
return obs
Example::
def _get_info(self):
info = {'objectives': {}, 'objectives_weighted': {}}
for objective, weight in zip(self.objectives, self.objective_weights):
obj = objective(self._get_obs())
info['objectives'][objective.__name__] = obj
info['objectives_weighted'][objective.__name__] = obj * weight
return info
def reset(self, seed=None, options=None):
super().reset(seed=seed)
env = NuconEnv(
objectives=['max_power'],
terminators=[UncertaintyAbort(threshold=0.7)],
terminate_above=0,
simulator=simulator,
)
"""
return lambda obs: 1.0 if obs.get('SIM_UNCERTAINTY', 0.0) >= threshold else 0.0
self._total_steps = 0
observation = self._get_obs()
info = self._get_info()
return observation, info
def step(self, action):
# Apply the action to the Nucon system
for param_id, value in action.items():
param = self.nucon._parameters[param_id]
if issubclass(param.param_type, Enum):
value = param.param_type(int(np.asarray(value).flat[0]))
else:
value = param.param_type(np.asarray(value).flat[0])
if param.min_val is not None and param.max_val is not None:
value = np.clip(value, param.min_val, param.max_val)
self.nucon.set(param, value)
observation = self._get_obs()
terminated = np.sum([terminator(observation) for terminator in self.terminators]) > self.terminate_above
truncated = False
info = self._get_info()
reward = sum(obj for obj in info['objectives_weighted'].values())
self._total_steps += 1
if self.simulator:
needs_uncertainty = self.uncertainty_penalty_start is not None or self.uncertainty_abort is not None
uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=needs_uncertainty)
if uncertainty is not None:
if self.uncertainty_abort is not None and uncertainty >= self.uncertainty_abort:
truncated = True
if self.uncertainty_penalty_start is not None and uncertainty > self.uncertainty_penalty_start:
reward -= self.uncertainty_penalty_scale * (uncertainty - self.uncertainty_penalty_start)
else:
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
time.sleep(self.seconds_per_step / sim_speed)
return observation, reward, terminated, truncated, info
def render(self):
if self.render_mode == "human":
pass
def close(self):
pass
def _flatten_action(self, action):
return np.concatenate([v.flatten() for v in action.values()])
def _unflatten_action(self, flat_action):
return {k: v.reshape(1, -1) for k, v in self.action_space.items()}
def _flatten_observation(self, observation):
return np.concatenate([v.flatten() for v in observation.values()])
def _unflatten_observation(self, flat_observation):
return {k: v.reshape(1, -1) for k, v in self.observation_space.items()}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _build_param_space(param):
"""Return a gymnasium Box for a single NuconParameter, or None if unsupported."""
@ -180,30 +96,174 @@ def _build_param_space(param):
return None
def _apply_action(nucon, action):
for param_id, value in action.items():
param = nucon._parameters[param_id]
if issubclass(param.param_type, Enum):
value = param.param_type(int(np.asarray(value).flat[0]))
else:
value = param.param_type(np.asarray(value).flat[0])
if param.min_val is not None and param.max_val is not None:
value = np.clip(value, param.min_val, param.max_val)
nucon.set(param, value)
# ---------------------------------------------------------------------------
# NuconEnv
# ---------------------------------------------------------------------------
class NuconEnv(gym.Env):
metadata = {'render_modes': ['human']}
def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5,
objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0):
super().__init__()
self.render_mode = render_mode
self.seconds_per_step = seconds_per_step
if objective_weights is None:
objective_weights = [1.0 for _ in objectives]
self.objective_weights = objective_weights
self.terminate_above = terminate_above
self.simulator = simulator
if nucon is None:
nucon = Nucon(port=simulator.port) if simulator else Nucon()
self.nucon = nucon
# Observation space — SIM_UNCERTAINTY included when a simulator is present
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
if simulator is not None:
obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
for param_id, param in self.nucon.get_all_readable().items():
sp = _build_param_space(param)
if sp is not None:
obs_spaces[param_id] = sp
self.observation_space = spaces.Dict(obs_spaces)
# Action space
action_spaces = {}
for param_id, param in self.nucon.get_all_writable().items():
if not param.is_readable or param.is_cheat:
continue
sp = _build_param_space(param)
if sp is not None:
action_spaces[param_id] = sp
self.action_space = spaces.Dict(action_spaces)
self.objectives = []
self.terminators = []
for objective in objectives:
if objective in Objectives:
self.objectives.append(Objectives[objective])
elif callable(objective):
self.objectives.append(objective)
else:
raise ValueError(f"Unsupported objective: {objective}")
for terminator in terminators:
if terminator in Objectives:
self.terminators.append(Objectives[terminator])
elif callable(terminator):
self.terminators.append(terminator)
else:
raise ValueError(f"Unsupported terminator: {terminator}")
def _get_obs(self, sim_uncertainty=None):
obs = {}
for param_id, param in self.nucon.get_all_readable().items():
if param.param_type == str or param_id not in self.observation_space.spaces:
continue
value = self.nucon.get(param_id)
if isinstance(value, Enum):
value = value.value
obs[param_id] = value
obs['EPISODE_TIME'] = self._total_steps * self.seconds_per_step
if 'SIM_UNCERTAINTY' in self.observation_space.spaces:
obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0
return obs
def _get_info(self, obs):
info = {'objectives': {}, 'objectives_weighted': {}}
for objective, weight in zip(self.objectives, self.objective_weights):
obj = objective(obs)
name = getattr(objective, '__name__', repr(objective))
info['objectives'][name] = obj
info['objectives_weighted'][name] = obj * weight
return info
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self._total_steps = 0
observation = self._get_obs()
return observation, self._get_info(observation)
def step(self, action):
_apply_action(self.nucon, action)
# Advance sim (or sleep) — get uncertainty for obs injection
truncated = False
uncertainty = None
if self.simulator:
uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
else:
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
time.sleep(self.seconds_per_step / sim_speed)
self._total_steps += 1
observation = self._get_obs(sim_uncertainty=uncertainty)
info = self._get_info(observation)
reward = sum(obj for obj in info['objectives_weighted'].values())
terminated = np.sum([t(observation) for t in self.terminators]) > self.terminate_above
return observation, reward, terminated, truncated, info
def render(self):
pass
def close(self):
pass
def _flatten_observation(self, observation):
return np.concatenate([np.asarray(v).flatten() for v in observation.values()])
# ---------------------------------------------------------------------------
# NuconGoalEnv
# ---------------------------------------------------------------------------
class NuconGoalEnv(gym.Env):
"""
Goal-conditioned reactor environment compatible with SB3 HER (Hindsight Experience Replay).
The observation is a Dict with three keys as required by GoalEnv / HER:
- 'observation': all readable non-goal, non-str params (same encoding as NuconEnv)
Observation is a Dict with three keys:
- 'observation': all readable non-goal, non-str params + SIM_UNCERTAINTY (when sim active)
- 'achieved_goal': current values of goal_params, normalised to [0, 1] within goal_range
- 'desired_goal': target values sampled each episode, normalised to [0, 1]
Reward defaults to negative L2 distance in the normalised goal space (dense).
Pass ``tolerance`` for a sparse {0, -1} reward (0 = within tolerance).
``SIM_UNCERTAINTY`` in 'observation' lets reward_fn / terminators reference uncertainty directly.
reward_fn signature: ``(achieved, desired)`` or ``(achieved, desired, obs)`` the 3-arg form
receives the full observation dict (including SIM_UNCERTAINTY) for uncertainty-aware shaping.
Usage with SB3 HER::
from stable_baselines3 import SAC
from stable_baselines3.common.buffers import HerReplayBuffer
from nucon.rl import NuconGoalEnv, UncertaintyPenalty, UncertaintyAbort
env = NuconGoalEnv(
goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
tolerance=0.05,
simulator=simulator,
# uncertainty-aware reward: penalise OOD, abort if too far out
reward_fn=lambda ag, dg, obs: (
-(np.linalg.norm(ag - dg) ** 2)
- 2.0 * max(0, obs.get('SIM_UNCERTAINTY', 0) - 0.3) ** 2
),
terminators=[UncertaintyAbort(threshold=0.7)],
)
model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer)
model.learn(total_timesteps=200_000)
model.learn(total_timesteps=500_000)
"""
metadata = {'render_modes': ['human']}
@ -220,9 +280,6 @@ class NuconGoalEnv(gym.Env):
seconds_per_step=5,
terminators=None,
terminate_above=0,
uncertainty_penalty_start: float = None,
uncertainty_abort: float = None,
uncertainty_penalty_scale: float = 1.0,
):
super().__init__()
@ -238,14 +295,12 @@ class NuconGoalEnv(gym.Env):
self.nucon = nucon
all_readable = self.nucon.get_all_readable()
# Validate goal params and build per-param range arrays
for pid in self.goal_params:
if pid not in all_readable:
raise ValueError(f"Goal param '{pid}' is not a readable parameter")
goal_range = goal_range or {}
self._goal_low = np.array([
self._goal_low = np.array([
goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[0]
for pid in self.goal_params
], dtype=np.float32)
@ -254,13 +309,21 @@ class NuconGoalEnv(gym.Env):
for pid in self.goal_params
], dtype=np.float32)
self._goal_range = self._goal_high - self._goal_low
self._goal_range[self._goal_range == 0] = 1.0 # avoid div-by-zero
self._goal_range[self._goal_range == 0] = 1.0
self._reward_fn = reward_fn # callable(achieved_norm, desired_norm) -> float, or None
# Detect reward_fn arity for backward compat (2-arg vs 3-arg)
self._reward_fn = reward_fn
if reward_fn is not None:
n_args = len(inspect.signature(reward_fn).parameters)
self._reward_fn_wants_obs = n_args >= 3
else:
self._reward_fn_wants_obs = False
# Observation subspace: all readable non-str non-goal params
# Observation subspace
goal_set = set(self.goal_params)
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
if simulator is not None:
obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
for param_id, param in all_readable.items():
if param_id in goal_set:
continue
@ -275,7 +338,7 @@ class NuconGoalEnv(gym.Env):
'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
})
# Action space: readable-back, non-cheat writable params
# Action space
action_spaces = {}
for param_id, param in self.nucon.get_all_writable().items():
if not param.is_readable or param.is_cheat:
@ -285,26 +348,16 @@ class NuconGoalEnv(gym.Env):
action_spaces[param_id] = sp
self.action_space = spaces.Dict(action_spaces)
# Terminators
self._terminators = terminators or []
self.uncertainty_penalty_start = uncertainty_penalty_start
self.uncertainty_abort = uncertainty_abort
self.uncertainty_penalty_scale = uncertainty_penalty_scale
self._desired_goal = np.zeros(n_goals, dtype=np.float32)
self._total_steps = 0
# ------------------------------------------------------------------
# GoalEnv interface
# ------------------------------------------------------------------
def compute_reward(self, achieved_goal, desired_goal, info):
"""
Dense: negative L2 in normalised goal space (each dim in [0,1]).
Sparse when tolerance is set: 0 if within tolerance, -1 otherwise.
Custom reward_fn overrides both.
"""
"""Dense negative L2, sparse with tolerance, or custom reward_fn."""
obs = info.get('obs', {}) if isinstance(info, dict) else {}
if self._reward_fn is not None:
if self._reward_fn_wants_obs:
return self._reward_fn(achieved_goal, desired_goal, obs)
return self._reward_fn(achieved_goal, desired_goal)
dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
if self.tolerance is not None:
@ -312,13 +365,13 @@ class NuconGoalEnv(gym.Env):
return -dist
def _read_goal_values(self):
raw = np.array([
self.nucon.get(pid) or 0.0 for pid in self.goal_params
], dtype=np.float32)
raw = np.array([self.nucon.get(pid) or 0.0 for pid in self.goal_params], dtype=np.float32)
return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0)
def _get_obs_dict(self):
def _get_obs_dict(self, sim_uncertainty=None):
obs = {'EPISODE_TIME': float(self._total_steps * self.seconds_per_step)}
if 'SIM_UNCERTAINTY' in self.observation_space['observation'].spaces:
obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0
goal_set = set(self.goal_params)
for param_id, param in self.nucon.get_all_readable().items():
if param_id in goal_set or param_id not in self.observation_space['observation'].spaces:
@ -337,44 +390,28 @@ class NuconGoalEnv(gym.Env):
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self._total_steps = 0
# Sample a new goal uniformly from the goal range
rng = np.random.default_rng(seed)
self._desired_goal = rng.uniform(0.0, 1.0, size=len(self.goal_params)).astype(np.float32)
obs = self._get_obs_dict()
return obs, {}
return self._get_obs_dict(), {}
def step(self, action):
for param_id, value in action.items():
param = self.nucon._parameters[param_id]
if issubclass(param.param_type, Enum):
value = param.param_type(int(np.asarray(value).flat[0]))
else:
value = param.param_type(np.asarray(value).flat[0])
if param.min_val is not None and param.max_val is not None:
value = np.clip(value, param.min_val, param.max_val)
self.nucon.set(param, value)
_apply_action(self.nucon, action)
obs = self._get_obs_dict()
reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], {}))
terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators)
truncated = False
info = {'achieved_goal': obs['achieved_goal'], 'desired_goal': obs['desired_goal']}
self._total_steps += 1
# Advance sim (or sleep)
uncertainty = None
if self.simulator:
needs_uncertainty = self.uncertainty_penalty_start is not None or self.uncertainty_abort is not None
uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=needs_uncertainty)
if uncertainty is not None:
if self.uncertainty_abort is not None and uncertainty >= self.uncertainty_abort:
truncated = True
if self.uncertainty_penalty_start is not None and uncertainty > self.uncertainty_penalty_start:
reward -= self.uncertainty_penalty_scale * (uncertainty - self.uncertainty_penalty_start)
uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
else:
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
time.sleep(self.seconds_per_step / sim_speed)
self._total_steps += 1
obs = self._get_obs_dict(sim_uncertainty=uncertainty)
info = {'achieved_goal': obs['achieved_goal'], 'desired_goal': obs['desired_goal'],
'obs': obs['observation']}
reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], info))
terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators)
truncated = False
return obs, reward, terminated, truncated, info
def render(self):
@ -384,6 +421,10 @@ class NuconGoalEnv(gym.Env):
pass
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
def register_nucon_envs():
gym.register(
id='Nucon-max_power-v0',
@ -398,9 +439,11 @@ def register_nucon_envs():
gym.register(
id='Nucon-safe_max_power-v0',
entry_point='nucon.rl:NuconEnv',
kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]}
kwargs={'seconds_per_step': 5,
'objectives': [Parameterized_Objectives['temp_above'](min_temp=310),
Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'],
'objective_weights': [1, 10, 1/100_000]}
)
# Goal-conditioned: target total generator output (train with HER)
gym.register(
id='Nucon-goal_power-v0',
entry_point='nucon.rl:NuconGoalEnv',
@ -410,7 +453,6 @@ def register_nucon_envs():
'seconds_per_step': 5,
}
)
# Goal-conditioned: target core temperature (train with HER)
gym.register(
id='Nucon-goal_temp-v0',
entry_point='nucon.rl:NuconGoalEnv',
@ -421,4 +463,4 @@ def register_nucon_envs():
}
)
register_nucon_envs()
register_nucon_envs()