# Changelog (from commit message):
# - ReactorDynamicsNet: add dropout (0.3) for regularisation
# - ReactorDynamicsModel: z-score normalisation of inputs/outputs, predict
#   per-second rates of change, forward_with_uncertainty() stub
# - rl.py: misc SAC training improvements
# - sim.py: minor fixes
# - train_sac.py: updated training loop
# Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
import inspect
|
|
import gymnasium as gym
|
|
from gymnasium import spaces
|
|
import numpy as np
|
|
import time
|
|
from typing import Dict, Any, Callable, List, Optional
|
|
from enum import Enum
|
|
from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Reward / objective helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _alarm_penalty(obs):
    """Penalty proportional to the number of active alarms.

    Only meaningful when running against the real game; the observation must
    carry a comma-separated ``ALARMS_ACTIVE`` string for this to fire.
    """
    alarms = obs.get('ALARMS_ACTIVE', '')
    if alarms and alarms.strip():
        return -float(len(alarms.split(',')))
    return 0.0


# Named, parameter-free objective functions: obs dict -> scalar reward term.
Objectives = {
    "null": lambda obs: 0,
    "max_power": lambda obs: sum(obs[f"GENERATOR_{i}_KW"] for i in range(3)),
    "episode_time": lambda obs: obs["EPISODE_TIME"],
    "alarm_penalty": _alarm_penalty,
}
|
|
|
|
def _uncertainty_penalty(start=0.3, scale=1.0, mode='l2'):
    """Factory: objective that penalises simulator uncertainty above ``start``.

    Args:
        start: uncertainty level below which no penalty applies.
        scale: multiplier on the penalty magnitude.
        mode: 'l2' (quadratic in the excess) or 'linear'.

    Returns:
        obs -> float objective.

    Raises:
        ValueError: if ``mode`` is not 'l2' or 'linear' (raised at build time).
    """
    def _excess(obs):
        return max(0.0, obs.get('SIM_UNCERTAINTY', 0.0) - start)

    if mode == 'l2':
        return lambda obs: -scale * _excess(obs) ** 2
    if mode == 'linear':
        return lambda obs: -scale * _excess(obs)
    raise ValueError(f"Unknown mode '{mode}'. Use 'l2' or 'linear'.")


def _uncertainty_abort(threshold=0.7):
    """Factory: terminator returning 1.0 once SIM_UNCERTAINTY reaches ``threshold``."""
    def _abort(obs):
        return 1.0 if obs.get('SIM_UNCERTAINTY', 0.0) >= threshold else 0.0
    return _abort


def _target_temperature(goal_temp):
    """Quadratic penalty on CORE_TEMP deviating from ``goal_temp``."""
    return lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2)


def _target_gap(goal_gap):
    """Quadratic penalty on (CORE_TEMP - CORE_TEMP_MIN) deviating from ``goal_gap``."""
    return lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2)


def _temp_below(max_temp):
    """Quadratic penalty when CORE_TEMP exceeds ``max_temp``; zero otherwise."""
    return lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2)


def _temp_above(min_temp):
    """Quadratic penalty when CORE_TEMP falls below ``min_temp``; zero otherwise."""
    return lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2)


def _temp_below_linear(max_temp):
    """Linear penalty when CORE_TEMP exceeds ``max_temp``; zero otherwise."""
    return lambda obs: -np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf)


def _temp_above_linear(min_temp):
    """Linear penalty when CORE_TEMP falls below ``min_temp``; zero otherwise."""
    return lambda obs: -np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf)


def _constant(constant):
    """Objective that always returns ``constant`` (e.g. a per-step living bonus)."""
    return lambda obs: constant


# Factories: call with parameters to obtain an obs -> float objective.
Parameterized_Objectives = {
    "target_temperature": _target_temperature,
    "target_gap": _target_gap,
    "temp_below": _temp_below,
    "temp_above": _temp_above,
    "temp_below_linear": _temp_below_linear,
    "temp_above_linear": _temp_above_linear,
    "constant": _constant,
    "uncertainty_penalty": _uncertainty_penalty,  # (start, scale, mode) -> (obs) -> float
}

Parameterized_Terminators = {
    "uncertainty_abort": _uncertainty_abort,  # (threshold,) -> (obs) -> float
}
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_flat_action_space(nucon, obs_param_set=None, delta_action_scale=None):
    """Build a flat Box action space over nucon's writable parameters.

    Returns ``(box, ordered_param_ids, lows, ranges)``.

    If delta_action_scale is set, the action space is [-1, 1]^n and actions are
    treated as normalised deltas: actual_delta = action * delta_action_scale * (max - min).
    Otherwise the action space spans [min_val, max_val] per param (absolute values).
    """
    param_ids = []
    lows, highs, spans = [], [], []
    for pid, param in nucon.get_all_writable().items():
        # Skip write-only/cheat params, params outside the requested set, and
        # anything without finite bounds (SAC requires finite action bounds).
        if not param.is_readable or param.is_cheat:
            continue
        if obs_param_set is not None and pid not in obs_param_set:
            continue
        if param.min_val is None or param.max_val is None:
            continue
        space = _build_param_space(param)
        if space is None:
            continue
        param_ids.append(pid)
        lows.append(space.low[0])
        highs.append(space.high[0])
        spans.append(space.high[0] - space.low[0])

    if delta_action_scale is None:
        box = spaces.Box(low=np.array(lows, dtype=np.float32),
                         high=np.array(highs, dtype=np.float32), dtype=np.float32)
    else:
        n = len(param_ids)
        box = spaces.Box(low=-np.ones(n, dtype=np.float32),
                         high=np.ones(n, dtype=np.float32), dtype=np.float32)
    return box, param_ids, np.array(lows, dtype=np.float32), np.array(spans, dtype=np.float32)
|
|
|
|
|
|
def _unflatten_action(flat_action, param_ids):
|
|
return {pid: float(flat_action[i]) for i, pid in enumerate(param_ids)}
|
|
|
|
|
|
def _build_param_space(param):
|
|
"""Return a gymnasium Box for a single NuconParameter, or None if unsupported."""
|
|
if param.param_type in (float, int):
|
|
lo = param.min_val if param.min_val is not None else -np.inf
|
|
hi = param.max_val if param.max_val is not None else np.inf
|
|
return spaces.Box(low=lo, high=hi, shape=(1,), dtype=np.float32)
|
|
elif param.param_type == bool:
|
|
return spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
|
|
elif param.param_type == str:
|
|
return None
|
|
elif issubclass(param.param_type, Enum):
|
|
return spaces.Box(low=0, high=len(param.param_type) - 1, shape=(1,), dtype=np.float32)
|
|
return None
|
|
|
|
|
|
def _apply_action(nucon, action):
|
|
for param_id, value in action.items():
|
|
param = nucon._parameters[param_id]
|
|
v = float(np.asarray(value).flat[0])
|
|
if param.param_type == bool:
|
|
value = v >= 0.5 # [0,1] space: above midpoint → True
|
|
elif issubclass(param.param_type, Enum):
|
|
value = param.param_type(int(v))
|
|
else:
|
|
value = param.param_type(v)
|
|
if param.min_val is not None and param.max_val is not None:
|
|
value = param.param_type(np.clip(value, param.min_val, param.max_val))
|
|
nucon.set(param, value)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# NuconEnv
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class NuconEnv(gym.Env):
    """Flat-action Gymnasium environment over a Nucon reactor (real game or simulator).

    Observations are a Dict of every readable, non-string parameter plus
    ``EPISODE_TIME`` and — when a simulator is attached — ``SIM_UNCERTAINTY``.
    Actions are absolute setpoints for every writable parameter with finite bounds.
    Reward is the weighted sum of the configured objectives; an episode terminates
    when the summed terminator outputs exceed ``terminate_above``.
    """

    metadata = {'render_modes': ['human']}

    def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5,
                 objectives=('null',), terminators=('null',), objective_weights=None,
                 terminate_above=0):
        """Build the env.

        Args:
            nucon: Nucon client; created automatically when omitted (pointed at
                the simulator's port when a simulator is given).
            simulator: optional in-process simulator; when None, steps advance
                real game time via wall-clock sleep.
            render_mode: unused placeholder for the gym render API.
            seconds_per_step: simulated/game seconds advanced per env step.
            objectives: names from ``Objectives`` or callables obs -> float.
            terminators: names from ``Objectives`` or callables obs -> float.
            objective_weights: one weight per objective (defaults to all 1.0).
            terminate_above: episode ends when summed terminator outputs exceed this.
        """
        super().__init__()

        self.render_mode = render_mode
        self.seconds_per_step = seconds_per_step
        if objective_weights is None:
            objective_weights = [1.0 for _ in objectives]
        self.objective_weights = objective_weights
        self.terminate_above = terminate_above
        self.simulator = simulator
        # Defined up-front so _get_obs() is safe even before the first reset().
        self._total_steps = 0

        if nucon is None:
            nucon = Nucon(port=simulator.port) if simulator else Nucon()
        self.nucon = nucon

        # Observation space — SIM_UNCERTAINTY included only when a simulator is present.
        obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
        if simulator is not None:
            obs_spaces['SIM_UNCERTAINTY'] = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
        for param_id, param in self.nucon.get_all_readable().items():
            sp = _build_param_space(param)
            if sp is not None:
                obs_spaces[param_id] = sp
        self.observation_space = spaces.Dict(obs_spaces)

        self.action_space, self._action_params, self._action_lows, self._action_ranges = \
            _build_flat_action_space(self.nucon)

        self.objectives = [self._resolve(o, 'objective') for o in objectives]
        self.terminators = [self._resolve(t, 'terminator') for t in terminators]

    @staticmethod
    def _resolve(entry, kind):
        """Turn an ``Objectives`` key or a callable into an obs -> float function."""
        if entry in Objectives:
            return Objectives[entry]
        if callable(entry):
            return entry
        raise ValueError(f"Unsupported {kind}: {entry}")

    def _get_obs(self, sim_uncertainty=None):
        """Read every observed parameter from the game/sim into a plain dict."""
        obs = {}
        for param_id, param in self.nucon.get_all_readable().items():
            if param.param_type == str or param_id not in self.observation_space.spaces:
                continue
            value = self.nucon.get(param_id)
            if isinstance(value, Enum):
                value = value.value  # expose enums by their underlying value
            obs[param_id] = value
        obs['EPISODE_TIME'] = self._total_steps * self.seconds_per_step
        if 'SIM_UNCERTAINTY' in self.observation_space.spaces:
            obs['SIM_UNCERTAINTY'] = sim_uncertainty if sim_uncertainty is not None else 0.0
        return obs

    def _get_info(self, obs):
        """Evaluate all objectives on ``obs``; return raw and weighted values by name."""
        info = {'objectives': {}, 'objectives_weighted': {}}
        for objective, weight in zip(self.objectives, self.objective_weights):
            value = objective(obs)
            name = getattr(objective, '__name__', repr(objective))
            info['objectives'][name] = value
            info['objectives_weighted'][name] = value * weight
        return info

    def reset(self, seed=None, options=None):
        """Reset the step counter and return (observation, info)."""
        super().reset(seed=seed)
        self._total_steps = 0
        observation = self._get_obs()
        return observation, self._get_info(observation)

    def step(self, action):
        """Apply a flat action, advance time, and score the resulting state."""
        _apply_action(self.nucon, _unflatten_action(action, self._action_params))

        # Advance sim (or sleep against the real game) — keep uncertainty for obs injection.
        truncated = False
        uncertainty = None
        if self.simulator:
            uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
        else:
            sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            time.sleep(self.seconds_per_step / sim_speed)

        self._total_steps += 1
        observation = self._get_obs(sim_uncertainty=uncertainty)
        info = self._get_info(observation)
        reward = sum(info['objectives_weighted'].values())
        # bool(...) so callers receive a Python bool rather than np.bool_.
        terminated = bool(np.sum([t(observation) for t in self.terminators]) > self.terminate_above)
        return observation, reward, terminated, truncated, info

    def render(self):
        pass

    def close(self):
        pass

    def _flatten_observation(self, observation):
        """Concatenate a Dict observation's values into one flat vector."""
        return np.concatenate([np.asarray(v).flatten() for v in observation.values()])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# NuconGoalEnv
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class NuconGoalEnv(gym.Env):
    """
    Goal-conditioned reactor environment compatible with SB3 HER (Hindsight Experience Replay).

    Observation is a Dict with three keys:
    - 'observation': all readable non-goal, non-str params + SIM_UNCERTAINTY (when sim active)
    - 'achieved_goal': current values of goal_params, normalised to [0, 1] within goal_range
    - 'desired_goal': target values sampled each episode, normalised to [0, 1]

    ``SIM_UNCERTAINTY`` in 'observation' lets reward_fn / terminators reference uncertainty directly.

    reward_fn signature: ``(achieved, desired)`` or ``(achieved, desired, obs)`` — the 3-arg form
    receives the full observation dict (including SIM_UNCERTAINTY) for uncertainty-aware shaping.

    Usage with SB3 HER::

        from stable_baselines3 import SAC
        from stable_baselines3.common.buffers import HerReplayBuffer
        from nucon.rl import NuconGoalEnv, UncertaintyPenalty, UncertaintyAbort

        env = NuconGoalEnv(
            goal_params=['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
            goal_range={'GENERATOR_0_KW': (0, 1200), 'GENERATOR_1_KW': (0, 1200), 'GENERATOR_2_KW': (0, 1200)},
            tolerance=0.05,
            simulator=simulator,
            # uncertainty-aware reward: penalise OOD, abort if too far out
            reward_fn=lambda ag, dg, obs: (
                -(np.linalg.norm(ag - dg) ** 2)
                - 2.0 * max(0, obs.get('SIM_UNCERTAINTY', 0) - 0.3) ** 2
            ),
            terminators=[UncertaintyAbort(threshold=0.7)],
        )
        model = SAC('MultiInputPolicy', env, replay_buffer_class=HerReplayBuffer)
        model.learn(total_timesteps=500_000)
    """

    metadata = {'render_modes': ['human']}

    def __init__(
        self,
        goal_params,
        goal_range=None,
        reward_fn=None,
        tolerance=None,
        nucon=None,
        simulator=None,
        render_mode=None,
        seconds_per_step=5,
        terminators=None,
        terminate_above=0,
        additional_objectives=None,
        additional_objective_weights=None,
        obs_params=None,
        action_params=None,
        init_states=None,
        delta_action_scale=None,
        goal_sampling_std=None,
    ):
        super().__init__()

        self.render_mode = render_mode
        self.seconds_per_step = seconds_per_step
        self._delta_action_scale = delta_action_scale
        self.terminate_above = terminate_above
        self.simulator = simulator
        self.goal_params = list(goal_params)
        self.tolerance = tolerance

        if nucon is None:
            nucon = Nucon(port=simulator.port) if simulator else Nucon()
        self.nucon = nucon

        all_readable = self.nucon.get_all_readable()
        for pid in self.goal_params:
            if pid not in all_readable:
                raise ValueError(f"Goal param '{pid}' is not a readable parameter")

        # Per-goal normalisation bounds: explicit goal_range wins, otherwise fall
        # back to the parameter's own min/max (or (0, 1) when those are missing).
        goal_range = goal_range or {}
        self._goal_low = np.array([
            goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[0]
            for pid in self.goal_params
        ], dtype=np.float32)
        self._goal_high = np.array([
            goal_range.get(pid, (all_readable[pid].min_val or 0.0, all_readable[pid].max_val or 1.0))[1]
            for pid in self.goal_params
        ], dtype=np.float32)
        self._goal_range = self._goal_high - self._goal_low
        # Avoid division by zero for degenerate (zero-width) goal ranges.
        self._goal_range[self._goal_range == 0] = 1.0

        # Detect reward_fn arity for backward compat (2-arg vs 3-arg)
        self._reward_fn = reward_fn
        if reward_fn is not None:
            n_args = len(inspect.signature(reward_fn).parameters)
            self._reward_fn_wants_obs = n_args >= 3
        else:
            self._reward_fn_wants_obs = False

        # Observation params: model.input_params defines the canonical list — the same set is
        # used whether training in sim or deploying to the real game (the game simply has more
        # params available; we query only the subset we care about).
        # Explicit obs_params overrides everything (use when deploying to real game without sim).
        # SB3 HER requires observation to be a flat Box, not a nested Dict.
        goal_set = set(self.goal_params)
        self._obs_with_uncertainty = simulator is not None
        if obs_params is not None:
            base_params = [p for p in obs_params if p not in goal_set]
        elif simulator is not None and hasattr(simulator, 'model') and simulator.model is not None:
            base_params = [p for p in simulator.model.input_params
                           if p not in goal_set and p in all_readable
                           and _build_param_space(all_readable[p]) is not None]
        else:
            base_params = [p for p, param in all_readable.items()
                           if p not in goal_set and _build_param_space(param) is not None]
        # SIM_UNCERTAINTY is not in _obs_params — it's not available at deployment on the real game
        self._obs_params = base_params

        n_goals = len(self.goal_params)
        self.observation_space = spaces.Dict({
            'observation': spaces.Box(low=-np.inf, high=np.inf,
                                      shape=(len(self._obs_params),), dtype=np.float32),
            'achieved_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
            'desired_goal': spaces.Box(low=0.0, high=1.0, shape=(n_goals,), dtype=np.float32),
        })

        # Action space: writable params within the obs param set, or an explicit override list.
        action_set = set(action_params) if action_params is not None else set(base_params)
        self.action_space, self._action_params, self._action_lows, self._action_ranges = \
            _build_flat_action_space(self.nucon, action_set, delta_action_scale)

        self._terminators = terminators or []
        _objs = additional_objectives or []
        self._objectives = [Objectives[o] if isinstance(o, str) else o for o in _objs]
        self._objective_weights = additional_objective_weights or [1.0] * len(self._objectives)
        self._init_states = init_states  # list of state dicts to sample on reset
        self._goal_sampling_std = goal_sampling_std  # Gaussian std in normalised goal space; None → uniform
        self._desired_goal = np.zeros(n_goals, dtype=np.float32)
        self._total_steps = 0

    def compute_reward(self, achieved_goal, desired_goal, info):
        """Dense negative L2, sparse with tolerance, or custom reward_fn."""
        # NOTE(review): under SB3 HER relabelling, `info` may be an array of dicts
        # rather than a single dict — hence the isinstance guard; the named-obs
        # dict is then unavailable and the 3-arg reward_fn sees {}.
        obs_named = info.get('obs_named', {}) if isinstance(info, dict) else {}
        if self._reward_fn is not None:
            if self._reward_fn_wants_obs:
                return self._reward_fn(achieved_goal, desired_goal, obs_named)
            return self._reward_fn(achieved_goal, desired_goal)
        dist = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
        if self.tolerance is not None:
            # Sparse reward: 0 when within tolerance, -1 otherwise.
            return (dist <= self.tolerance).astype(np.float32) - 1.0
        return -dist

    def _read_goal_values(self):
        """Read the goal parameters and normalise them into [0, 1] within goal_range."""
        raw = np.array([self.nucon.get(pid) or 0.0 for pid in self.goal_params], dtype=np.float32)
        return np.clip((raw - self._goal_low) / self._goal_range, 0.0, 1.0)

    def _read_obs(self, sim_uncertainty=None):
        """Return (gym_obs_dict, reward_obs_dict).

        When a simulator is attached, reads directly from sim.parameters (no HTTP).
        Otherwise falls back to a single batch HTTP request.
        """
        def _to_float(v):
            # None (missing value) is coerced to 0.0; Enums to their numeric value.
            if v is None:
                return 0.0
            return float(v.value if isinstance(v, Enum) else v)

        if self.simulator is not None:
            # Direct in-process read — no HTTP overhead
            def _get(pid):
                return _to_float(self.simulator.get(pid))
        else:
            raw = self.nucon._batch_query(self._obs_params + self.goal_params)
            all_params = self.nucon.get_all_readable()
            def _get(pid):
                try:
                    v = self.nucon._parse_value(all_params[pid], raw.get(pid, '0'))
                    return _to_float(v)
                except Exception:
                    # Best-effort: an unparsable/missing value reads as 0.0.
                    return 0.0

        reward_obs = {}
        if self._obs_with_uncertainty:
            reward_obs['SIM_UNCERTAINTY'] = float(sim_uncertainty) if sim_uncertainty is not None else 0.0
        for pid in self._obs_params:
            reward_obs[pid] = _get(pid)

        obs_vec = np.array([reward_obs[p] for p in self._obs_params], dtype=np.float32)
        goal_raw = np.array([_get(p) for p in self.goal_params], dtype=np.float32)
        achieved = np.clip((goal_raw - self._goal_low) / self._goal_range, 0.0, 1.0)
        gym_obs = {'observation': obs_vec, 'achieved_goal': achieved,
                   'desired_goal': self._desired_goal.copy()}
        return gym_obs, reward_obs

    def reset(self, seed=None, options=None):
        """Optionally restore a sampled init state, sample a new desired goal, and observe."""
        super().reset(seed=seed)
        self._total_steps = 0
        rng = np.random.default_rng(seed)
        if self._init_states is not None and self.simulator is not None:
            state = self._init_states[rng.integers(len(self._init_states))]
            for k, v in state.items():
                try:
                    self.simulator.set(k, v, force=True)
                except Exception:
                    # Best-effort restore: unknown/read-only keys are skipped.
                    pass
        if self._goal_sampling_std is not None:
            # Sample goal as Gaussian delta from current state — usually a small change,
            # occasionally a large one.
            current = np.array([
                float(self.simulator.get(p) if self.simulator else 0.0)
                for p in self.goal_params
            ], dtype=np.float32)
            current_norm = np.clip((current - self._goal_low) / self._goal_range, 0.0, 1.0)
            delta = rng.normal(0.0, self._goal_sampling_std, size=len(self.goal_params))
            self._desired_goal = np.clip(current_norm + delta, 0.0, 1.0).astype(np.float32)
        else:
            self._desired_goal = rng.uniform(0.0, 1.0, size=len(self.goal_params)).astype(np.float32)
        gym_obs, _ = self._read_obs()
        return gym_obs, {}

    def step(self, action):
        """Apply (delta or absolute) action, advance time, and compute the HER reward."""
        flat = np.asarray(action, dtype=np.float32)
        if self._delta_action_scale is not None:
            # Compute absolute values from deltas, reading current state
            if self.simulator is None:
                raw_current = self.nucon._batch_query(self._action_params)
                all_params = self.nucon.get_all_readable()
            absolute = {}
            for i, pid in enumerate(self._action_params):
                param = self.nucon._parameters[pid]
                if param.param_type == bool:
                    # Booleans get no delta semantics: sign of the action decides.
                    absolute[pid] = 1.0 if flat[i] > 0 else 0.0
                else:
                    if self.simulator is not None:
                        v = self.simulator.get(pid)
                        current = float(v.value if isinstance(v, Enum) else v) if v is not None else 0.0
                    else:
                        try:
                            v = self.nucon._parse_value(all_params[pid], raw_current.get(pid, '0'))
                            current = float(v.value if isinstance(v, Enum) else v)
                        except Exception:
                            current = 0.0
                    delta = float(flat[i]) * self._delta_action_scale * self._action_ranges[i]
                    # Clamp the result to the parameter's [low, low + range] bounds.
                    absolute[pid] = float(np.clip(current + delta,
                                                  self._action_lows[i],
                                                  self._action_lows[i] + self._action_ranges[i]))
        else:
            absolute = _unflatten_action(flat, self._action_params)

        if self.simulator is not None:
            # Write directly to sim — skip HTTP entirely
            for pid, val in absolute.items():
                try:
                    self.simulator.set(pid, val, force=True)
                except Exception:
                    pass
        else:
            _apply_action(self.nucon, absolute)

        if self.simulator:
            uncertainty = self.simulator.update(self.seconds_per_step, return_uncertainty=True)
        else:
            # Real game: wall-clock wait scaled by the game's current sim speed.
            sim_speed = self.nucon.GAME_SIM_SPEED.value or 1.0
            time.sleep(self.seconds_per_step / sim_speed)
            uncertainty = None

        self._total_steps += 1
        gym_obs, reward_obs = self._read_obs(sim_uncertainty=uncertainty)
        info = {'achieved_goal': gym_obs['achieved_goal'], 'desired_goal': gym_obs['desired_goal'],
                'obs_named': reward_obs}
        reward = float(self.compute_reward(gym_obs['achieved_goal'], gym_obs['desired_goal'], info))
        # Shaping terms are added on top of the goal reward (HER relabels only the goal part).
        reward += sum(w * o(reward_obs) for o, w in zip(self._objectives, self._objective_weights))
        terminated = any(t(reward_obs) > self.terminate_above for t in self._terminators)
        return gym_obs, reward, terminated, False, info

    def render(self):
        pass

    def close(self):
        pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Registration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def register_nucon_envs():
    """Register the canonical Nucon environment ids with Gymnasium."""
    specs = [
        ('Nucon-max_power-v0', 'nucon.rl:NuconEnv',
         {'seconds_per_step': 5, 'objectives': ['max_power']}),
        ('Nucon-target_temperature_350-v0', 'nucon.rl:NuconEnv',
         {'seconds_per_step': 5,
          'objectives': [Parameterized_Objectives['target_temperature'](goal_temp=350)]}),
        ('Nucon-safe_max_power-v0', 'nucon.rl:NuconEnv',
         {'seconds_per_step': 5,
          'objectives': [Parameterized_Objectives['temp_above'](min_temp=310),
                         Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'],
          'objective_weights': [1, 10, 1/100_000]}),
        ('Nucon-goal_power-v0', 'nucon.rl:NuconGoalEnv',
         {'goal_params': ['GENERATOR_0_KW', 'GENERATOR_1_KW', 'GENERATOR_2_KW'],
          'goal_range': {'GENERATOR_0_KW': (0.0, 1200.0), 'GENERATOR_1_KW': (0.0, 1200.0), 'GENERATOR_2_KW': (0.0, 1200.0)},
          'seconds_per_step': 5}),
        ('Nucon-goal_temp-v0', 'nucon.rl:NuconGoalEnv',
         {'goal_params': ['CORE_TEMP'],
          'goal_range': {'CORE_TEMP': (280.0, 380.0)},
          'seconds_per_step': 5}),
    ]
    for env_id, entry_point, kwargs in specs:
        gym.register(id=env_id, entry_point=entry_point, kwargs=kwargs)


register_nucon_envs()