Compare commits
No commits in common. "f4d45d3cfd44458d0ccd0688f989f974daa3e1be" and "3eb0cc7b6085d5751c73c40b3ae75148f4dcf70b" have entirely different histories.

README.md

@@ -111,9 +111,9 @@ Custom Enum Types:

So if you're not in the mood to play the game manually, this API can be used to easily create your own automations and control systems. Maybe a little PID controller for the rods? Or, if you wanna go crazy, why not try some reinforcement learning?

## Reinforcement Learning (Work in Progress)

NuCon includes a preliminary Reinforcement Learning (RL) environment based on the OpenAI Gym interface. This allows you to train control policies for the Nucleares game instead of writing them yourself. This feature is currently a work in progress and requires additional dependencies.

### Additional Dependencies

@@ -127,17 +127,17 @@ pip install -e '.[rl]'

Two environment classes are provided in `nucon/rl.py`:

**`NuconEnv`**: classic fixed-objective environment. You define one or more objectives at construction time (e.g. maximise power output, keep temperature in range). The agent always trains toward the same goal.

- Observation space: all readable numeric parameters (~290 dims).
- Action space: all readable-back writable parameters (~30 dims): 9 individual rod bank positions, 3 MSCVs, 3 turbine bypass valves, 6 coolant pump speeds, condenser pump, freight/vent switches, resistor banks, and more.
- Objectives: predefined strings (`'max_power'`, `'episode_time'`) or arbitrary callables `(obs) -> float`. Multiple objectives are weighted-summed.

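To make the weighted sum concrete, here is a small sketch of how a predefined objective and a custom callable combine into a single reward. It mirrors the `Objectives` dict and weighted sum in `nucon/rl.py`; the observation values are made up for illustration.

```python
# Predefined objective, as defined in nucon/rl.py
Objectives = {
    "max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"],
}

# A custom callable objective: quadratic penalty above 350 degrees C (hypothetical numbers)
keep_temp_low = lambda obs: -max(0.0, obs["CORE_TEMP"] - 350.0) ** 2

objectives = [Objectives["max_power"], keep_temp_low]
weights = [1.0, 0.1]

obs = {"GENERATOR_0_KW": 500.0, "GENERATOR_1_KW": 450.0,
       "GENERATOR_2_KW": 0.0, "CORE_TEMP": 360.0}

# Weighted sum of all objectives, exactly as the env computes its reward
reward = sum(w * f(obs) for f, w in zip(objectives, weights))
```

Here `max_power` contributes 950 and the temperature penalty contributes -100 scaled by 0.1, so the step reward is 940.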
**`NuconGoalEnv`**: goal-conditioned environment. The desired goal (e.g. target generator output) is sampled at the start of each episode and provided as part of the observation. A single policy learns to reach *any* goal in the specified range, making it far more useful than a fixed-objective agent. Designed for training with [Hindsight Experience Replay (HER)](https://arxiv.org/abs/1707.01495), which makes sparse-reward goal-conditioned training tractable.

- Observation space: `Dict` with keys `observation` (non-goal params), `achieved_goal` (current goal param values, normalised to [0,1]), `desired_goal` (target, normalised to [0,1]).
- Goals are sampled uniformly from the specified `goal_range` each episode.
- Reward defaults to negative L2 distance in normalised goal space (dense). Pass `tolerance` for a sparse `{0, -1}` reward; this works particularly well with HER.

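The two reward modes can be sketched as follows (illustrative only; the actual `NuconGoalEnv` implementation may differ in details):

```python
import numpy as np

def goal_reward(achieved, desired, tolerance=None):
    # Dense: negative L2 distance in normalised goal space.
    # Sparse: 0 inside the tolerance ball, -1 outside.
    dist = np.linalg.norm(np.asarray(achieved) - np.asarray(desired))
    if tolerance is None:
        return -dist
    return 0.0 if dist <= tolerance else -1.0

dense = goal_reward([0.5, 0.5], [0.5, 0.9])                    # -0.4
sparse = goal_reward([0.5, 0.5], [0.5, 0.9], tolerance=0.05)   # -1.0
```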
### NuconEnv Usage

@@ -193,7 +193,7 @@ env.close()

### NuconGoalEnv + HER Usage

HER works by relabelling past trajectories with the goal that was *actually achieved*, turning every episode into useful training signal even when the agent never reaches the intended target. This makes it much more sample-efficient than standard RL for goal-reaching tasks, which matters given how slow the real game is.

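The relabelling trick itself fits in a few lines. This toy sketch uses the "final" goal-selection strategy and a made-up transition format; it is not the stable-baselines3 API:

```python
def her_relabel(episode, tolerance=0.05):
    # Pretend the goal was whatever the episode actually reached,
    # then recompute the sparse {0, -1} reward under that goal.
    new_goal = episode[-1]["achieved_goal"]
    relabelled = []
    for t in episode:
        reached = abs(t["achieved_goal"] - new_goal) <= tolerance
        relabelled.append({**t, "desired_goal": new_goal,
                           "reward": 0.0 if reached else -1.0})
    return relabelled

# An episode that never reached its intended goal of 0.9...
episode = [{"achieved_goal": g, "desired_goal": 0.9, "reward": -1.0}
           for g in (0.1, 0.3, 0.5)]
relabelled = her_relabel(episode)  # ...now ends in "success" at 0.5
```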
```python
from nucon.rl import NuconGoalEnv
@@ -242,9 +242,9 @@ Predefined goal environments:

- `Nucon-goal_power-v0`: target total generator output (3 × 0–1200 kW)
- `Nucon-goal_temp-v0`: target core temperature (280–380 °C)

RL algorithms require a huge number of training steps to reach passable policies, and Nucleares is a slow simulation that cannot be trivially parallelised. That's why NuCon provides a built-in simulator.

## Simulator (Work in Progress)

NuCon provides a built-in simulator to address the challenge of slow training times in the actual Nucleares game. This simulator allows for rapid prototyping and testing of control policies without the need for the full game environment. Key features include:

@@ -292,16 +292,16 @@ env = NuconEnv(simulator=simulator)  # When given a simulator, instead of waiting
# ...
```

The simulator needs an accurate dynamics model of the game. NuCon provides tools to learn one from real gameplay data.

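The idea can be sketched with a toy linear system: collect (state, action, next state) transitions, fit a one-step model, then roll the model forward instead of the game. This is purely illustrative; the real reactor dynamics are learned with the NN or kNN-GP backends described in the Model Learning section.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hidden "true game" dynamics (hypothetical): s' = A s + B a
A_true = np.array([[0.99, 0.00], [0.01, 0.95]])
B_true = np.array([[0.1], [0.0]])

# Collect transitions from the "real game"
states = rng.normal(size=(500, 2))
actions = rng.normal(size=(500, 1))
nexts = states @ A_true.T + actions @ B_true.T

# Fit a least-squares one-step model: [s, a] -> s'
X = np.hstack([states, actions])
W, *_ = np.linalg.lstsq(X, nexts, rcond=None)

# Roll the learned model forward instead of stepping the slow game
s = np.zeros(2)
for _ in range(10):
    s = np.concatenate([s, [1.0]]) @ W   # constant action a = 1.0
```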
## Model Learning (Work in Progress)

To address the challenge of unknown game dynamics, NuCon provides tools for collecting data, creating datasets, and training models to learn the reactor dynamics. Key features include:

- **Data Collection**: Gathers state transitions from human play or automated agents. `time_delta` is specified in game-time seconds; wall-clock sleep is automatically adjusted for `GAME_SIM_SPEED` so collected deltas are uniform regardless of simulation speed.
- **Automatic param filtering**: Junk params (GAME_VERSION, TIME, ALARMS_ACTIVE, …) and params from uninstalled subsystems (which return `None`) are automatically excluded from model inputs/outputs.
- **Two model backends**: Neural network (NN) or a local Gaussian Process approximated via k-Nearest Neighbours (kNN-GP).
- **Uncertainty estimation**: The kNN-GP backend returns a GP posterior standard deviation alongside each prediction; 0 means the query lies on known data, ~1 means it is out of distribution.
- **Dataset management**: Tools for saving, loading, merging, and pruning datasets.

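The kNN-GP behaviour can be sketched in pure NumPy (a conceptual illustration, not the actual `ReactorKNNModel` code): fit a small RBF-kernel GP on the query's nearest neighbours and read the posterior standard deviation as the uncertainty signal.

```python
import numpy as np

def knn_gp_predict(X, Y, query, k=10, length_scale=1.0, noise=1e-8):
    """Local GP sketch: exact GP on the k nearest neighbours of `query`."""
    def rbf(A, B):
        return np.exp(-((A[:, None, :] - B[None, :, :]) ** 2).sum(-1)
                      / (2 * length_scale ** 2))
    idx = np.argsort(((X - query) ** 2).sum(axis=1))[:k]
    Xk, Yk = X[idx], Y[idx]
    K = rbf(Xk, Xk) + noise * np.eye(k)
    k_star = rbf(query[None, :], Xk)[0]
    mean = k_star @ np.linalg.solve(K, Yk)
    var = 1.0 - k_star @ np.linalg.solve(K, k_star)   # prior variance is 1
    return mean, np.sqrt(max(var, 0.0))

rng = np.random.default_rng(0)
X = rng.uniform(0, 1, size=(200, 3))   # past (state, action) vectors
Y = X.sum(axis=1)                      # toy "next state" target
mean, std = knn_gp_predict(X, Y, X[0])             # on known data: std near 0
_, far_std = knn_gp_predict(X, Y, np.full(3, 5.0)) # out of distribution: std near 1
```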
### Additional Dependencies

@@ -310,16 +310,12 @@ To address the challenge of unknown game dynamics, NuCon provides tools for coll
pip install -e '.[model]'
```

### Model selection

**kNN-GP** (the `ReactorKNNModel` backend) is a local Gaussian Process: it finds the `k` nearest neighbours in the training set, fits an RBF kernel on them, and returns a prediction plus a GP posterior std as uncertainty. It works well from a few hundred samples and requires no training. **NN** needs input normalisation and several thousand samples to generalise; use it once you have a large dataset. For initial experiments, start with kNN-GP (`k=10`).

### Usage

```python
from nucon.model import NuconModelLearner

# --- Data collection (model_type not needed here) ---
learner = NuconModelLearner(
    time_delta=10.0,  # 10 game-seconds per step (wall sleep auto-scales with sim speed)
    include_valve_states=False,  # set True to include all 53 valve positions as model inputs
@@ -337,13 +333,13 @@ nn_learner.train_model(batch_size=32, num_epochs=50)  # creates NN model on first call
nn_learner.drop_well_fitted(error_threshold=1.0)
nn_learner.save_model('reactor_nn.pth')

# --- kNN-GP backend ---
knn_learner = NuconModelLearner(dataset_path='reactor_dataset.pkl')
# Drop near-duplicate samples before fitting (keeps diverse coverage).
# A sample is dropped only if BOTH its input state AND output transition
# are within the given distances of an already-kept sample.
knn_learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
knn_learner.fit_knn(k=10)  # creates kNN-GP model on first call

# Point prediction
state = knn_learner._get_state()
@@ -362,7 +358,7 @@ The trained models can be integrated into the NuconSimulator to provide accurate

## Full Training Loop

The recommended end-to-end workflow for training an RL operator is an iterative cycle of real-game data collection, model fitting, and simulated training. The real game is slow and cannot be parallelised, so the bulk of RL training happens in the simulator; the game is used only as an oracle for data and evaluation.

```
┌─────────────────────────────────────────────────────────────┐
@@ -401,17 +397,123 @@ The recommended end-to-end workflow for training an RL operator is an iterative
└─────────────────────┘
```

### Step 1: Human dataset collection

Run `NuconModelLearner.collect_data()` before or during your play session. Try to cover a wide range of reactor states: startup from cold, ramping power up and down, adjusting individual rod banks, pump speed changes. Diversity in the dataset directly determines how accurate the simulator will be.

```python
from nucon.model import NuconModelLearner

learner = NuconModelLearner(
    dataset_path='reactor_dataset.pkl',
    time_delta=10.0,  # 10 game-seconds per sample
)
learner.collect_data(num_steps=500, save_every=10)
```

The collector saves every 10 steps, retries automatically on game crashes, and scales wall-clock sleep with `GAME_SIM_SPEED` so samples are always 10 game-seconds apart regardless of simulation speed.

### Step 2: Initial model fitting

Fit a kNN-GP model (instant) or an NN (better extrapolation with larger datasets). Prune near-duplicate samples with `drop_redundant()` before fitting.

```python
from nucon.model import NuconModelLearner

learner = NuconModelLearner(dataset_path='reactor_dataset.pkl')

# Option A: kNN-GP (instant fit, built-in uncertainty estimation)
learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
learner.fit_knn(k=10)
learner.save_model('reactor_knn.pkl')

# Option B: Neural network (better extrapolation with larger datasets)
learner.train_model(batch_size=32, num_epochs=50)
learner.drop_well_fitted(error_threshold=1.0)  # keep hard samples for next round
learner.save_model('reactor_nn.pth')
```

### Step 3: Train RL in simulator

Load the fitted model into the simulator and train with SAC + HER. The simulator runs orders of magnitude faster than the real game, allowing millions of steps in reasonable time. Use `uncertainty_penalty_start` and `uncertainty_abort` on the env to discourage the policy from wandering into regions the model hasn't seen: a linear penalty kicks in above the soft threshold, and the episode is truncated at the hard threshold. This keeps training within the reliable part of the model's knowledge.

```python
from nucon.sim import NuconSimulator, OperatingState
from nucon.rl import NuconGoalEnv
from stable_baselines3 import SAC
from stable_baselines3.common.buffers import HerReplayBuffer

simulator = NuconSimulator()
simulator.load_model('reactor_knn.pkl')
simulator.set_state(OperatingState.NOMINAL)

env = NuconGoalEnv(
    goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
    goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
    tolerance=0.05,
    simulator=simulator,
    seconds_per_step=10,
)

model = SAC(
    'MultiInputPolicy', env,
    replay_buffer_class=HerReplayBuffer,
    replay_buffer_kwargs={'n_sampled_goal': 4, 'goal_selection_strategy': 'future'},
    verbose=1,
)
model.learn(total_timesteps=500_000)
model.save('rl_policy.zip')
```

### Step 4: Eval in game + collect new data

Run the trained policy against the real game. This validates whether the simulator was accurate enough, and simultaneously collects new data covering states the policy visits, which may be regions the original dataset missed. A second `NuconModelLearner` runs in a background thread to collect concurrently.

```python
import threading

from nucon.rl import NuconGoalEnv
from nucon.model import NuconModelLearner
from stable_baselines3 import SAC

# Load policy and run in real game
env = NuconGoalEnv(
    goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
    goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
    seconds_per_step=10,
)
policy = SAC.load('rl_policy.zip')

# Simultaneously collect new data in a background thread
# (running collect_data in a thread is a sketch; adjust to your setup)
new_data_learner = NuconModelLearner(
    dataset_path='reactor_dataset_new.pkl',
    time_delta=10.0,
)
threading.Thread(target=new_data_learner.collect_data,
                 kwargs={'num_steps': 200, 'save_every': 10}, daemon=True).start()

obs, _ = env.reset()
for _ in range(200):
    action, _ = policy.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, _ = env.step(action)
    if terminated or truncated:
        obs, _ = env.reset()
```

### Step 5: Refit model on expanded data

Merge the new data into the original dataset and refit:

```python
from nucon.model import NuconModelLearner

learner = NuconModelLearner(dataset_path='reactor_dataset.pkl')
learner.merge_datasets('reactor_dataset_new.pkl')

# Prune redundant samples before refitting
learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
print(f"Dataset size after pruning: {len(learner.dataset)}")

learner.fit_knn(k=10)
learner.save_model('reactor_knn.pkl')
```

Then go back to Step 3 with the improved model. Each iteration the simulator gets more accurate, the policy gets better, and the new data collection explores increasingly interesting regions of state space.

**When to stop**: when the policy performs well in the real game and the kNN-GP uncertainty stays low throughout an episode (indicating the policy stays within the known data distribution).

## Testing

nucon/rl.py
@@ -1,83 +1,159 @@

import inspect
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import time
from typing import Dict, Any, Callable, List, Optional
from enum import Enum
from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus


# ---------------------------------------------------------------------------
# Reward / objective helpers
# ---------------------------------------------------------------------------

Objectives = {
    "null": lambda obs: 0,
    "max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"],
    "episode_time": lambda obs: obs["EPISODE_TIME"],
}

Parameterized_Objectives = {
    "target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2),
    "target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2),
    "temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2),
    "temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2),
    "constant": lambda constant: lambda obs: constant,
}

def UncertaintyPenalty(start: float = 0.3, scale: float = 1.0, mode: str = 'l2') -> Callable:
    """Objective that penalises high simulator uncertainty.

    Returns a callable ``(obs) -> float`` suitable for use as an objective or
    terminator in NuconEnv / NuconGoalEnv. Works because ``SIM_UNCERTAINTY``
    is injected into the obs dict whenever a simulator is active.

    Args:
        start: uncertainty level at which the penalty starts (default 0.3).
        scale: penalty coefficient.
        mode: ``'l2'`` (quadratic, default) or ``'linear'``.

    Example::

        env = NuconEnv(
            objectives=['max_power', UncertaintyPenalty(start=0.3, scale=2.0)],
            objective_weights=[1.0, 1.0],
            simulator=simulator,
        )
    """
    excess = lambda obs: max(0.0, obs.get('SIM_UNCERTAINTY', 0.0) - start)
    if mode == 'l2':
        return lambda obs: -scale * excess(obs) ** 2
    elif mode == 'linear':
        return lambda obs: -scale * excess(obs)
    else:
        raise ValueError(f"Unknown mode '{mode}'. Use 'l2' or 'linear'.")


def UncertaintyAbort(threshold: float = 0.7) -> Callable:
    """Terminator that aborts the episode when simulator uncertainty is too high.

    Returns a callable ``(obs) -> float`` for use as a *terminator*. When
    the GP posterior std exceeds ``threshold`` the episode is truncated
    (``terminated=True``).

    Example::

        env = NuconEnv(
            objectives=['max_power'],
            terminators=[UncertaintyAbort(threshold=0.7)],
            terminate_above=0,
            simulator=simulator,
        )
    """
    return lambda obs: 1.0 if obs.get('SIM_UNCERTAINTY', 0.0) >= threshold else 0.0


class NuconEnv(gym.Env):
    metadata = {'render_modes': ['human']}

    def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5,
                 objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0):
        super().__init__()

        self.render_mode = render_mode
        self.seconds_per_step = seconds_per_step
        if objective_weights is None:
            objective_weights = [1.0 for objective in objectives]
        self.objective_weights = objective_weights
        self.terminate_above = terminate_above
        self.simulator = simulator

        if nucon is None:
            if simulator:
                nucon = Nucon(port=simulator.port)
            else:
                nucon = Nucon()
        self.nucon = nucon

        # Define observation space
        obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
        for param_id, param in self.nucon.get_all_readable().items():
            sp = _build_param_space(param)
            if sp is not None:
                obs_spaces[param_id] = sp
        self.observation_space = spaces.Dict(obs_spaces)

        # Define action space (only controllable, non-cheat, readable-back params)
        action_spaces = {}
        for param_id, param in self.nucon.get_all_writable().items():
            if not param.is_readable or param.is_cheat:
                continue  # write-only (VALVE_OPEN/CLOSE, SCRAM, etc.) and cheat params excluded
            sp = _build_param_space(param)
            if sp is not None:
                action_spaces[param_id] = sp
        self.action_space = spaces.Dict(action_spaces)

        self.objectives = []
        self.terminators = []

        for objective in objectives:
            if objective in Objectives:
                self.objectives.append(Objectives[objective])
            elif callable(objective):
                self.objectives.append(objective)
            else:
                raise ValueError(f"Unsupported objective: {objective}")

        for terminator in terminators:
            if terminator in Objectives:
                self.terminators.append(Objectives[terminator])
            elif callable(terminator):
                self.terminators.append(terminator)
            else:
                raise ValueError(f"Unsupported terminator: {terminator}")

    def _get_obs(self):
        obs = {}
        for param_id, param in self.nucon.get_all_readable().items():
            if param.param_type == str or param_id not in self.observation_space.spaces:
                continue
            value = self.nucon.get(param_id)
            if isinstance(value, Enum):
                value = value.value
            obs[param_id] = value
        obs["EPISODE_TIME"] = self._total_steps * self.seconds_per_step
        return obs

    def _get_info(self):
        info = {'objectives': {}, 'objectives_weighted': {}}
        for objective, weight in zip(self.objectives, self.objective_weights):
            obj = objective(self._get_obs())
            info['objectives'][objective.__name__] = obj
            info['objectives_weighted'][objective.__name__] = obj * weight
        return info

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        self._total_steps = 0
        observation = self._get_obs()
        info = self._get_info()

        return observation, info

    def step(self, action):
        # Apply the action to the Nucon system
        for param_id, value in action.items():
            param = self.nucon._parameters[param_id]
            if issubclass(param.param_type, Enum):
                value = param.param_type(int(np.asarray(value).flat[0]))
            else:
                value = param.param_type(np.asarray(value).flat[0])
            if param.min_val is not None and param.max_val is not None:
                value = np.clip(value, param.min_val, param.max_val)
            self.nucon.set(param, value)

        observation = self._get_obs()
        terminated = np.sum([terminator(observation) for terminator in self.terminators]) > self.terminate_above
        truncated = False
        info = self._get_info()
        reward = sum(obj for obj in info['objectives_weighted'].values())

        self._total_steps += 1
        if self.simulator:
            self.simulator.update(self.seconds_per_step)
        else:
            # Sleep to let the game advance seconds_per_step game-seconds,
            # accounting for the game's simulation speed multiplier.
            sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            time.sleep(self.seconds_per_step / sim_speed)
        return observation, reward, terminated, truncated, info

    def render(self):
        if self.render_mode == "human":
            pass

    def close(self):
        pass

    def _flatten_action(self, action):
        return np.concatenate([np.asarray(v).flatten() for v in action.values()])

    def _unflatten_action(self, flat_action):
        # Slice the flat vector back into per-parameter arrays by space shape.
        out, i = {}, 0
        for k, sp in self.action_space.items():
            n = int(np.prod(sp.shape))
            out[k] = np.asarray(flat_action[i:i + n]).reshape(sp.shape)
            i += n
        return out

    def _flatten_observation(self, observation):
        return np.concatenate([np.asarray(v).flatten() for v in observation.values()])

    def _unflatten_observation(self, flat_observation):
        # Same slicing scheme as _unflatten_action, over the observation space.
        out, i = {}, 0
        for k, sp in self.observation_space.items():
            n = int(np.prod(sp.shape))
            out[k] = np.asarray(flat_observation[i:i + n]).reshape(sp.shape)
            i += n
        return out

# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _build_param_space(param):
    """Return a gymnasium Box for a single NuconParameter, or None if unsupported."""
@@ -96,174 +172,30 @@ def _build_param_space(param):
    return None

def _apply_action(nucon, action):
    for param_id, value in action.items():
        param = nucon._parameters[param_id]
        if issubclass(param.param_type, Enum):
            value = param.param_type(int(np.asarray(value).flat[0]))
        else:
            value = param.param_type(np.asarray(value).flat[0])
        if param.min_val is not None and param.max_val is not None:
            value = np.clip(value, param.min_val, param.max_val)
        nucon.set(param, value)


# ---------------------------------------------------------------------------
# NuconEnv
# ---------------------------------------------------------------------------

class NuconEnv(gym.Env):
    metadata = {'render_modes': ['human']}

    def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5,
                 objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0):
        super().__init__()

        self.render_mode = render_mode
        self.seconds_per_step = seconds_per_step
        if objective_weights is None:
            objective_weights = [1.0 for _ in objectives]
        self.objective_weights = objective_weights
        self.terminate_above = terminate_above
        self.simulator = simulator

        if nucon is None:
            nucon = Nucon(port=simulator.port) if simulator else Nucon()
        self.nucon = nucon

        # Observation space: SIM_UNCERTAINTY included when a simulator is present
        obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
        if simulator is not None:
            obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
        for param_id, param in self.nucon.get_all_readable().items():
            sp = _build_param_space(param)
            if sp is not None:
                obs_spaces[param_id] = sp
        self.observation_space = spaces.Dict(obs_spaces)

        # Action space
        action_spaces = {}
        for param_id, param in self.nucon.get_all_writable().items():
            if not param.is_readable or param.is_cheat:
                continue
|
|
||||||
sp = _build_param_space(param)
|
|
||||||
if sp is not None:
|
|
||||||
action_spaces[param_id] = sp
|
|
||||||
self.action_space = spaces.Dict(action_spaces)
|
|
||||||
|
|
||||||
self.objectives = []
|
|
||||||
self.terminators = []
|
|
||||||
for objective in objectives:
|
|
||||||
if objective in Objectives:
|
|
||||||
self.objectives.append(Objectives[objective])
|
|
||||||
elif callable(objective):
|
|
||||||
self.objectives.append(objective)
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Unsupported objective: {objective}")
|
|
||||||
for terminator in terminators:
|
|
||||||
if terminator in Objectives:
|
|
||||||
self.terminators.append(Objectives[terminator])
|
|
||||||
elif callable(terminator):
|
|
||||||
self.terminators.append(terminator)
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Unsupported terminator: {terminator}")
|
|
||||||
|
|
||||||
def _get_obs(self, sim_uncertainty=None):
|
|
||||||
obs = {}
|
|
||||||
for param_id, param in self.nucon.get_all_readable().items():
|
|
||||||
if param.param_type == str or param_id not in self.observation_space.spaces:
|
|
||||||
continue
|
|
||||||
value = self.nucon.get(param_id)
|
|
||||||
if isinstance(value, Enum):
|
|
||||||
value = value.value
|
|
||||||
obs[param_id] = value
|
|
||||||
obs['EPISODE_TIME'] = self._total_steps * self.seconds_per_step
|
|
||||||
if 'SIM_UNCERTAINTY' in self.observation_space.spaces:
|
|
||||||
obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0
|
|
||||||
return obs
|
|
||||||
|
|
||||||
def _get_info(self, obs):
|
|
||||||
info = {'objectives': {}, 'objectives_weighted': {}}
|
|
||||||
for objective, weight in zip(self.objectives, self.objective_weights):
|
|
||||||
obj = objective(obs)
|
|
||||||
name = getattr(objective, '__name__', repr(objective))
|
|
||||||
info['objectives'][name] = obj
|
|
||||||
info['objectives_weighted'][name] = obj * weight
|
|
||||||
return info
|
|
||||||
|
|
||||||
def reset(self, seed=None, options=None):
|
|
||||||
super().reset(seed=seed)
|
|
||||||
self._total_steps = 0
|
|
||||||
observation = self._get_obs()
|
|
||||||
return observation, self._get_info(observation)
|
|
||||||
|
|
||||||
def step(self, action):
|
|
||||||
_apply_action(self.nucon, action)
|
|
||||||
|
|
||||||
# Advance sim (or sleep) — get uncertainty for obs injection
|
|
||||||
truncated = False
|
|
||||||
uncertainty = None
|
|
||||||
if self.simulator:
|
|
||||||
uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
|
|
||||||
else:
|
|
||||||
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
|
|
||||||
time.sleep(self.seconds_per_step / sim_speed)
|
|
||||||
|
|
||||||
self._total_steps += 1
|
|
||||||
observation = self._get_obs(sim_uncertainty=uncertainty)
|
|
||||||
info = self._get_info(observation)
|
|
||||||
reward = sum(obj for obj in info['objectives_weighted'].values())
|
|
||||||
terminated = np.sum([t(observation) for t in self.terminators]) > self.terminate_above
|
|
||||||
return observation, reward, terminated, truncated, info
|
|
||||||
|
|
||||||
def render(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _flatten_observation(self, observation):
|
|
||||||
return np.concatenate([np.asarray(v).flatten() for v in observation.values()])
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# NuconGoalEnv
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
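The `_apply_action` helper in the removed block coerces each raw action array to the parameter's declared type and clips numeric values to the declared bounds. A self-contained sketch of that coercion (the `PumpMode` enum and the bound values here are hypothetical):

```python
from enum import Enum
import numpy as np

class PumpMode(Enum):  # hypothetical enum parameter type
    OFF = 0
    ON = 1

def coerce_action_value(param_type, value, min_val=None, max_val=None):
    # Enum parameters: round-trip the raw float through int to an enum member.
    if isinstance(param_type, type) and issubclass(param_type, Enum):
        return param_type(int(np.asarray(value).flat[0]))
    # Numeric parameters: cast, then clip to declared bounds when present.
    out = param_type(np.asarray(value).flat[0])
    if min_val is not None and max_val is not None:
        out = float(np.clip(out, min_val, max_val))
    return out

mode = coerce_action_value(PumpMode, np.array([1.0]))
rods = coerce_action_value(float, np.array([137.5]), min_val=0.0, max_val=100.0)
```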
 class NuconGoalEnv(gym.Env):
     """
     Goal-conditioned reactor environment compatible with SB3 HER (Hindsight Experience Replay).
 
-    Observation is a Dict with three keys:
-    - 'observation': all readable non-goal, non-str params + SIM_UNCERTAINTY (when sim active)
+    The observation is a Dict with three keys as required by GoalEnv / HER:
+    - 'observation': all readable non-goal, non-str params (same encoding as NuconEnv)
     - 'achieved_goal': current values of goal_params, normalised to [0, 1] within goal_range
     - 'desired_goal': target values sampled each episode, normalised to [0, 1]
 
-    ``SIM_UNCERTAINTY`` in 'observation' lets reward_fn / terminators reference uncertainty directly.
-    reward_fn signature: ``(achieved, desired)`` or ``(achieved, desired, obs)`` — the 3-arg form
-    receives the full observation dict (including SIM_UNCERTAINTY) for uncertainty-aware shaping.
+    Reward defaults to negative L2 distance in the normalised goal space (dense).
+    Pass ``tolerance`` for a sparse {0, -1} reward (0 = within tolerance).
 
     Usage with SB3 HER::
 
         from stable_baselines3 import SAC
         from stable_baselines3.common.buffers import HerReplayBuffer
-        from nucon.rl import NuconGoalEnv, UncertaintyPenalty, UncertaintyAbort
 
         env = NuconGoalEnv(
             goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
             goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
-            tolerance=0.05,
             simulator=simulator,
-            # uncertainty-aware reward: penalise OOD, abort if too far out
-            reward_fn=lambda ag, dg, obs: (
-                -(np.linalg.norm(ag - dg) ** 2)
-                - 2.0 * max(0, obs.get('SIM_UNCERTAINTY', 0) - 0.3) ** 2
-            ),
-            terminators=[UncertaintyAbort(threshold=0.7)],
         )
         model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer)
-        model.learn(total_timesteps=500_000)
+        model.learn(total_timesteps=200_000)
     """
 
     metadata = {'render_modes': ['human']}
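The dense/sparse reward convention described in the docstring above can be exercised as a standalone function; this is a sketch of the same arithmetic, not the class method itself:

```python
import numpy as np

def compute_reward(achieved, desired, tolerance=None):
    # Dense: negative L2 distance in the normalised goal space.
    dist = np.linalg.norm(np.asarray(achieved) - np.asarray(desired), axis=-1)
    if tolerance is not None:
        # Sparse: 0 when within tolerance, -1 otherwise.
        return -(dist > tolerance).astype(np.float32)
    return -dist

dense = compute_reward([0.5, 0.5], [0.5, 0.9])
sparse = compute_reward([0.5, 0.5], [0.5, 0.9], tolerance=0.05)
```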
@@ -295,12 +227,14 @@ class NuconGoalEnv(gym.Env):
         self.nucon = nucon
 
         all_readable = self.nucon.get_all_readable()
 
+        # Validate goal params and build per-param range arrays
         for pid in self.goal_params:
             if pid not in all_readable:
                 raise ValueError(f"Goal param '{pid}' is not a readable parameter")
 
         goal_range = goal_range or {}
         self._goal_low = np.array([
             goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[0]
             for pid in self.goal_params
         ], dtype=np.float32)
@@ -309,21 +243,13 @@ class NuconGoalEnv(gym.Env):
             for pid in self.goal_params
         ], dtype=np.float32)
         self._goal_range = self._goal_high - self._goal_low
-        self._goal_range[self._goal_range == 0] = 1.0
+        self._goal_range[self._goal_range == 0] = 1.0  # avoid div-by-zero
 
-        # Detect reward_fn arity for backward compat (2-arg vs 3-arg)
-        self._reward_fn = reward_fn
-        if reward_fn is not None:
-            n_args = len(inspect.signature(reward_fn).parameters)
-            self._reward_fn_wants_obs = n_args >= 3
-        else:
-            self._reward_fn_wants_obs = False
+        self._reward_fn = reward_fn  # callable(achieved_norm, desired_norm) -> float, or None
 
-        # Observation subspace
+        # Observation subspace: all readable non-str non-goal params
         goal_set = set(self.goal_params)
         obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
-        if simulator is not None:
-            obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
         for param_id, param in all_readable.items():
             if param_id in goal_set:
                 continue
@@ -338,7 +264,7 @@ class NuconGoalEnv(gym.Env):
             'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
         })
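The per-param range arrays built in the constructor above drive the goal normalisation: each raw reading is mapped into [0, 1] within its range and clipped. The arithmetic in isolation (the 0 to 1200 kW generator ranges are taken from the docstring example; the raw readings are made up):

```python
import numpy as np

# Per-goal range arrays as the constructor builds them.
goal_low = np.array([0.0, 0.0, 0.0], dtype=np.float32)
goal_high = np.array([1200.0, 1200.0, 1200.0], dtype=np.float32)
goal_range = goal_high - goal_low
goal_range[goal_range == 0] = 1.0  # avoid div-by-zero on degenerate ranges

raw = np.array([600.0, 1500.0, -10.0], dtype=np.float32)  # hypothetical kW readings
normalised = np.clip((raw - goal_low) / goal_range, 0.0, 1.0)
```

Out-of-range readings saturate at 0 or 1 rather than leaking outside the declared goal space.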
 
-        # Action space
+        # Action space: readable-back, non-cheat writable params
         action_spaces = {}
         for param_id, param in self.nucon.get_all_writable().items():
             if not param.is_readable or param.is_cheat:
@@ -348,16 +274,23 @@ class NuconGoalEnv(gym.Env):
             action_spaces[param_id] = sp
         self.action_space = spaces.Dict(action_spaces)
 
+        # Terminators
         self._terminators = terminators or []
 
         self._desired_goal = np.zeros(n_goals, dtype=np.float32)
         self._total_steps = 0
 
+    # ------------------------------------------------------------------
+    # GoalEnv interface
+    # ------------------------------------------------------------------
 
     def compute_reward(self, achieved_goal, desired_goal, info):
-        """Dense negative L2, sparse with tolerance, or custom reward_fn."""
-        obs = info.get('obs', {}) if isinstance(info, dict) else {}
+        """
+        Dense: negative L2 in normalised goal space (each dim in [0,1]).
+        Sparse when tolerance is set: 0 if within tolerance, -1 otherwise.
+        Custom reward_fn overrides both.
+        """
         if self._reward_fn is not None:
-            if self._reward_fn_wants_obs:
-                return self._reward_fn(achieved_goal, desired_goal, obs)
             return self._reward_fn(achieved_goal, desired_goal)
         dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
         if self.tolerance is not None:
@@ -365,13 +298,13 @@ class NuconGoalEnv(gym.Env):
         return -dist
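The removed 2-arg/3-arg `reward_fn` dispatch relies on counting parameters with `inspect.signature`; the arity check in isolation:

```python
import inspect

def reward_fn_wants_obs(reward_fn):
    # Three or more parameters means the signature is (achieved, desired, obs);
    # two parameters means the reward only sees the goal vectors.
    return len(inspect.signature(reward_fn).parameters) >= 3

two_arg = lambda achieved, desired: 0.0
three_arg = lambda achieved, desired, obs: 0.0
```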
 
     def _read_goal_values(self):
-        raw = np.array([self.nucon.get(pid) or 0.0 for pid in self.goal_params], dtype=np.float32)
+        raw = np.array([
+            self.nucon.get(pid) or 0.0 for pid in self.goal_params
+        ], dtype=np.float32)
         return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0)
 
-    def _get_obs_dict(self, sim_uncertainty=None):
+    def _get_obs_dict(self):
         obs = {'EPISODE_TIME': float(self._total_steps * self.seconds_per_step)}
-        if 'SIM_UNCERTAINTY' in self.observation_space['observation'].spaces:
-            obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0
         goal_set = set(self.goal_params)
         for param_id, param in self.nucon.get_all_readable().items():
             if param_id in goal_set or param_id not in self.observation_space['observation'].spaces:
@@ -390,28 +323,38 @@ class NuconGoalEnv(gym.Env):
     def reset(self, seed=None, options=None):
         super().reset(seed=seed)
         self._total_steps = 0
 
+        # Sample a new goal uniformly from the goal range
         rng = np.random.default_rng(seed)
         self._desired_goal = rng.uniform(0.0, 1.0, size=len(self.goal_params)).astype(np.float32)
-        return self._get_obs_dict(), {}
+        obs = self._get_obs_dict()
+        return obs, {}
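The goal sampling in `reset` above draws a fresh desired goal uniformly in the normalised space, reproducibly when a seed is passed; a standalone sketch of that pattern:

```python
import numpy as np

def sample_goal(n_goals, seed=None):
    # Uniform goal in the normalised [0, 1] goal space; identical for a fixed seed.
    rng = np.random.default_rng(seed)
    return rng.uniform(0.0, 1.0, size=n_goals).astype(np.float32)

g1 = sample_goal(3, seed=42)
g2 = sample_goal(3, seed=42)
```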
 
     def step(self, action):
-        _apply_action(self.nucon, action)
+        for param_id, value in action.items():
+            param = self.nucon._parameters[param_id]
+            if issubclass(param.param_type, Enum):
+                value = param.param_type(int(np.asarray(value).flat[0]))
+            else:
+                value = param.param_type(np.asarray(value).flat[0])
+            if param.min_val is not None and param.max_val is not None:
+                value = np.clip(value, param.min_val, param.max_val)
+            self.nucon.set(param, value)
 
-        # Advance sim (or sleep)
-        uncertainty = None
+        obs = self._get_obs_dict()
+        reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], {}))
+        terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators)
+        truncated = False
+        info = {'achieved_goal': obs['achieved_goal'], 'desired_goal': obs['desired_goal']}
+
+        self._total_steps += 1
         if self.simulator:
-            uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
+            self.simulator.update(self.seconds_per_step)
         else:
             sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
             time.sleep(self.seconds_per_step / sim_speed)
 
-        self._total_steps += 1
-        obs = self._get_obs_dict(sim_uncertainty=uncertainty)
-        info = {'achieved_goal': obs['achieved_goal'], 'desired_goal': obs['desired_goal'],
-                'obs': obs['observation']}
-        reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], info))
-        terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators)
-        truncated = False
         return obs, reward, terminated, truncated, info
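When no simulator is attached, `step` above paces itself against the live game by dividing the desired game-seconds by the reported simulation speed, falling back to 1.0 when the reading is missing (the `or 1.0` in `GAME_SIM_SPEED.value or 1.0`). The arithmetic as a small helper:

```python
def wall_clock_sleep(seconds_per_step, sim_speed):
    # At 2x game speed, seconds_per_step game-seconds pass in half the
    # wall-clock time; None or 0 readings fall back to real time (1.0).
    return seconds_per_step / (sim_speed or 1.0)
```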
 
     def render(self):
@@ -421,10 +364,6 @@ class NuconGoalEnv(gym.Env):
         pass
 
 
-# ---------------------------------------------------------------------------
-# Registration
-# ---------------------------------------------------------------------------
-
 def register_nucon_envs():
     gym.register(
         id='Nucon-max_power-v0',
@@ -439,11 +378,9 @@ def register_nucon_envs():
     gym.register(
         id='Nucon-safe_max_power-v0',
         entry_point='nucon.rl:NuconEnv',
-        kwargs={'seconds_per_step': 5,
-                'objectives': [Parameterized_Objectives['temp_above'](min_temp=310),
-                               Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'],
-                'objective_weights': [1, 10, 1/100_000]}
+        kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]}
     )
+    # Goal-conditioned: target total generator output (train with HER)
     gym.register(
         id='Nucon-goal_power-v0',
         entry_point='nucon.rl:NuconGoalEnv',
@@ -453,6 +390,7 @@ def register_nucon_envs():
             'seconds_per_step': 5,
         }
    )
+    # Goal-conditioned: target core temperature (train with HER)
     gym.register(
         id='Nucon-goal_temp-v0',
         entry_point='nucon.rl:NuconGoalEnv',
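Both environments end an episode with the same `any(t(obs) > terminate_above for t in terminators)` pattern. In isolation, with a hypothetical over-temperature terminator:

```python
def is_terminated(obs, terminators, terminate_above=0):
    # The episode ends as soon as any terminator's score exceeds the threshold.
    return any(t(obs) > terminate_above for t in terminators)

# Hypothetical terminator: scores 1 when the core runs too hot, else 0.
too_hot = lambda obs: 1 if obs['CORE_TEMP'] > 365 else 0
```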
20
nucon/sim.py
@@ -215,16 +215,9 @@ class NuconSimulator:
     def set_allow_all_writes(self, allow: bool) -> None:
         self.allow_all_writes = allow
 
-    def update(self, time_step: float, return_uncertainty: bool = False):
-        """Advance the simulator by time_step game-seconds.
-
-        If return_uncertainty=True and a kNN model is loaded, returns the GP
-        posterior std for this step (0 = on known data, ~1 = OOD).
-        Always returns None when using an NN model.
-        """
-        uncertainty = self._update_reactor_state(time_step, return_uncertainty=return_uncertainty)
+    def update(self, time_step: float) -> None:
+        self._update_reactor_state(time_step)
         self.time += time_step
-        return uncertainty
 
     def set_model(self, model) -> None:
         """Set a pre-loaded ReactorDynamicsModel or ReactorKNNModel directly."""
@@ -256,7 +249,7 @@ class NuconSimulator:
             print(f"Error loading model: {str(e)}")
             self.model = None
 
-    def _update_reactor_state(self, time_step: float, return_uncertainty: bool = False):
+    def _update_reactor_state(self, time_step: float) -> None:
         if not self.model:
             raise ValueError("Model not set. Please load a model using load_model() or set_model().")
 
@@ -270,13 +263,10 @@ class NuconSimulator:
                 value = 0.0  # fallback for params not initialised in sim state
             state[param_id] = value
 
-        # Forward pass
-        uncertainty = None
+        # Forward pass — same interface for both NN and kNN
         if isinstance(self.model, ReactorDynamicsModel):
             with torch.no_grad():
                 next_state = self.model.forward(state, time_step)
-        elif return_uncertainty:
-            next_state, uncertainty = self.model.forward_with_uncertainty(state, time_step)
         else:
             next_state = self.model.forward(state, time_step)
 
@@ -287,8 +277,6 @@ class NuconSimulator:
         except (ValueError, KeyError):
             pass  # ignore params that can't be set (type mismatch, unknown)
 
-        return uncertainty
 
     def set_state(self, state: OperatingState) -> None:
         self._sample_parameters_from_state(state)