NuCon/nucon/rl.py
Dominik Roth 0932bb353a feat: SAC+HER training on kNN-GP sim with direct bypass and scripts/
- nucon/rl.py: delta_action_scale action space, bool handling (>=0.5),
  direct sim read/write bypassing HTTP for ~2000fps env throughput;
  remove uncertainty_abort from training (use penalty-only), larger
  default batch sizes; fix _read_obs and step for in-process sim
- nucon/model.py: optimise _lookup with einsum squared-L2, vectorised
  rbf kernel; forward_with_uncertainty uses pre-built normalised arrays
- nucon/sim.py: _update_reactor_state writes outputs via setattr directly
- scripts/train_sac.py: moved from root; full SAC+HER example with kNN-GP
  sim, delta actions, uncertainty penalty, init_states
- scripts/collect_dataset.py: CLI tool to collect dynamics dataset from
  live game session (--steps, --delta, --out, --merge)
- README.md: add Scripts section, reference both scripts in training loop

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 20:43:37 +01:00

535 lines
23 KiB
Python

import inspect
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import time
from typing import Dict, Any, Callable, List, Optional
from enum import Enum
from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus
# ---------------------------------------------------------------------------
# Reward / objective helpers
# ---------------------------------------------------------------------------
# Named reward objectives: each entry maps an observation dict -> scalar reward.
# Kept as lambdas deliberately — NuconEnv._get_info keys info entries by __name__.
Objectives = {
    # Constant zero — placeholder objective / no-op terminator.
    "null": lambda obs: 0,
    # Total electrical output summed over the three generators (kW).
    "max_power": lambda obs: obs["GENERATOR_0_KW"] + obs["GENERATOR_1_KW"] + obs["GENERATOR_2_KW"],
    # Elapsed episode time — rewards surviving longer.
    "episode_time": lambda obs: obs["EPISODE_TIME"],
}
def _uncertainty_penalty(start=0.3, scale=1.0, mode='l2'):
excess = lambda obs: max(0.0, obs.get('SIM_UNCERTAINTY', 0.0) - start)
if mode == 'l2':
return lambda obs: -scale * excess(obs) ** 2
elif mode == 'linear':
return lambda obs: -scale * excess(obs)
else:
raise ValueError(f"Unknown mode '{mode}'. Use 'l2' or 'linear'.")
def _uncertainty_abort(threshold=0.7):
return lambda obs: 1.0 if obs.get('SIM_UNCERTAINTY', 0.0) >= threshold else 0.0
# Factories for parameterised objectives: call with config values to obtain an
# ``obs -> float`` reward term.
Parameterized_Objectives = {
    # Negative squared error from a target core temperature.
    "target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2),
    # Negative squared error of the margin (CORE_TEMP - CORE_TEMP_MIN) from goal_gap.
    "target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2),
    # Quadratic penalty only when CORE_TEMP exceeds max_temp (zero below it).
    "temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2),
    # Quadratic penalty only when CORE_TEMP falls below min_temp (zero above it).
    "temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2),
    # Constant reward per step (e.g. living bonus / time cost).
    "constant": lambda constant: lambda obs: constant,
    "uncertainty_penalty": _uncertainty_penalty,  # (start, scale, mode) -> (obs) -> float
}
# Factories for parameterised terminators (return > terminate_above ends the episode).
Parameterized_Terminators = {
    "uncertainty_abort": _uncertainty_abort,  # (threshold,) -> (obs) -> float
}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _build_flat_action_space(nucon, obs_param_set=None, delta_action_scale=None):
    """Return (Box, ordered_param_ids, param_lows, param_ranges).

    Collects every writable, readable, non-cheat parameter with finite bounds
    (optionally restricted to ``obs_param_set``) into one flat Box action space.

    If delta_action_scale is set, the action space is [-1, 1]^n and actions are
    treated as normalised deltas: actual_delta = action * delta_action_scale * (max - min).
    Otherwise the action space spans [min_val, max_val] per param (absolute values).

    The returned lows/ranges arrays let callers clip delta-mode actions back
    into each parameter's absolute bounds.
    """
    params = []
    lows, highs, ranges = [], [], []
    for param_id, param in nucon.get_all_writable().items():
        if not param.is_readable or param.is_cheat:
            continue
        if obs_param_set is not None and param_id not in obs_param_set:
            continue
        if param.min_val is None or param.max_val is None:
            continue  # SAC requires finite action bounds
        sp = _build_param_space(param)
        if sp is None:
            continue
        # _build_param_space yields shape-(1,) Boxes; take the scalar bounds.
        params.append(param_id)
        lows.append(sp.low[0])
        highs.append(sp.high[0])
        ranges.append(sp.high[0] - sp.low[0])
    if delta_action_scale is not None:
        # Normalised-delta mode: symmetric unit cube regardless of param bounds.
        n = len(params)
        box = spaces.Box(low=-np.ones(n, dtype=np.float32),
                         high=np.ones(n, dtype=np.float32), dtype=np.float32)
    else:
        # Absolute mode: per-parameter bounds.
        box = spaces.Box(low=np.array(lows, dtype=np.float32),
                         high=np.array(highs, dtype=np.float32), dtype=np.float32)
    return box, params, np.array(lows, dtype=np.float32), np.array(ranges, dtype=np.float32)
def _unflatten_action(flat_action, param_ids):
return {pid: float(flat_action[i]) for i, pid in enumerate(param_ids)}
def _build_param_space(param):
    """Return a gymnasium Box for a single NuconParameter, or None if unsupported."""
    ptype = param.param_type
    if ptype in (float, int):
        # Fall back to unbounded sides when a bound is missing.
        low = -np.inf if param.min_val is None else param.min_val
        high = np.inf if param.max_val is None else param.max_val
        return spaces.Box(low=low, high=high, shape=(1,), dtype=np.float32)
    if ptype == bool:
        # Booleans are exposed as a [0, 1] float (thresholded on write).
        return spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
    if ptype == str:
        return None  # free-form strings cannot be expressed as a Box
    if issubclass(ptype, Enum):
        # Enums map to an integer-valued float range over their member count.
        return spaces.Box(low=0, high=len(ptype) - 1, shape=(1,), dtype=np.float32)
    return None
def _apply_action(nucon, action):
for param_id, value in action.items():
param = nucon._parameters[param_id]
v = float(np.asarray(value).flat[0])
if param.param_type == bool:
value = v >= 0.5 # [0,1] space: above midpoint → True
elif issubclass(param.param_type, Enum):
value = param.param_type(int(v))
else:
value = param.param_type(v)
if param.min_val is not None and param.max_val is not None:
value = param.param_type(np.clip(value, param.min_val, param.max_val))
nucon.set(param, value)
# ---------------------------------------------------------------------------
# NuconEnv
# ---------------------------------------------------------------------------
class NuconEnv(gym.Env):
    """Gymnasium environment over a live Nucon reactor or an in-process simulator.

    Observation: Dict of all readable params plus EPISODE_TIME (and
    SIM_UNCERTAINTY when a simulator is attached). Action: flat Box over every
    writable, readable, non-cheat param with finite bounds. Reward: weighted
    sum of the configured objectives; the episode terminates when the summed
    terminator outputs exceed ``terminate_above``.
    """
    metadata = {'render_modes': ['human']}

    def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5,
                 objectives=('null',), terminators=('null',), objective_weights=None,
                 terminate_above=0):
        # NOTE: objectives/terminators defaults are tuples (not lists) to avoid
        # the shared-mutable-default pitfall; any iterable of names/callables works.
        super().__init__()
        self.render_mode = render_mode
        self.seconds_per_step = seconds_per_step
        if objective_weights is None:
            objective_weights = [1.0 for _ in objectives]
        self.objective_weights = objective_weights
        self.terminate_above = terminate_above
        self.simulator = simulator
        if nucon is None:
            # Bind to the simulator's HTTP port when one is supplied.
            nucon = Nucon(port=simulator.port) if simulator else Nucon()
        self.nucon = nucon
        self._total_steps = 0  # defensive init: _get_obs reads this before reset()
        # Observation space — SIM_UNCERTAINTY included when a simulator is present
        obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
        if simulator is not None:
            obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
        for param_id, param in self.nucon.get_all_readable().items():
            sp = _build_param_space(param)
            if sp is not None:
                obs_spaces[param_id] = sp
        self.observation_space = spaces.Dict(obs_spaces)
        self.action_space, self._action_params, self._action_lows, self._action_ranges = \
            _build_flat_action_space(self.nucon)
        # Resolve objective/terminator specs: known names from Objectives, or bare callables.
        self.objectives = []
        self.terminators = []
        for objective in objectives:
            if objective in Objectives:
                self.objectives.append(Objectives[objective])
            elif callable(objective):
                self.objectives.append(objective)
            else:
                raise ValueError(f"Unsupported objective: {objective}")
        for terminator in terminators:
            if terminator in Objectives:
                self.terminators.append(Objectives[terminator])
            elif callable(terminator):
                self.terminators.append(terminator)
            else:
                raise ValueError(f"Unsupported terminator: {terminator}")

    def _get_obs(self, sim_uncertainty=None):
        """Collect current readable param values into an observation dict."""
        obs = {}
        for param_id, param in self.nucon.get_all_readable().items():
            if param.param_type == str or param_id not in self.observation_space.spaces:
                continue
            value = self.nucon.get(param_id)
            if isinstance(value, Enum):
                value = value.value  # expose enums as their underlying value
            obs[param_id] = value
        obs['EPISODE_TIME'] = self._total_steps * self.seconds_per_step
        if 'SIM_UNCERTAINTY' in self.observation_space.spaces:
            obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0
        return obs

    def _get_info(self, obs):
        """Per-objective raw and weighted values, keyed by each objective's __name__."""
        info = {'objectives': {}, 'objectives_weighted': {}}
        for objective, weight in zip(self.objectives, self.objective_weights):
            obj = objective(obs)
            name = getattr(objective, '__name__', repr(objective))
            info['objectives'][name] = obj
            info['objectives_weighted'][name] = obj * weight
        return info

    def reset(self, seed=None, options=None):
        """Reset the step counter and return (observation, info).

        NOTE: does not re-initialise the reactor/simulator state itself.
        """
        super().reset(seed=seed)
        self._total_steps = 0
        observation = self._get_obs()
        return observation, self._get_info(observation)

    def step(self, action):
        """Apply a flat action, advance time, and return the gym 5-tuple."""
        _apply_action(self.nucon, _unflatten_action(action, self._action_params))
        # Advance sim (or sleep in real time) — get uncertainty for obs injection
        truncated = False
        uncertainty = None
        if self.simulator:
            uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
        else:
            # Real game: wait the wall-clock equivalent of seconds_per_step.
            sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            time.sleep(self.seconds_per_step / sim_speed)
        self._total_steps += 1
        observation = self._get_obs(sim_uncertainty=uncertainty)
        info = self._get_info(observation)
        reward = sum(info['objectives_weighted'].values())
        # Cast to plain bool — np.sum(...) > x yields np.bool_, which some wrappers reject.
        terminated = bool(np.sum([t(observation) for t in self.terminators]) > self.terminate_above)
        return observation, reward, terminated, truncated, info

    def render(self):
        pass

    def close(self):
        pass

    def _flatten_observation(self, observation):
        """Concatenate all observation values into one flat float array."""
        return np.concatenate([np.asarray(v).flatten() for v in observation.values()])
# ---------------------------------------------------------------------------
# NuconGoalEnv
# ---------------------------------------------------------------------------
class NuconGoalEnv(gym.Env):
    """
    Goal-conditioned reactor environment compatible with SB3 HER (Hindsight Experience Replay).

    Observation is a Dict with three keys:
    - 'observation': all readable non-goal, non-str params + SIM_UNCERTAINTY (when sim active)
    - 'achieved_goal': current values of goal_params, normalised to [0, 1] within goal_range
    - 'desired_goal': target values sampled each episode, normalised to [0, 1]

    ``SIM_UNCERTAINTY`` in 'observation' lets reward_fn / terminators reference uncertainty directly.
    reward_fn signature: ``(achieved, desired)`` or ``(achieved, desired, obs)`` — the 3-arg form
    receives the full observation dict (including SIM_UNCERTAINTY) for uncertainty-aware shaping.

    Usage with SB3 HER::

        from stable_baselines3 import SAC
        from stable_baselines3.common.buffers import HerReplayBuffer
        from nucon.rl import NuconGoalEnv, UncertaintyPenalty, UncertaintyAbort

        env = NuconGoalEnv(
            goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
            goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200),
                        'GENERATOR_2_KW': (0, 1200)},
            tolerance=0.05,
            simulator=simulator,
            # uncertainty-aware reward: penalise OOD, abort if too far out
            reward_fn=lambda ag, dg, obs: (
                -(np.linalg.norm(ag - dg) ** 2)
                - 2.0 * max(0, obs.get('SIM_UNCERTAINTY', 0) - 0.3) ** 2
            ),
            terminators=[UncertaintyAbort(threshold=0.7)],
        )
        model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer)
        model.learn(total_timesteps=500_000)
    """
    metadata = {'render_modes': ['human']}

    def __init__(
        self,
        goal_params,
        goal_range=None,
        reward_fn=None,
        tolerance=None,
        nucon=None,
        simulator=None,
        render_mode=None,
        seconds_per_step=5,
        terminators=None,
        terminate_above=0,
        additional_objectives=None,
        additional_objective_weights=None,
        obs_params=None,
        init_states=None,
        delta_action_scale=None,
    ):
        super().__init__()
        self.render_mode = render_mode
        self.seconds_per_step = seconds_per_step
        self._delta_action_scale = delta_action_scale
        self.terminate_above = terminate_above
        self.simulator = simulator
        self.goal_params = list(goal_params)
        self.tolerance = tolerance
        if nucon is None:
            # Bind to the simulator's HTTP port when one is supplied.
            nucon = Nucon(port=simulator.port) if simulator else Nucon()
        self.nucon = nucon
        all_readable = self.nucon.get_all_readable()
        for pid in self.goal_params:
            if pid not in all_readable:
                raise ValueError(f"Goal param '{pid}' is not a readable parameter")
        # Per-goal [low, high] bounds used to normalise achieved/desired goals into [0, 1].
        # Falls back to the param's own min/max (or 0..1) when no explicit range is given.
        goal_range = goal_range or {}
        self._goal_low = np.array([
            goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[0]
            for pid in self.goal_params
        ], dtype=np.float32)
        self._goal_high = np.array([
            goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[1]
            for pid in self.goal_params
        ], dtype=np.float32)
        self._goal_range = self._goal_high - self._goal_low
        self._goal_range[self._goal_range == 0] = 1.0  # guard against divide-by-zero in normalisation
        # Detect reward_fn arity for backward compat (2-arg vs 3-arg)
        self._reward_fn = reward_fn
        if reward_fn is not None:
            n_args = len(inspect.signature(reward_fn).parameters)
            self._reward_fn_wants_obs = n_args >= 3
        else:
            self._reward_fn_wants_obs = False
        # Observation params: model.input_params defines the canonical list — the same set is
        # used whether training in sim or deploying to the real game (the game simply has more
        # params available; we query only the subset we care about).
        # Explicit obs_params overrides everything (use when deploying to real game without sim).
        # SB3 HER requires observation to be a flat Box, not a nested Dict.
        goal_set = set(self.goal_params)
        self._obs_with_uncertainty = simulator is not None
        if obs_params is not None:
            base_params = [p for p in obs_params if p not in goal_set]
        elif simulator is not None and hasattr(simulator, 'model') and simulator.model is not None:
            base_params = [p for p in simulator.model.input_params
                           if p not in goal_set and p in all_readable
                           and _build_param_space(all_readable[p]) is not None]
        else:
            base_params = [p for p, param in all_readable.items()
                           if p not in goal_set and _build_param_space(param) is not None]
        # SIM_UNCERTAINTY is not in _obs_params — it's not available at deployment on the real game
        self._obs_params = base_params
        n_goals = len(self.goal_params)
        self.observation_space = spaces.Dict({
            'observation': spaces.Box(low=-np.inf, high=np.inf,
                                      shape=(len(self._obs_params),), dtype=np.float32),
            'achieved_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
            'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
        })
        # Action space: writable params within the obs param set (flat Box for SB3 compatibility).
        self.action_space, self._action_params, self._action_lows, self._action_ranges = \
            _build_flat_action_space(self.nucon, set(base_params), delta_action_scale)
        self._terminators = terminators or []
        _objs = additional_objectives or []
        self._objectives = [Objectives[o] if isinstance(o, str) else o for o in _objs]
        self._objective_weights = additional_objective_weights or [1.0] * len(self._objectives)
        self._init_states = init_states  # list of state dicts to sample on reset
        self._desired_goal = np.zeros(n_goals, dtype=np.float32)
        self._total_steps = 0

    def compute_reward(self, achieved_goal, desired_goal, info):
        """Dense negative L2, sparse with tolerance, or custom reward_fn.

        NOTE(review): SB3 HER calls this with batched goal arrays during
        relabelling, where ``info`` may not be a dict — obs_named then falls
        back to {} so a 3-arg reward_fn must tolerate an empty dict.
        """
        obs_named = info.get('obs_named', {}) if isinstance(info, dict) else {}
        if self._reward_fn is not None:
            if self._reward_fn_wants_obs:
                return self._reward_fn(achieved_goal, desired_goal, obs_named)
            return self._reward_fn(achieved_goal, desired_goal)
        dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
        if self.tolerance is not None:
            # Sparse reward: 0.0 within tolerance, -1.0 otherwise.
            return (dist <= self.tolerance).astype(np.float32) - 1.0
        return -dist

    def _read_goal_values(self):
        """Read current goal param values via nucon, normalised and clipped to [0, 1].

        NOTE(review): ``get(pid) or 0.0`` also replaces a legitimate 0 reading
        with 0.0 — harmless numerically, but None and 0 are indistinguishable here.
        """
        raw = np.array([self.nucon.get(pid) or 0.0 for pid in self.goal_params], dtype=np.float32)
        return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0)

    def _read_obs(self, sim_uncertainty=None):
        """Return (gym_obs_dict, reward_obs_dict).

        When a simulator is attached, reads directly from sim.parameters (no HTTP).
        Otherwise falls back to a single batch HTTP request.
        """
        def _to_float(v):
            # Missing values read as 0.0; enums collapse to their underlying value.
            if v is None:
                return 0.0
            return float(v.value if isinstance(v, Enum) else v)
        if self.simulator is not None:
            # Direct in-process read — no HTTP overhead
            def _get(pid):
                return _to_float(self.simulator.get(pid))
        else:
            # One batched HTTP query for all obs + goal params, parsed per-param.
            raw = self.nucon._batch_query(self._obs_params + self.goal_params)
            all_params = self.nucon.get_all_readable()
            def _get(pid):
                try:
                    v = self.nucon._parse_value(all_params[pid], raw.get(pid, '0'))
                    return _to_float(v)
                except Exception:
                    return 0.0  # best-effort: unparsable values read as 0.0
        # reward_obs is the named dict handed to reward_fn / terminators / objectives.
        reward_obs = {}
        if self._obs_with_uncertainty:
            reward_obs['SIM_UNCERTAINTY'] = float(sim_uncertainty) if sim_uncertainty is not None else 0.0
        for pid in self._obs_params:
            reward_obs[pid] = _get(pid)
        obs_vec = np.array([reward_obs[p] for p in self._obs_params], dtype=np.float32)
        goal_raw = np.array([_get(p) for p in self.goal_params], dtype=np.float32)
        achieved = np.clip((goal_raw - self._goal_low) / self._goal_range, 0.0, 1.0)
        gym_obs = {'observation': obs_vec, 'achieved_goal': achieved,
                   'desired_goal': self._desired_goal.copy()}
        return gym_obs, reward_obs

    def reset(self, seed=None, options=None):
        """Sample a new desired goal (and optionally an init state) and return the first obs."""
        super().reset(seed=seed)
        self._total_steps = 0
        rng = np.random.default_rng(seed)
        # Goals are sampled uniformly in normalised [0, 1] space.
        self._desired_goal = rng.uniform(0.0, 1.0, size=len(self.goal_params)).astype(np.float32)
        if self._init_states is not None and self.simulator is not None:
            # Randomise the starting reactor state from the provided pool.
            state = self._init_states[rng.integers(len(self._init_states))]
            for k, v in state.items():
                try:
                    self.simulator.set(k, v, force=True)
                except Exception:
                    pass  # best-effort: skip params the sim does not accept
        gym_obs, _ = self._read_obs()
        return gym_obs, {}

    def step(self, action):
        """Apply an (absolute or delta) action, advance time, and return the gym 5-tuple."""
        flat = np.asarray(action, dtype=np.float32)
        if self._delta_action_scale is not None:
            # Compute absolute values from deltas, reading current state directly if possible
            absolute = {}
            for i, pid in enumerate(self._action_params):
                param = self.nucon._parameters[pid]
                if param.param_type == bool:
                    # Delta space is [-1, 1]: positive action → True.
                    absolute[pid] = 1.0 if flat[i] > 0 else 0.0
                else:
                    if self.simulator is not None:
                        v = self.simulator.get(pid)
                        current = float(v.value if isinstance(v, Enum) else v) if v is not None else 0.0
                    else:
                        current = 0.0  # fallback; batch read not worth it for actions alone
                    delta = float(flat[i]) * self._delta_action_scale * self._action_ranges[i]
                    # Clip the resulting absolute value back into the param's bounds.
                    absolute[pid] = float(np.clip(current + delta,
                                                  self._action_lows[i],
                                                  self._action_lows[i] + self._action_ranges[i]))
        else:
            absolute = _unflatten_action(flat, self._action_params)
        if self.simulator is not None:
            # Write directly to sim — skip HTTP entirely
            for pid, val in absolute.items():
                try:
                    self.simulator.set(pid, val, force=True)
                except Exception:
                    pass  # best-effort: skip params the sim rejects
        else:
            _apply_action(self.nucon, absolute)
        if self.simulator:
            uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
        else:
            # Real game: wait the wall-clock equivalent of seconds_per_step.
            sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            time.sleep(self.seconds_per_step / sim_speed)
            uncertainty = None
        self._total_steps += 1
        gym_obs, reward_obs = self._read_obs(sim_uncertainty=uncertainty)
        info = {'achieved_goal': gym_obs['achieved_goal'], 'desired_goal': gym_obs['desired_goal'],
                'obs_named': reward_obs}
        reward = float(self.compute_reward(gym_obs['achieved_goal'], gym_obs['desired_goal'], info))
        # Shaping terms on top of the goal reward (e.g. uncertainty penalty).
        reward += sum(w * o(reward_obs) for o, w in zip(self._objectives, self._objective_weights))
        terminated = any(t(reward_obs) > self.terminate_above for t in self._terminators)
        return gym_obs, reward, terminated, False, info

    def render(self):
        pass

    def close(self):
        pass
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
def register_nucon_envs():
    """Register the bundled Nucon environments with gymnasium."""
    generator_range = {'GENERATOR_0_KW': (0.0, 1200.0), 'GENERATOR_1_KW': (0.0, 1200.0),
                       'GENERATOR_2_KW': (0.0, 1200.0)}
    # (env id, entry point, constructor kwargs) for each registered variant.
    env_specs = [
        ('Nucon-max_power-v0', 'nucon.rl:NuconEnv',
         {'seconds_per_step': 5, 'objectives': ['max_power']}),
        ('Nucon-target_temperature_350-v0', 'nucon.rl:NuconEnv',
         {'seconds_per_step': 5,
          'objectives': [Parameterized_Objectives['target_temperature'](goal_temp=350)]}),
        ('Nucon-safe_max_power-v0', 'nucon.rl:NuconEnv',
         {'seconds_per_step': 5,
          'objectives': [Parameterized_Objectives['temp_above'](min_temp=310),
                         Parameterized_Objectives['temp_below'](max_temp=365),
                         'max_power'],
          'objective_weights': [1, 10, 1/100_000]}),
        ('Nucon-goal_power-v0', 'nucon.rl:NuconGoalEnv',
         {'goal_params': ['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
          'goal_range': generator_range,
          'seconds_per_step': 5}),
        ('Nucon-goal_temp-v0', 'nucon.rl:NuconGoalEnv',
         {'goal_params': ['CORE_TEMP'],
          'goal_range': {'CORE_TEMP': (280.0, 380.0)},
          'seconds_per_step': 5}),
    ]
    for env_id, entry_point, kwargs in env_specs:
        gym.register(id=env_id, entry_point=entry_point, kwargs=kwargs)


register_nucon_envs()