Compare commits
7 Commits
7fcc809852
...
3eb0cc7b60
| Author | SHA1 | Date | |
|---|---|---|---|
| 3eb0cc7b60 | |||
| a4f898c3ad | |||
| c3111ad5be | |||
| 088b7d4733 | |||
| ce2019e060 | |||
| 1f7ecc301f | |||
| 0dab7a6cec |
277
README.md
277
README.md
@ -9,7 +9,7 @@ NuCon (Nucleares Controller) is a Python library designed to interface with and
|
|||||||
NuCon further provides a work in progress implementation of a reinforcement learning environment for training control policies and a simulator based on model learning.
|
NuCon further provides a work in progress implementation of a reinforcement learning environment for training control policies and a simulator based on model learning.
|
||||||
|
|
||||||
> [!NOTE]
|
> [!NOTE]
|
||||||
> Nucleares only exposes RODS_POS_ORDERED as writable parameter, and no parameters about core chemistry e.g. Xenon concentration. While NuCon is already usable, it's capabilities are still very limited based on these restrictions. The capabilites are supposed to be extended in future updates to Nucleares, development on the advanced features (Reinforcement / Model Learning) are paused till then.
|
> NuCon is compatible with Nucleares v2.2.25.213. The game exposes a rich set of writable parameters including individual rod bank positions (`ROD_BANK_POS_{0-8}_ORDERED`), pump speeds, MSCV and turbine bypass setpoints, and various switches. Core chemistry parameters (e.g. Xenon concentration) are still read-only. Development on the advanced features (Reinforcement / Model Learning) is ongoing.
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
@ -123,18 +123,24 @@ To use you'll need to install `gymnasium` and `numpy`. You can do so via
|
|||||||
pip install -e '.[rl]'
|
pip install -e '.[rl]'
|
||||||
```
|
```
|
||||||
|
|
||||||
### RL Environment
|
### Environments
|
||||||
|
|
||||||
The `NuconEnv` class in `nucon/rl.py` provides a Gym-compatible environment for reinforcement learning tasks in the Nucleares simulation. Key features include:
|
Two environment classes are provided in `nucon/rl.py`:
|
||||||
|
|
||||||
- Observation space: Includes all readable parameters from the NuCon system.
|
**`NuconEnv`** — classic fixed-objective environment. You define one or more objectives at construction time (e.g. maximise power output, keep temperature in range). The agent always trains toward the same goal.
|
||||||
- Action space: Encompasses all writable parameters in the NuCon system.
|
|
||||||
- Step function: Applies actions to the NuCon system and returns new observations.
|
|
||||||
- Objective function: Allows for predefined or custom objective functions to be defined for training.
|
|
||||||
|
|
||||||
### Usage
|
- Observation space: all readable numeric parameters (~290 dims).
|
||||||
|
- Action space: all readable-back writable parameters (~30 dims): 9 individual rod bank positions, 3 MSCVs, 3 turbine bypass valves, 6 coolant pump speeds, condenser pump, freight/vent switches, resistor banks, and more.
|
||||||
|
- Objectives: predefined strings (`'max_power'`, `'episode_time'`) or arbitrary callables `(obs) -> float`. Multiple objectives are weighted-summed.
|
||||||
|
|
||||||
|
**`NuconGoalEnv`** — goal-conditioned environment. The desired goal (e.g. target generator output) is sampled at the start of each episode and provided as part of the observation. A single policy learns to reach *any* goal in the specified range, making it far more useful than a fixed-objective agent. Designed for training with [Hindsight Experience Replay (HER)](https://arxiv.org/abs/1707.01495), which makes sparse-reward goal-conditioned training tractable.
|
||||||
|
|
||||||
|
- Observation space: `Dict` with keys `observation` (non-goal params), `achieved_goal` (current goal param values, normalised to [0,1]), `desired_goal` (target, normalised to [0,1]).
|
||||||
|
- Goals are sampled uniformly from the specified `goal_range` each episode.
|
||||||
|
- Reward defaults to negative L2 distance in normalised goal space (dense). Pass `tolerance` for a sparse `{0, -1}` reward — this works particularly well with HER.
|
||||||
|
|
||||||
|
### NuconEnv Usage
|
||||||
|
|
||||||
Here's a basic example of how to use the RL environment:
|
|
||||||
```python
|
```python
|
||||||
from nucon.rl import NuconEnv, Parameterized_Objectives
|
from nucon.rl import NuconEnv, Parameterized_Objectives
|
||||||
|
|
||||||
@ -154,44 +160,88 @@ env.close()
|
|||||||
|
|
||||||
Objectives takes either strings of the name of predefined objectives, or lambda functions which take an observation and return a scalar reward. Final rewards are (weighted) summed across all objectives. `info['objectives']` contains all objectives and their values.
|
`objectives` takes either the names of predefined objectives as strings, or lambda functions which take an observation and return a scalar reward. Final rewards are (weighted-)summed across all objectives. `info['objectives']` contains all objectives and their values.
|
||||||
|
|
||||||
You can e.g. train an PPO agent using the [sb3](https://github.com/DLR-RM/stable-baselines3) implementation:
|
You can e.g. train a PPO agent using the [sb3](https://github.com/DLR-RM/stable-baselines3) implementation:
|
||||||
```python
|
```python
|
||||||
from nucon.rl import NuconEnv
|
from nucon.rl import NuconEnv
|
||||||
from stable_baselines3 import PPO
|
from stable_baselines3 import PPO
|
||||||
|
|
||||||
env = NuconEnv(objectives=['max_power'], seconds_per_step=5)
|
env = NuconEnv(objectives=['max_power'], seconds_per_step=5)
|
||||||
|
|
||||||
# Create the PPO (Proximal Policy Optimization) model
|
|
||||||
model = PPO(
|
model = PPO(
|
||||||
"MlpPolicy",
|
"MlpPolicy",
|
||||||
env,
|
env,
|
||||||
verbose=1,
|
verbose=1,
|
||||||
learning_rate=3e-4, # You can adjust hyperparameters as needed
|
learning_rate=3e-4,
|
||||||
n_steps=2048,
|
n_steps=2048,
|
||||||
batch_size=64,
|
batch_size=64,
|
||||||
n_epochs=10,
|
n_epochs=10,
|
||||||
gamma=0.99,
|
gamma=0.99,
|
||||||
gae_lambda=0.95,
|
gae_lambda=0.95,
|
||||||
clip_range=0.2,
|
clip_range=0.2,
|
||||||
ent_coef=0.01
|
ent_coef=0.01,
|
||||||
)
|
)
|
||||||
|
model.learn(total_timesteps=100_000)
|
||||||
|
|
||||||
# Train the model
|
|
||||||
model.learn(total_timesteps=100000) # Adjust total_timesteps as needed
|
|
||||||
|
|
||||||
# Test the trained model
|
|
||||||
obs, info = env.reset()
|
obs, info = env.reset()
|
||||||
for _ in range(1000):
|
for _ in range(1000):
|
||||||
action, _states = model.predict(obs, deterministic=True)
|
action, _states = model.predict(obs, deterministic=True)
|
||||||
obs, reward, terminated, truncated, info = env.step(action)
|
obs, reward, terminated, truncated, info = env.step(action)
|
||||||
|
|
||||||
if terminated or truncated:
|
if terminated or truncated:
|
||||||
obs, info = env.reset()
|
obs, info = env.reset()
|
||||||
|
|
||||||
# Close the environment
|
|
||||||
env.close()
|
env.close()
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### NuconGoalEnv + HER Usage
|
||||||
|
|
||||||
|
HER works by relabelling past trajectories with the goal that was *actually achieved*, turning every episode into useful training signal even when the agent never reaches the intended target. This makes it much more sample-efficient than standard RL for goal-reaching tasks — important given how slow the real game is.
|
||||||
|
|
||||||
|
```python
|
||||||
|
from nucon.rl import NuconGoalEnv
|
||||||
|
from stable_baselines3 import SAC
|
||||||
|
from stable_baselines3.common.buffers import HerReplayBuffer
|
||||||
|
|
||||||
|
env = NuconGoalEnv(
|
||||||
|
goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
|
||||||
|
goal_range={
|
||||||
|
'GENERATOR_0_KW': (0.0, 1200.0),
|
||||||
|
'GENERATOR_1_KW': (0.0, 1200.0),
|
||||||
|
'GENERATOR_2_KW': (0.0, 1200.0),
|
||||||
|
},
|
||||||
|
tolerance=0.05, # sparse: within 5% of range counts as success (recommended with HER)
|
||||||
|
seconds_per_step=5,
|
||||||
|
simulator=simulator, # use a pre-trained simulator for fast pre-training
|
||||||
|
)
|
||||||
|
# Or use a preset: env = gym.make('Nucon-goal_power-v0', simulator=simulator)
|
||||||
|
|
||||||
|
model = SAC(
|
||||||
|
'MultiInputPolicy',
|
||||||
|
env,
|
||||||
|
replay_buffer_class=HerReplayBuffer,
|
||||||
|
replay_buffer_kwargs={'n_sampled_goal': 4, 'goal_selection_strategy': 'future'},
|
||||||
|
verbose=1,
|
||||||
|
learning_rate=1e-3,
|
||||||
|
batch_size=256,
|
||||||
|
tau=0.005,
|
||||||
|
gamma=0.98,
|
||||||
|
train_freq=1,
|
||||||
|
gradient_steps=1,
|
||||||
|
)
|
||||||
|
model.learn(total_timesteps=500_000)
|
||||||
|
```
|
||||||
|
|
||||||
|
At inference time, inject any target by constructing the observation manually:
|
||||||
|
```python
|
||||||
|
import numpy as np
|
||||||
|
obs, _ = env.reset()
|
||||||
|
# Override the desired goal (values are normalised to [0,1] within goal_range)
|
||||||
|
obs['desired_goal'] = np.array([0.8, 0.8, 0.8], dtype=np.float32) # ~960 kW per generator
|
||||||
|
action, _ = model.predict(obs, deterministic=True)
|
||||||
|
```
|
||||||
|
|
||||||
|
Predefined goal environments:
|
||||||
|
- `Nucon-goal_power-v0`: target total generator output (3 × 0–1200 kW)
|
||||||
|
- `Nucon-goal_temp-v0`: target core temperature (280–380 °C)
|
||||||
|
|
||||||
But theres a problem: RL algorithms require a huge amount of training steps to get passable policies, and Nucleares is a very slow simulation and can not be trivially parallelized. That's why NuCon also provides a
|
But there's a problem: RL algorithms require a huge number of training steps to get passable policies, and Nucleares is a very slow simulation that cannot be trivially parallelized. That's why NuCon also provides a
|
||||||
|
|
||||||
## Simulator (Work in Progress)
|
## Simulator (Work in Progress)
|
||||||
@ -265,9 +315,9 @@ pip install -e '.[model]'
|
|||||||
```python
|
```python
|
||||||
from nucon.model import NuconModelLearner
|
from nucon.model import NuconModelLearner
|
||||||
|
|
||||||
# --- Data collection ---
|
# --- Data collection (model_type not needed here) ---
|
||||||
learner = NuconModelLearner(
|
learner = NuconModelLearner(
|
||||||
time_delta=10.0, # 10 game-seconds per step (wall sleep auto-scales with sim speed)
|
time_delta=10.0, # 10 game-seconds per step (wall sleep auto-scales with sim speed)
|
||||||
include_valve_states=False, # set True to include all 53 valve positions as model inputs
|
include_valve_states=False, # set True to include all 53 valve positions as model inputs
|
||||||
)
|
)
|
||||||
learner.collect_data(num_steps=1000)
|
learner.collect_data(num_steps=1000)
|
||||||
@ -277,19 +327,19 @@ learner.save_dataset('reactor_dataset.pkl')
|
|||||||
learner.merge_datasets('other_session.pkl')
|
learner.merge_datasets('other_session.pkl')
|
||||||
|
|
||||||
# --- Neural network backend ---
|
# --- Neural network backend ---
|
||||||
nn_learner = NuconModelLearner(model_type='nn', dataset_path='reactor_dataset.pkl')
|
nn_learner = NuconModelLearner(dataset_path='reactor_dataset.pkl')
|
||||||
nn_learner.train_model(batch_size=32, num_epochs=50)
|
nn_learner.train_model(batch_size=32, num_epochs=50) # creates NN model on first call
|
||||||
# Drop samples the NN already predicts well (keep hard cases for further training)
|
# Drop samples the NN already predicts well (keep hard cases for further training)
|
||||||
nn_learner.drop_well_fitted(error_threshold=1.0)
|
nn_learner.drop_well_fitted(error_threshold=1.0)
|
||||||
nn_learner.save_model('reactor_nn.pth')
|
nn_learner.save_model('reactor_nn.pth')
|
||||||
|
|
||||||
# --- kNN + GP backend ---
|
# --- kNN + GP backend ---
|
||||||
knn_learner = NuconModelLearner(model_type='knn', knn_k=10, dataset_path='reactor_dataset.pkl')
|
knn_learner = NuconModelLearner(dataset_path='reactor_dataset.pkl')
|
||||||
# Drop near-duplicate samples before fitting (keeps diverse coverage).
|
# Drop near-duplicate samples before fitting (keeps diverse coverage).
|
||||||
# A sample is dropped only if BOTH its input state AND output transition
|
# A sample is dropped only if BOTH its input state AND output transition
|
||||||
# are within the given distances of an already-kept sample.
|
# are within the given distances of an already-kept sample.
|
||||||
knn_learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
|
knn_learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
|
||||||
knn_learner.fit_knn()
|
knn_learner.fit_knn(k=10) # creates kNN model on first call
|
||||||
|
|
||||||
# Point prediction
|
# Point prediction
|
||||||
state = knn_learner._get_state()
|
state = knn_learner._get_state()
|
||||||
@ -306,6 +356,165 @@ knn_learner.save_model('reactor_knn.pkl')
|
|||||||
|
|
||||||
The trained models can be integrated into the NuconSimulator to provide accurate dynamics based on real game data.
|
The trained models can be integrated into the NuconSimulator to provide accurate dynamics based on real game data.
|
||||||
|
|
||||||
|
## Full Training Loop
|
||||||
|
|
||||||
|
The recommended end-to-end workflow for training an RL operator is an iterative cycle of real-game data collection, model fitting, and simulated training. The real game is slow and cannot be parallelised, so the bulk of RL training happens in the simulator — the game is used only as an oracle for data and evaluation.
|
||||||
|
|
||||||
|
```
|
||||||
|
┌─────────────────────────────────────────────────────────────┐
|
||||||
|
│ 1. Human dataset collection │
|
||||||
|
│ Play the game: start up the reactor, operate it across │
|
||||||
|
│ a range of states. NuCon records state transitions. │
|
||||||
|
└───────────────────────┬─────────────────────────────────────┘
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
┌─────────────────────────────────────────────────────────────┐
|
||||||
|
│ 2. Initial model fitting │
|
||||||
|
│ Fit NN or kNN dynamics model to the collected dataset. │
|
||||||
|
│ kNN is instant; NN needs gradient steps but generalises │
|
||||||
|
│ better with more data. │
|
||||||
|
└───────────────────────┬─────────────────────────────────────┘
|
||||||
|
│
|
||||||
|
┌─────────▼──────────┐
|
||||||
|
│ 3. Train RL │◄───────────────────────┐
|
||||||
|
│ in simulator │ │
|
||||||
|
│ (fast, many │ │
|
||||||
|
│ trajectories) │ │
|
||||||
|
└─────────┬──────────┘ │
|
||||||
|
│ │
|
||||||
|
▼ │
|
||||||
|
┌─────────────────────┐ │
|
||||||
|
│ 4. Eval in game │ │
|
||||||
|
│ + collect new data │ │
|
||||||
|
│ (merge & prune │ │
|
||||||
|
│ dataset) │ │
|
||||||
|
└─────────┬───────────┘ │
|
||||||
|
│ │
|
||||||
|
▼ │
|
||||||
|
┌─────────────────────┐ model improved? │
|
||||||
|
│ 5. Refit model ├──────── yes ──────────┘
|
||||||
|
│ on expanded data │
|
||||||
|
└─────────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
### Step 1 — Human dataset collection
|
||||||
|
|
||||||
|
Start `NuconModelLearner` before or during your play session. Try to cover a wide range of reactor states — startup from cold, ramping power up and down, adjusting individual rod banks, pump speed changes. Diversity in the dataset directly determines how accurate the simulator will be.
|
||||||
|
|
||||||
|
```python
|
||||||
|
from nucon.model import NuconModelLearner
|
||||||
|
|
||||||
|
learner = NuconModelLearner(
|
||||||
|
dataset_path='reactor_dataset.pkl',
|
||||||
|
time_delta=10.0, # 10 game-seconds per sample
|
||||||
|
)
|
||||||
|
learner.collect_data(num_steps=500, save_every=10)
|
||||||
|
```
|
||||||
|
|
||||||
|
The collector saves every 10 steps, retries automatically on game crashes, and scales wall-clock sleep with `GAME_SIM_SPEED` so samples are always 10 game-seconds apart regardless of simulation speed.
|
||||||
|
|
||||||
|
### Step 2 — Initial model fitting
|
||||||
|
|
||||||
|
```python
|
||||||
|
from nucon.model import NuconModelLearner
|
||||||
|
|
||||||
|
learner = NuconModelLearner(dataset_path='reactor_dataset.pkl')
|
||||||
|
|
||||||
|
# Option A: kNN + GP (instant fit, built-in uncertainty estimation)
|
||||||
|
learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
|
||||||
|
learner.fit_knn(k=10)
|
||||||
|
learner.save_model('reactor_knn.pkl')
|
||||||
|
|
||||||
|
# Option B: Neural network (better extrapolation with larger datasets)
|
||||||
|
learner.train_model(batch_size=32, num_epochs=50)
|
||||||
|
learner.drop_well_fitted(error_threshold=1.0) # keep hard samples for next round
|
||||||
|
learner.save_model('reactor_nn.pth')
|
||||||
|
```
|
||||||
|
|
||||||
|
### Step 3 — Train RL in simulator
|
||||||
|
|
||||||
|
Load the fitted model into the simulator and train with SAC + HER. The simulator runs orders of magnitude faster than the real game, allowing millions of steps in reasonable time.
|
||||||
|
|
||||||
|
```python
|
||||||
|
from nucon.sim import NuconSimulator, OperatingState
|
||||||
|
from nucon.rl import NuconGoalEnv
|
||||||
|
from stable_baselines3 import SAC
|
||||||
|
from stable_baselines3.common.buffers import HerReplayBuffer
|
||||||
|
|
||||||
|
simulator = NuconSimulator()
|
||||||
|
simulator.load_model('reactor_knn.pkl')
|
||||||
|
simulator.set_state(OperatingState.NOMINAL)
|
||||||
|
|
||||||
|
env = NuconGoalEnv(
|
||||||
|
goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
|
||||||
|
goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
|
||||||
|
tolerance=0.05,
|
||||||
|
simulator=simulator,
|
||||||
|
seconds_per_step=10,
|
||||||
|
)
|
||||||
|
|
||||||
|
model = SAC(
|
||||||
|
'MultiInputPolicy', env,
|
||||||
|
replay_buffer_class=HerReplayBuffer,
|
||||||
|
replay_buffer_kwargs={'n_sampled_goal': 4, 'goal_selection_strategy': 'future'},
|
||||||
|
verbose=1,
|
||||||
|
)
|
||||||
|
model.learn(total_timesteps=500_000)
|
||||||
|
model.save('rl_policy.zip')
|
||||||
|
```
|
||||||
|
|
||||||
|
### Step 4 — Eval in game + collect new data
|
||||||
|
|
||||||
|
Run the trained policy against the real game. This validates whether the simulator was accurate enough, and simultaneously collects new data covering states the policy visits — which may be regions the original dataset missed.
|
||||||
|
|
||||||
|
```python
|
||||||
|
from nucon.rl import NuconGoalEnv
|
||||||
|
from nucon.model import NuconModelLearner
|
||||||
|
from stable_baselines3 import SAC
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
# Load policy and run in real game
|
||||||
|
env = NuconGoalEnv(
|
||||||
|
goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
|
||||||
|
goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
|
||||||
|
seconds_per_step=10,
|
||||||
|
)
|
||||||
|
policy = SAC.load('rl_policy.zip')
|
||||||
|
|
||||||
|
# Simultaneously collect new data
|
||||||
|
new_data_learner = NuconModelLearner(
|
||||||
|
dataset_path='reactor_dataset_new.pkl',
|
||||||
|
time_delta=10.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
obs, _ = env.reset()
|
||||||
|
for _ in range(200):
|
||||||
|
action, _ = policy.predict(obs, deterministic=True)
|
||||||
|
obs, reward, terminated, truncated, _ = env.step(action)
|
||||||
|
if terminated or truncated:
|
||||||
|
obs, _ = env.reset()
|
||||||
|
```
|
||||||
|
|
||||||
|
### Step 5 — Refit model on expanded data
|
||||||
|
|
||||||
|
Merge the new data into the original dataset and refit:
|
||||||
|
|
||||||
|
```python
|
||||||
|
learner = NuconModelLearner(dataset_path='reactor_dataset.pkl')
|
||||||
|
learner.merge_datasets('reactor_dataset_new.pkl')
|
||||||
|
|
||||||
|
# Prune redundant samples before refitting
|
||||||
|
learner.drop_redundant(min_state_distance=0.1, min_output_distance=0.05)
|
||||||
|
print(f"Dataset size after pruning: {len(learner.dataset)}")
|
||||||
|
|
||||||
|
learner.fit_knn(k=10)
|
||||||
|
learner.save_model('reactor_knn.pkl')
|
||||||
|
```
|
||||||
|
|
||||||
|
Then go back to Step 3 with the improved model. Each iteration the simulator gets more accurate, the policy gets better, and the new data collection explores increasingly interesting regions of state space.
|
||||||
|
|
||||||
|
**When to stop**: when the policy performs well in the real game and the kNN uncertainty stays low throughout an episode (indicating the policy stays within the known data distribution).
|
||||||
|
|
||||||
## Testing
|
## Testing
|
||||||
|
|
||||||
NuCon includes a test suite to verify its functionality and compatibility with the Nucleares game.
|
NuCon includes a test suite to verify its functionality and compatibility with the Nucleares game.
|
||||||
|
|||||||
112
nucon/model.py
112
nucon/model.py
@ -152,21 +152,31 @@ class ReactorKNNModel:
|
|||||||
class NuconModelLearner:
|
class NuconModelLearner:
|
||||||
def __init__(self, nucon=None, actor='null', dataset_path='nucon_dataset.pkl',
|
def __init__(self, nucon=None, actor='null', dataset_path='nucon_dataset.pkl',
|
||||||
time_delta: Union[float, Tuple[float, float]] = 1.0,
|
time_delta: Union[float, Tuple[float, float]] = 1.0,
|
||||||
model_type: str = 'nn', knn_k: int = 5,
|
|
||||||
include_valve_states: bool = False):
|
include_valve_states: bool = False):
|
||||||
self.nucon = Nucon() if nucon is None else nucon
|
self.nucon = Nucon() if nucon is None else nucon
|
||||||
self.actor = Actors[actor](self.nucon) if actor in Actors else actor
|
self.actor = Actors[actor](self.nucon) if actor in Actors else actor
|
||||||
self.dataset = self.load_dataset(dataset_path) or []
|
self.dataset = self.load_dataset(dataset_path) or []
|
||||||
self.dataset_path = dataset_path
|
self.dataset_path = dataset_path
|
||||||
self.include_valve_states = include_valve_states
|
self.include_valve_states = include_valve_states
|
||||||
|
self.model = None
|
||||||
|
self.optimizer = None
|
||||||
|
|
||||||
# Exclude params with no physics signal
|
# Exclude params with no physics signal
|
||||||
_JUNK_PARAMS = frozenset({'GAME_VERSION', 'TIME', 'TIME_STAMP', 'TIME_DAY',
|
_JUNK_PARAMS = frozenset({'GAME_VERSION', 'TIME', 'TIME_STAMP', 'TIME_DAY',
|
||||||
'ALARMS_ACTIVE', 'FUN_IS_ENABLED', 'GAME_SIM_SPEED'})
|
'ALARMS_ACTIVE', 'FUN_IS_ENABLED', 'GAME_SIM_SPEED'})
|
||||||
candidate_params = {k: p for k, p in self.nucon.get_all_readable().items()
|
candidate_params = {k: p for k, p in self.nucon.get_all_readable().items()
|
||||||
if k not in _JUNK_PARAMS and p.param_type != str}
|
if k not in _JUNK_PARAMS and p.param_type != str}
|
||||||
# Filter out params that return None (subsystem not installed)
|
# Filter out params that return None (subsystem not installed).
|
||||||
test_state = {k: self.nucon.get(k) for k in candidate_params}
|
# Retry until the game is reachable.
|
||||||
|
import requests as _requests
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
test_state = {k: self.nucon.get(k) for k in candidate_params}
|
||||||
|
break
|
||||||
|
except (_requests.exceptions.ConnectionError,
|
||||||
|
_requests.exceptions.Timeout):
|
||||||
|
print("Waiting for game to be reachable…")
|
||||||
|
time.sleep(5)
|
||||||
self.readable_params = [k for k in candidate_params if test_state[k] is not None]
|
self.readable_params = [k for k in candidate_params if test_state[k] is not None]
|
||||||
self.non_writable_params = [k for k in self.readable_params
|
self.non_writable_params = [k for k in self.readable_params
|
||||||
if not self.nucon.get_all_readable()[k].is_writable]
|
if not self.nucon.get_all_readable()[k].is_writable]
|
||||||
@ -179,15 +189,6 @@ class NuconModelLearner:
|
|||||||
self.readable_params = self.readable_params + self.valve_keys
|
self.readable_params = self.readable_params + self.valve_keys
|
||||||
# valve positions are input-only (not predicted as outputs)
|
# valve positions are input-only (not predicted as outputs)
|
||||||
|
|
||||||
if model_type == 'nn':
|
|
||||||
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
|
|
||||||
self.optimizer = optim.Adam(self.model.parameters())
|
|
||||||
elif model_type == 'knn':
|
|
||||||
self.model = ReactorKNNModel(self.readable_params, self.non_writable_params, k=knn_k)
|
|
||||||
self.optimizer = None
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Unknown model_type '{model_type}'. Use 'nn' or 'knn'.")
|
|
||||||
|
|
||||||
if isinstance(time_delta, (int, float)):
|
if isinstance(time_delta, (int, float)):
|
||||||
self.time_delta = lambda: time_delta
|
self.time_delta = lambda: time_delta
|
||||||
elif isinstance(time_delta, tuple) and len(time_delta) == 2:
|
elif isinstance(time_delta, tuple) and len(time_delta) == 2:
|
||||||
@ -211,33 +212,64 @@ class NuconModelLearner:
|
|||||||
state[key] = valves.get(name, {}).get('Value', 0.0)
|
state[key] = valves.get(name, {}).get('Value', 0.0)
|
||||||
return state
|
return state
|
||||||
|
|
||||||
def collect_data(self, num_steps):
|
def collect_data(self, num_steps, save_every=10):
|
||||||
"""
|
"""
|
||||||
Collect state-transition tuples from the live game.
|
Collect state-transition tuples from the live game.
|
||||||
|
|
||||||
Sleeps wall_time = target_game_delta / sim_speed so that each stored
|
Sleeps wall_time = target_game_delta / sim_speed so that each stored
|
||||||
game_delta is uniform regardless of the game's simulation speed setting.
|
game_delta is uniform regardless of the game's simulation speed setting.
|
||||||
|
|
||||||
|
Saves the dataset every ``save_every`` steps so a crash doesn't lose
|
||||||
|
everything. On a connection error the step is skipped and collection
|
||||||
|
resumes once the game is reachable again (retries every 5 s).
|
||||||
"""
|
"""
|
||||||
state = self._get_state()
|
import requests as _requests
|
||||||
for _ in range(num_steps):
|
|
||||||
|
def get_state_with_retry():
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
return self._get_state()
|
||||||
|
except (_requests.exceptions.ConnectionError,
|
||||||
|
_requests.exceptions.Timeout) as e:
|
||||||
|
print(f"Connection lost ({e}). Retrying in 5 s…")
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
state = get_state_with_retry()
|
||||||
|
collected = 0
|
||||||
|
for i in range(num_steps):
|
||||||
action = self.actor(state)
|
action = self.actor(state)
|
||||||
for param_id, value in action.items():
|
for param_id, value in action.items():
|
||||||
self.nucon.set(param_id, value)
|
try:
|
||||||
|
self.nucon.set(param_id, value)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
target_game_delta = self.time_delta()
|
target_game_delta = self.time_delta()
|
||||||
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
|
try:
|
||||||
|
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
|
||||||
|
except Exception:
|
||||||
|
sim_speed = 1.0
|
||||||
time.sleep(target_game_delta / sim_speed)
|
time.sleep(target_game_delta / sim_speed)
|
||||||
next_state = self._get_state()
|
|
||||||
|
|
||||||
|
next_state = get_state_with_retry()
|
||||||
self.dataset.append((state, action, next_state, target_game_delta))
|
self.dataset.append((state, action, next_state, target_game_delta))
|
||||||
state = next_state
|
state = next_state
|
||||||
|
collected += 1
|
||||||
|
|
||||||
|
if collected % save_every == 0:
|
||||||
|
self.save_dataset()
|
||||||
|
print(f" {collected}/{num_steps} steps collected, dataset saved.")
|
||||||
|
|
||||||
self.save_dataset()
|
self.save_dataset()
|
||||||
|
print(f"Collection complete. {collected} steps, {len(self.dataset)} total samples.")
|
||||||
|
|
||||||
def train_model(self, batch_size=32, num_epochs=10, test_split=0.2):
|
def train_model(self, batch_size=32, num_epochs=10, test_split=0.2):
|
||||||
"""Train the NN model. For kNN, call fit_knn() instead."""
|
"""Train a neural-network dynamics model on the current dataset."""
|
||||||
if not isinstance(self.model, ReactorDynamicsModel):
|
if self.model is None:
|
||||||
raise ValueError("train_model() is for the NN model. Use fit_knn() for kNN.")
|
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
|
||||||
|
self.optimizer = optim.Adam(self.model.parameters())
|
||||||
|
elif not isinstance(self.model, ReactorDynamicsModel):
|
||||||
|
raise ValueError("A kNN model is already loaded. Create a new learner to train an NN.")
|
||||||
random.shuffle(self.dataset)
|
random.shuffle(self.dataset)
|
||||||
split_idx = int(len(self.dataset) * (1 - test_split))
|
split_idx = int(len(self.dataset) * (1 - test_split))
|
||||||
train_data = self.dataset[:split_idx]
|
train_data = self.dataset[:split_idx]
|
||||||
@ -247,17 +279,19 @@ class NuconModelLearner:
|
|||||||
test_loss = self._test_epoch(test_data)
|
test_loss = self._test_epoch(test_data)
|
||||||
print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
|
print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
|
||||||
|
|
||||||
def fit_knn(self):
|
def fit_knn(self, k: int = 5):
|
||||||
"""Fit the kNN/GP model from the current dataset (instantaneous, no gradient steps)."""
|
"""Fit a kNN/GP dynamics model from the current dataset (instantaneous, no gradient steps)."""
|
||||||
if not isinstance(self.model, ReactorKNNModel):
|
if self.model is None:
|
||||||
raise ValueError("fit_knn() is for the kNN model. Use train_model() for NN.")
|
self.model = ReactorKNNModel(self.readable_params, self.non_writable_params, k=k)
|
||||||
|
elif not isinstance(self.model, ReactorKNNModel):
|
||||||
|
raise ValueError("An NN model is already loaded. Create a new learner to fit a kNN.")
|
||||||
self.model.fit(self.dataset)
|
self.model.fit(self.dataset)
|
||||||
print(f"kNN model fitted on {len(self.dataset)} samples.")
|
print(f"kNN model fitted on {len(self.dataset)} samples.")
|
||||||
|
|
||||||
def predict_with_uncertainty(self, state_dict: Dict, time_delta: float):
|
def predict_with_uncertainty(self, state_dict: Dict, time_delta: float):
|
||||||
"""Return (prediction_dict, uncertainty_std). Only available for kNN model."""
|
"""Return (prediction_dict, uncertainty_std). Only available after fit_knn()."""
|
||||||
if not isinstance(self.model, ReactorKNNModel):
|
if not isinstance(self.model, ReactorKNNModel):
|
||||||
raise ValueError("predict_with_uncertainty() requires model_type='knn'.")
|
raise ValueError("predict_with_uncertainty() requires a fitted kNN model (call fit_knn()).")
|
||||||
return self.model.forward_with_uncertainty(state_dict, time_delta)
|
return self.model.forward_with_uncertainty(state_dict, time_delta)
|
||||||
|
|
||||||
def drop_well_fitted(self, error_threshold: float):
|
def drop_well_fitted(self, error_threshold: float):
|
||||||
@ -266,6 +300,8 @@ class NuconModelLearner:
|
|||||||
Keeps only hard/surprising transitions. Useful for NN training to focus
|
Keeps only hard/surprising transitions. Useful for NN training to focus
|
||||||
capacity on difficult regions of state space.
|
capacity on difficult regions of state space.
|
||||||
"""
|
"""
|
||||||
|
if self.model is None:
|
||||||
|
raise ValueError("No model fitted yet. Call train_model() or fit_knn() first.")
|
||||||
kept = []
|
kept = []
|
||||||
for state, action, next_state, time_delta in self.dataset:
|
for state, action, next_state, time_delta in self.dataset:
|
||||||
pred = self.model.forward(state, time_delta)
|
pred = self.model.forward(state, time_delta)
|
||||||
@ -359,18 +395,32 @@ class NuconModelLearner:
|
|||||||
return total_loss / len(data)
|
return total_loss / len(data)
|
||||||
|
|
||||||
def save_model(self, path):
|
def save_model(self, path):
|
||||||
|
if self.model is None:
|
||||||
|
raise ValueError("No model to save. Call train_model() or fit_knn() first.")
|
||||||
if isinstance(self.model, ReactorDynamicsModel):
|
if isinstance(self.model, ReactorDynamicsModel):
|
||||||
torch.save(self.model.state_dict(), path)
|
torch.save({
|
||||||
|
'state_dict': self.model.state_dict(),
|
||||||
|
'input_params': self.model.input_params,
|
||||||
|
'output_params': self.model.output_params,
|
||||||
|
}, path)
|
||||||
else:
|
else:
|
||||||
with open(path, 'wb') as f:
|
with open(path, 'wb') as f:
|
||||||
pickle.dump(self.model, f)
|
pickle.dump(self.model, f)
|
||||||
|
|
||||||
def load_model(self, path):
|
def load_model(self, path):
|
||||||
if isinstance(self.model, ReactorDynamicsModel):
|
if path.endswith('.pkl'):
|
||||||
self.model.load_state_dict(torch.load(path))
|
|
||||||
else:
|
|
||||||
with open(path, 'rb') as f:
|
with open(path, 'rb') as f:
|
||||||
self.model = pickle.load(f)
|
self.model = pickle.load(f)
|
||||||
|
else:
|
||||||
|
checkpoint = torch.load(path, weights_only=False)
|
||||||
|
if isinstance(checkpoint, dict) and 'state_dict' in checkpoint:
|
||||||
|
m = ReactorDynamicsModel(checkpoint['input_params'], checkpoint['output_params'])
|
||||||
|
m.load_state_dict(checkpoint['state_dict'])
|
||||||
|
self.model = m
|
||||||
|
else:
|
||||||
|
# legacy plain state dict
|
||||||
|
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
|
||||||
|
self.model.load_state_dict(checkpoint)
|
||||||
|
|
||||||
def save_dataset(self, path=None):
|
def save_dataset(self, path=None):
|
||||||
path = path or self.dataset_path
|
path = path or self.dataset_path
|
||||||
|
|||||||
281
nucon/rl.py
281
nucon/rl.py
@ -3,6 +3,7 @@ from gymnasium import spaces
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
import time
|
import time
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
|
from enum import Enum
|
||||||
from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus
|
from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus
|
||||||
|
|
||||||
Objectives = {
|
Objectives = {
|
||||||
@ -43,39 +44,19 @@ class NuconEnv(gym.Env):
|
|||||||
# Define observation space
|
# Define observation space
|
||||||
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
|
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
|
||||||
for param_id, param in self.nucon.get_all_readable().items():
|
for param_id, param in self.nucon.get_all_readable().items():
|
||||||
if param.param_type == float:
|
sp = _build_param_space(param)
|
||||||
obs_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32)
|
if sp is not None:
|
||||||
elif param.param_type == int:
|
obs_spaces[param_id] = sp
|
||||||
if param.min_val is not None and param.max_val is not None:
|
|
||||||
obs_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32)
|
|
||||||
else:
|
|
||||||
obs_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
|
|
||||||
elif param.param_type == bool:
|
|
||||||
obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
|
|
||||||
elif issubclass(param.param_type, Enum):
|
|
||||||
obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Unsupported observation parameter type: {param.param_type}")
|
|
||||||
|
|
||||||
self.observation_space = spaces.Dict(obs_spaces)
|
self.observation_space = spaces.Dict(obs_spaces)
|
||||||
|
|
||||||
# Define action space
|
# Define action space (only controllable, non-cheat, readable-back params)
|
||||||
action_spaces = {}
|
action_spaces = {}
|
||||||
for param_id, param in self.nucon.get_all_writable().items():
|
for param_id, param in self.nucon.get_all_writable().items():
|
||||||
if param.param_type == float:
|
if not param.is_readable or param.is_cheat:
|
||||||
action_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32)
|
continue # write-only (VALVE_OPEN/CLOSE, SCRAM, etc.) and cheat params excluded
|
||||||
elif param.param_type == int:
|
sp = _build_param_space(param)
|
||||||
if param.min_val is not None and param.max_val is not None:
|
if sp is not None:
|
||||||
action_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32)
|
action_spaces[param_id] = sp
|
||||||
else:
|
|
||||||
action_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
|
|
||||||
elif param.param_type == bool:
|
|
||||||
action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
|
|
||||||
elif issubclass(param.param_type, Enum):
|
|
||||||
action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Unsupported action parameter type: {param.param_type}")
|
|
||||||
|
|
||||||
self.action_space = spaces.Dict(action_spaces)
|
self.action_space = spaces.Dict(action_spaces)
|
||||||
|
|
||||||
self.objectives = []
|
self.objectives = []
|
||||||
@ -100,6 +81,8 @@ class NuconEnv(gym.Env):
|
|||||||
def _get_obs(self):
|
def _get_obs(self):
|
||||||
obs = {}
|
obs = {}
|
||||||
for param_id, param in self.nucon.get_all_readable().items():
|
for param_id, param in self.nucon.get_all_readable().items():
|
||||||
|
if param.param_type == str or param_id not in self.observation_space.spaces:
|
||||||
|
continue
|
||||||
value = self.nucon.get(param_id)
|
value = self.nucon.get(param_id)
|
||||||
if isinstance(value, Enum):
|
if isinstance(value, Enum):
|
||||||
value = value.value
|
value = value.value
|
||||||
@ -127,9 +110,11 @@ class NuconEnv(gym.Env):
|
|||||||
def step(self, action):
|
def step(self, action):
|
||||||
# Apply the action to the Nucon system
|
# Apply the action to the Nucon system
|
||||||
for param_id, value in action.items():
|
for param_id, value in action.items():
|
||||||
param = next(p for p in self.nucon if p.id == param_id)
|
param = self.nucon._parameters[param_id]
|
||||||
if issubclass(param.param_type, Enum):
|
if issubclass(param.param_type, Enum):
|
||||||
value = param.param_type(value)
|
value = param.param_type(int(np.asarray(value).flat[0]))
|
||||||
|
else:
|
||||||
|
value = param.param_type(np.asarray(value).flat[0])
|
||||||
if param.min_val is not None and param.max_val is not None:
|
if param.min_val is not None and param.max_val is not None:
|
||||||
value = np.clip(value, param.min_val, param.max_val)
|
value = np.clip(value, param.min_val, param.max_val)
|
||||||
self.nucon.set(param, value)
|
self.nucon.set(param, value)
|
||||||
@ -144,7 +129,10 @@ class NuconEnv(gym.Env):
|
|||||||
if self.simulator:
|
if self.simulator:
|
||||||
self.simulator.update(self.seconds_per_step)
|
self.simulator.update(self.seconds_per_step)
|
||||||
else:
|
else:
|
||||||
time.sleep(self.seconds_per_step)
|
# Sleep to let the game advance seconds_per_step game-seconds,
|
||||||
|
# accounting for the game's simulation speed multiplier.
|
||||||
|
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
|
||||||
|
time.sleep(self.seconds_per_step / sim_speed)
|
||||||
return observation, reward, terminated, truncated, info
|
return observation, reward, terminated, truncated, info
|
||||||
|
|
||||||
def render(self):
|
def render(self):
|
||||||
@ -167,6 +155,215 @@ class NuconEnv(gym.Env):
|
|||||||
return {k: v.reshape(1, -1) for k, v in self.observation_space.items()}
|
return {k: v.reshape(1, -1) for k, v in self.observation_space.items()}
|
||||||
|
|
||||||
|
|
||||||
|
def _build_param_space(param):
|
||||||
|
"""Return a gymnasium Box for a single NuconParameter, or None if unsupported."""
|
||||||
|
if param.param_type == float:
|
||||||
|
return spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32)
|
||||||
|
elif param.param_type == int:
|
||||||
|
lo = param.min_val if param.min_val is not None else -np.inf
|
||||||
|
hi = param.max_val if param.max_val is not None else np.inf
|
||||||
|
return spaces.Box(low=lo, high=hi, shape=(1,), dtype=np.float32)
|
||||||
|
elif param.param_type == bool:
|
||||||
|
return spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
|
||||||
|
elif param.param_type == str:
|
||||||
|
return None
|
||||||
|
elif issubclass(param.param_type, Enum):
|
||||||
|
return spaces.Box(low=0, high=len(param.param_type) - 1, shape=(1,), dtype=np.float32)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class NuconGoalEnv(gym.Env):
|
||||||
|
"""
|
||||||
|
Goal-conditioned reactor environment compatible with SB3 HER (Hindsight Experience Replay).
|
||||||
|
|
||||||
|
The observation is a Dict with three keys as required by GoalEnv / HER:
|
||||||
|
- 'observation': all readable non-goal, non-str params (same encoding as NuconEnv)
|
||||||
|
- 'achieved_goal': current values of goal_params, normalised to [0, 1] within goal_range
|
||||||
|
- 'desired_goal': target values sampled each episode, normalised to [0, 1]
|
||||||
|
|
||||||
|
Reward defaults to negative L2 distance in the normalised goal space (dense).
|
||||||
|
Pass ``tolerance`` for a sparse {0, -1} reward (0 = within tolerance).
|
||||||
|
|
||||||
|
Usage with SB3 HER::
|
||||||
|
|
||||||
|
from stable_baselines3 import SAC
|
||||||
|
from stable_baselines3.common.buffers import HerReplayBuffer
|
||||||
|
|
||||||
|
env = NuconGoalEnv(
|
||||||
|
goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
|
||||||
|
goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
|
||||||
|
simulator=simulator,
|
||||||
|
)
|
||||||
|
model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer)
|
||||||
|
model.learn(total_timesteps=200_000)
|
||||||
|
"""
|
||||||
|
|
||||||
|
metadata = {'render_modes': ['human']}
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
goal_params,
|
||||||
|
goal_range=None,
|
||||||
|
reward_fn=None,
|
||||||
|
tolerance=None,
|
||||||
|
nucon=None,
|
||||||
|
simulator=None,
|
||||||
|
render_mode=None,
|
||||||
|
seconds_per_step=5,
|
||||||
|
terminators=None,
|
||||||
|
terminate_above=0,
|
||||||
|
):
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
self.render_mode = render_mode
|
||||||
|
self.seconds_per_step = seconds_per_step
|
||||||
|
self.terminate_above = terminate_above
|
||||||
|
self.simulator = simulator
|
||||||
|
self.goal_params = list(goal_params)
|
||||||
|
self.tolerance = tolerance
|
||||||
|
|
||||||
|
if nucon is None:
|
||||||
|
nucon = Nucon(port=simulator.port) if simulator else Nucon()
|
||||||
|
self.nucon = nucon
|
||||||
|
|
||||||
|
all_readable = self.nucon.get_all_readable()
|
||||||
|
|
||||||
|
# Validate goal params and build per-param range arrays
|
||||||
|
for pid in self.goal_params:
|
||||||
|
if pid not in all_readable:
|
||||||
|
raise ValueError(f"Goal param '{pid}' is not a readable parameter")
|
||||||
|
|
||||||
|
goal_range = goal_range or {}
|
||||||
|
self._goal_low = np.array([
|
||||||
|
goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[0]
|
||||||
|
for pid in self.goal_params
|
||||||
|
], dtype=np.float32)
|
||||||
|
self._goal_high = np.array([
|
||||||
|
goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[1]
|
||||||
|
for pid in self.goal_params
|
||||||
|
], dtype=np.float32)
|
||||||
|
self._goal_range = self._goal_high - self._goal_low
|
||||||
|
self._goal_range[self._goal_range == 0] = 1.0 # avoid div-by-zero
|
||||||
|
|
||||||
|
self._reward_fn = reward_fn # callable(achieved_norm, desired_norm) -> float, or None
|
||||||
|
|
||||||
|
# Observation subspace: all readable non-str non-goal params
|
||||||
|
goal_set = set(self.goal_params)
|
||||||
|
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
|
||||||
|
for param_id, param in all_readable.items():
|
||||||
|
if param_id in goal_set:
|
||||||
|
continue
|
||||||
|
sp = _build_param_space(param)
|
||||||
|
if sp is not None:
|
||||||
|
obs_spaces[param_id] = sp
|
||||||
|
|
||||||
|
n_goals = len(self.goal_params)
|
||||||
|
self.observation_space = spaces.Dict({
|
||||||
|
'observation': spaces.Dict(obs_spaces),
|
||||||
|
'achieved_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
|
||||||
|
'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
|
||||||
|
})
|
||||||
|
|
||||||
|
# Action space: readable-back, non-cheat writable params
|
||||||
|
action_spaces = {}
|
||||||
|
for param_id, param in self.nucon.get_all_writable().items():
|
||||||
|
if not param.is_readable or param.is_cheat:
|
||||||
|
continue
|
||||||
|
sp = _build_param_space(param)
|
||||||
|
if sp is not None:
|
||||||
|
action_spaces[param_id] = sp
|
||||||
|
self.action_space = spaces.Dict(action_spaces)
|
||||||
|
|
||||||
|
# Terminators
|
||||||
|
self._terminators = terminators or []
|
||||||
|
|
||||||
|
self._desired_goal = np.zeros(n_goals, dtype=np.float32)
|
||||||
|
self._total_steps = 0
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# GoalEnv interface
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
def compute_reward(self, achieved_goal, desired_goal, info):
|
||||||
|
"""
|
||||||
|
Dense: negative L2 in normalised goal space (each dim in [0,1]).
|
||||||
|
Sparse when tolerance is set: 0 if within tolerance, -1 otherwise.
|
||||||
|
Custom reward_fn overrides both.
|
||||||
|
"""
|
||||||
|
if self._reward_fn is not None:
|
||||||
|
return self._reward_fn(achieved_goal, desired_goal)
|
||||||
|
dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
|
||||||
|
if self.tolerance is not None:
|
||||||
|
return (dist <= self.tolerance).astype(np.float32) - 1.0
|
||||||
|
return -dist
|
||||||
|
|
||||||
|
def _read_goal_values(self):
|
||||||
|
raw = np.array([
|
||||||
|
self.nucon.get(pid) or 0.0 for pid in self.goal_params
|
||||||
|
], dtype=np.float32)
|
||||||
|
return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0)
|
||||||
|
|
||||||
|
def _get_obs_dict(self):
|
||||||
|
obs = {'EPISODE_TIME': float(self._total_steps * self.seconds_per_step)}
|
||||||
|
goal_set = set(self.goal_params)
|
||||||
|
for param_id, param in self.nucon.get_all_readable().items():
|
||||||
|
if param_id in goal_set or param_id not in self.observation_space['observation'].spaces:
|
||||||
|
continue
|
||||||
|
value = self.nucon.get(param_id)
|
||||||
|
if isinstance(value, Enum):
|
||||||
|
value = value.value
|
||||||
|
obs[param_id] = value
|
||||||
|
achieved = self._read_goal_values()
|
||||||
|
return {
|
||||||
|
'observation': obs,
|
||||||
|
'achieved_goal': achieved,
|
||||||
|
'desired_goal': self._desired_goal.copy(),
|
||||||
|
}
|
||||||
|
|
||||||
|
def reset(self, seed=None, options=None):
|
||||||
|
super().reset(seed=seed)
|
||||||
|
self._total_steps = 0
|
||||||
|
|
||||||
|
# Sample a new goal uniformly from the goal range
|
||||||
|
rng = np.random.default_rng(seed)
|
||||||
|
self._desired_goal = rng.uniform(0.0, 1.0, size=len(self.goal_params)).astype(np.float32)
|
||||||
|
|
||||||
|
obs = self._get_obs_dict()
|
||||||
|
return obs, {}
|
||||||
|
|
||||||
|
def step(self, action):
|
||||||
|
for param_id, value in action.items():
|
||||||
|
param = self.nucon._parameters[param_id]
|
||||||
|
if issubclass(param.param_type, Enum):
|
||||||
|
value = param.param_type(int(np.asarray(value).flat[0]))
|
||||||
|
else:
|
||||||
|
value = param.param_type(np.asarray(value).flat[0])
|
||||||
|
if param.min_val is not None and param.max_val is not None:
|
||||||
|
value = np.clip(value, param.min_val, param.max_val)
|
||||||
|
self.nucon.set(param, value)
|
||||||
|
|
||||||
|
obs = self._get_obs_dict()
|
||||||
|
reward = float(self.compute_reward(obs['achieved_goal'], obs['desired_goal'], {}))
|
||||||
|
terminated = any(t(obs['observation']) > self.terminate_above for t in self._terminators)
|
||||||
|
truncated = False
|
||||||
|
info = {'achieved_goal': obs['achieved_goal'], 'desired_goal': obs['desired_goal']}
|
||||||
|
|
||||||
|
self._total_steps += 1
|
||||||
|
if self.simulator:
|
||||||
|
self.simulator.update(self.seconds_per_step)
|
||||||
|
else:
|
||||||
|
sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
|
||||||
|
time.sleep(self.seconds_per_step / sim_speed)
|
||||||
|
|
||||||
|
return obs, reward, terminated, truncated, info
|
||||||
|
|
||||||
|
def render(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def register_nucon_envs():
|
def register_nucon_envs():
|
||||||
gym.register(
|
gym.register(
|
||||||
id='Nucon-max_power-v0',
|
id='Nucon-max_power-v0',
|
||||||
@ -183,5 +380,25 @@ def register_nucon_envs():
|
|||||||
entry_point='nucon.rl:NuconEnv',
|
entry_point='nucon.rl:NuconEnv',
|
||||||
kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]}
|
kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]}
|
||||||
)
|
)
|
||||||
|
# Goal-conditioned: target total generator output (train with HER)
|
||||||
|
gym.register(
|
||||||
|
id='Nucon-goal_power-v0',
|
||||||
|
entry_point='nucon.rl:NuconGoalEnv',
|
||||||
|
kwargs={
|
||||||
|
'goal_params': ['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
|
||||||
|
'goal_range': {'GENERATOR_0_KW': (0.0, 1200.0), 'GENERATOR_1_KW': (0.0, 1200.0), 'GENERATOR_2_KW': (0.0, 1200.0)},
|
||||||
|
'seconds_per_step': 5,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
# Goal-conditioned: target core temperature (train with HER)
|
||||||
|
gym.register(
|
||||||
|
id='Nucon-goal_temp-v0',
|
||||||
|
entry_point='nucon.rl:NuconGoalEnv',
|
||||||
|
kwargs={
|
||||||
|
'goal_params': ['CORE_TEMP'],
|
||||||
|
'goal_range': {'CORE_TEMP': (280.0, 380.0)},
|
||||||
|
'seconds_per_step': 5,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
register_nucon_envs()
|
register_nucon_envs()
|
||||||
62
nucon/sim.py
62
nucon/sim.py
@ -5,7 +5,8 @@ from flask import Flask, request, jsonify
|
|||||||
from nucon import Nucon, ParameterEnum, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
|
from nucon import Nucon, ParameterEnum, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
|
||||||
import threading
|
import threading
|
||||||
import torch
|
import torch
|
||||||
from nucon.model import ReactorDynamicsModel
|
from nucon.model import ReactorDynamicsModel, ReactorKNNModel
|
||||||
|
import pickle
|
||||||
|
|
||||||
class OperatingState(Enum):
|
class OperatingState(Enum):
|
||||||
# Tuple indicates a range of values, while list indicates a set of possible values
|
# Tuple indicates a range of values, while list indicates a set of possible values
|
||||||
@ -165,6 +166,8 @@ class NuconSimulator:
|
|||||||
def __init__(self, host: str = 'localhost', port: int = 8786):
|
def __init__(self, host: str = 'localhost', port: int = 8786):
|
||||||
self._nucon = Nucon()
|
self._nucon = Nucon()
|
||||||
self.parameters = self.Parameters(self._nucon)
|
self.parameters = self.Parameters(self._nucon)
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
self.time = 0.0
|
self.time = 0.0
|
||||||
self.allow_all_writes = False
|
self.allow_all_writes = False
|
||||||
self.set_state(OperatingState.OFFLINE)
|
self.set_state(OperatingState.OFFLINE)
|
||||||
@ -216,34 +219,63 @@ class NuconSimulator:
|
|||||||
self._update_reactor_state(time_step)
|
self._update_reactor_state(time_step)
|
||||||
self.time += time_step
|
self.time += time_step
|
||||||
|
|
||||||
|
def set_model(self, model) -> None:
|
||||||
|
"""Set a pre-loaded ReactorDynamicsModel or ReactorKNNModel directly."""
|
||||||
|
self.model = model
|
||||||
|
if isinstance(model, ReactorDynamicsModel):
|
||||||
|
self.model.eval()
|
||||||
|
|
||||||
def load_model(self, model_path: str) -> None:
|
def load_model(self, model_path: str) -> None:
|
||||||
|
"""Load a model from a file. .pkl → ReactorKNNModel, otherwise → ReactorDynamicsModel (torch)."""
|
||||||
try:
|
try:
|
||||||
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
|
if model_path.endswith('.pkl'):
|
||||||
self.model.load_state_dict(torch.load(model_path))
|
with open(model_path, 'rb') as f:
|
||||||
self.model.eval() # Set the model to evaluation mode
|
self.model = pickle.load(f)
|
||||||
print(f"Model loaded successfully from {model_path}")
|
print(f"kNN model loaded from {model_path}")
|
||||||
|
else:
|
||||||
|
# Reconstruct shell from the saved state dict; input/output params
|
||||||
|
# are stored inside the checkpoint.
|
||||||
|
checkpoint = torch.load(model_path, weights_only=False)
|
||||||
|
if isinstance(checkpoint, dict) and 'input_params' in checkpoint:
|
||||||
|
self.model = ReactorDynamicsModel(checkpoint['input_params'], checkpoint['output_params'])
|
||||||
|
self.model.load_state_dict(checkpoint['state_dict'])
|
||||||
|
else:
|
||||||
|
# Legacy: plain state dict — fall back using sim readable/non-writable lists
|
||||||
|
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
|
||||||
|
self.model.load_state_dict(checkpoint)
|
||||||
|
self.model.eval()
|
||||||
|
print(f"NN model loaded from {model_path}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error loading model: {str(e)}")
|
print(f"Error loading model: {str(e)}")
|
||||||
self.model = None
|
self.model = None
|
||||||
|
|
||||||
def _update_reactor_state(self, time_step: float) -> None:
|
def _update_reactor_state(self, time_step: float) -> None:
|
||||||
if not self.model:
|
if not self.model:
|
||||||
raise ValueError("Model not set. Please load a model using load_model() method.")
|
raise ValueError("Model not set. Please load a model using load_model() or set_model().")
|
||||||
|
|
||||||
|
# Build state dict using only the params the model knows about
|
||||||
state = {}
|
state = {}
|
||||||
for param in self.readable_params:
|
for param_id in self.model.input_params:
|
||||||
value = self.get(param)
|
value = getattr(self.parameters, param_id, None)
|
||||||
if isinstance(value, Enum):
|
if isinstance(value, Enum):
|
||||||
value = value.value
|
value = value.value
|
||||||
state[param] = value
|
if value is None:
|
||||||
|
value = 0.0 # fallback for params not initialised in sim state
|
||||||
|
state[param_id] = value
|
||||||
|
|
||||||
# Use the model to predict the next state
|
# Forward pass — same interface for both NN and kNN
|
||||||
with torch.no_grad():
|
if isinstance(self.model, ReactorDynamicsModel):
|
||||||
next_state = self.model(state, time_step)
|
with torch.no_grad():
|
||||||
|
next_state = self.model.forward(state, time_step)
|
||||||
|
else:
|
||||||
|
next_state = self.model.forward(state, time_step)
|
||||||
|
|
||||||
# Update the simulator's state
|
# Update only the output params the model predicts
|
||||||
for param, value in next_state.items():
|
for param_id, value in next_state.items():
|
||||||
self.set(param, value)
|
try:
|
||||||
|
self.set(param_id, value, force=True)
|
||||||
|
except (ValueError, KeyError):
|
||||||
|
pass # ignore params that can't be set (type mismatch, unknown)
|
||||||
|
|
||||||
def set_state(self, state: OperatingState) -> None:
|
def set_state(self, state: OperatingState) -> None:
|
||||||
self._sample_parameters_from_state(state)
|
self._sample_parameters_from_state(state)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user