"""Classical PID-based reactor controller with curses TUI.

Architecture:
  Core control (shared):
    - Rod PID: keeps CORE_TEMP at setpoint via ROD_BANK_POS_0_ORDERED

  Per-train control (trains 1/2/3, 0-indexed as 0/1/2 in param names):
    - Primary pump: fixed 50%; hill-climbs only when deficit > 5 MW
    - MSCV PI: drives train power output, gated on steam availability
    - Secondary pump feedforward: half of steam outlet + level PID
    - Bypass: hold near 0

  Auxiliary:
    - Retention tank: drain via ejector return valve when > 75%, stop at 50%
    - Condenser fill: run FREIGHT_PUMP_CONDENSER below 45%, stop at 50%
    - Pressurizer: open spray valve below 50% level, close at 60%
    - Primary loop: run FREIGHT_PUMP_FEEDWATER below 80% fill, stop at 90%

Usage:
  python3.14 scripts/reactor_control.py --trains 3 --target 50000
  python3.14 scripts/reactor_control.py --trains 1 3 --target 30000 40000
  python3.14 scripts/reactor_control.py --trains 1 2 3 --target 20000 20000 20000

TUI keys:
  0              Select core
  1 / 2 / 3      Select train
  g              Select grid row
  Up / Down      Cycle selection through core, trains and grid
  + / = / Right  Increase: core setpoint +5°C (manual mode only), train
                 target/cap +5 MW (adds the train if absent), grid buffer +1 MW
  - / Left       Decrease the same quantity
  d              Core: toggle auto setpoint; grid: toggle grid-demand
                 following; train: remove selected train from control
  q / Esc        Quit
"""

import argparse
import curses
import time
from enum import Enum

import numpy as np

from nucon import Nucon

parser = argparse.ArgumentParser()
# Only trains 1-3 exist in the parameter naming scheme and in the TUI layout,
# so reject anything else up front instead of failing later on a missing param.
parser.add_argument('--trains', type=int, nargs='+', default=[3], choices=(1, 2, 3))
parser.add_argument('--target', type=float, nargs='+', default=[50_000])
parser.add_argument('--temp-setpoint', type=float, default=330.0)
parser.add_argument('--dt', type=float, default=5.0)
parser.add_argument('--grid-follow', action='store_true',
                    help='Auto-set train targets from grid demand')
# %(default)s keeps the help text in sync with the actual default value.
parser.add_argument('--grid-buffer', type=float, default=10.0,
                    help='Extra MW above grid demand when grid-following '
                         '(default: %(default)s)')
args = parser.parse_args()

# A single --target value applies to every train; otherwise one value per train.
if len(args.target) == 1:
    targets = {t: args.target[0] for t in args.trains}
else:
    if len(args.target) != len(args.trains):
        raise ValueError("--target must have 1 value or one per --trains entry")
    targets = dict(zip(args.trains, args.target))

nucon = Nucon()
# ---------------------------------------------------------------------------
# PID controller
# ---------------------------------------------------------------------------

class PID:
    """Discrete PID controller with a clamped output and a clamped integral.

    Args:
        kp, ki, kd: proportional, integral and derivative gains.
        out_min, out_max: bounds applied to the value returned by step().
        integral_max: symmetric bound on the accumulated integral; defaults
            to (out_max - out_min) when None.
    """

    def __init__(self, kp, ki, kd, out_min, out_max, integral_max=None):
        self.kp, self.ki, self.kd = kp, ki, kd
        self.out_min, self.out_max = out_min, out_max
        # Explicit None test: an integral_max of 0 is a valid "no integral" limit.
        self.integral_max = (out_max - out_min) if integral_max is None else integral_max
        self._integral = 0.0
        self._prev_error = None

    def step(self, error, dt):
        """Advance the controller by one sample and return the clamped output."""
        self._integral = float(np.clip(self._integral + error * dt,
                                       -self.integral_max, self.integral_max))
        # No derivative on the first sample, and none for a non-positive dt
        # (which would otherwise divide by zero).
        if self._prev_error is None or dt <= 0:
            derivative = 0.0
        else:
            derivative = (error - self._prev_error) / dt
        self._prev_error = error
        return float(np.clip(
            self.kp * error + self.ki * self._integral + self.kd * derivative,
            self.out_min, self.out_max))

    def reset(self):
        """Clear the integral and the derivative history."""
        self._integral = 0.0
        self._prev_error = None


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

_all_readable = None


def _get_all_readable():
    """Return nucon.get_all_readable(), fetched once and cached for the run."""
    global _all_readable
    if _all_readable is None:
        _all_readable = nucon.get_all_readable()
    return _all_readable


def set_param(param_id, value):
    """Clamp *value* to the parameter's bounds, write it, and return what was sent.

    Missing bounds fall back to 0 and 100.
    """
    param = nucon._parameters[param_id]
    # Explicit None tests so that a genuine bound of 0 is honoured.
    lo = 0 if param.min_val is None else param.min_val
    hi = 100 if param.max_val is None else param.max_val
    v = float(np.clip(value, lo, hi))
    nucon.set(param, v)
    return v


def read_state(param_ids):
    """Batch-read *param_ids* and return {param_id: float}.

    Parameters that are not readable, or whose value cannot be parsed, are
    reported as 0.0 so that callers can always index the result.
    """
    all_r = _get_all_readable()
    raw = nucon._batch_query([p for p in param_ids if p in all_r])
    state = {}
    for p in param_ids:
        if p not in all_r:
            state[p] = 0.0
            continue
        try:
            v = nucon._parse_value(all_r[p], raw.get(p, '0'))
            state[p] = float(v.value if isinstance(v, Enum) else v)
        except Exception:
            # Deliberate best effort: one bad reading must not stop the control loop.
            state[p] = 0.0
    return state


# ---------------------------------------------------------------------------
# Per-train controller
# ---------------------------------------------------------------------------

class TrainController:
    """Controls one train (steam gen N + turbine N + generator N)."""

    def __init__(self, train_num, target_kw):
        """Set up the controllers for train *train_num* and order its bypass to 0.

        Args:
            train_num: 1-based train number; parameter names use train_num - 1.
            target_kw: generator power target, compared against GENERATOR_<i>_KW.
        """
        self.n = train_num
        self.i = train_num - 1
        self.target_kw = target_kw

        self.mscv_pid = PID(kp=0.00002, ki=0.000002, kd=0.0,
                            out_min=-0.3, out_max=0.2, integral_max=3.0)
        self._prev_steam_out = None

        self.sec_pid = PID(kp=0.0005, ki=0.00005, kd=0.001,
                           out_min=-2.0, out_max=2.0, integral_max=3.0)
        self.sec_level_target = 25_000.0

        # Last ordered actuator values.
        self.prim_pump = 50.0
        self.mscv = 9.0
        self.sec_pump = 40.0

        # Hill-climb state for the primary pump.
        self._pump_hill_cycle = 0
        self._pump_hill_dir = +1.0
        self._pump_hill_steam = None
        self.PUMP_SWEEP_THRESHOLD = 5_000.0
        self.sweeping = False

        set_param(f'STEAM_TURBINE_{self.i}_BYPASS_ORDERED', 0.0)

        self._params = [
            f'STEAM_GEN_{self.i}_OUTLET',
            f'MSCV_{self.i}_OPENING_ACTUAL',
            f'STEAM_TURBINE_{self.i}_RPM',
            f'STEAM_TURBINE_{self.i}_BYPASS_ACTUAL',
            f'GENERATOR_{self.i}_KW',
            f'COOLANT_SEC_CIRCULATION_PUMP_{self.i}_ORDERED_SPEED',
            f'COOLANT_SEC_{self.i}_LIQUID_VOLUME',
        ]

    def params(self):
        """Return the parameter ids this train needs in the state dict."""
        return self._params

    def step(self, s, dt):
        """Run one control cycle for this train.

        Args:
            s: state dict as returned by read_state(), containing self.params().
            dt: control period passed to the PID controllers.

        Returns:
            (power_kw, power_error, steam_out, level, level_error)
        """
        steam_out = s[f'STEAM_GEN_{self.i}_OUTLET']
        power_kw = s[f'GENERATOR_{self.i}_KW']
        power_error = self.target_kw - power_kw

        # Dead-band: don't adjust MSCV when within 3% of target (avoid hunting)
        if abs(power_error) < 0.03 * self.target_kw:
            mscv_delta = 0.0
            self.mscv_pid.reset()
        else:
            mscv_delta = self.mscv_pid.step(power_error, dt)

        # Only open further while steam output is not falling (1.0 tolerance).
        steam_rose = (self._prev_steam_out is None
                      or steam_out >= self._prev_steam_out - 1.0)
        if mscv_delta > 0 and not steam_rose:
            mscv_delta = 0.0
        self._prev_steam_out = steam_out

        # Cap only prevents opening further — don't force MSCV down as steam fluctuates.
        mscv_max = max(steam_out / 8.0, 1.0)
        new_mscv = self.mscv + mscv_delta
        if mscv_delta > 0:
            new_mscv = min(new_mscv, mscv_max)
        self.mscv = float(np.clip(new_mscv, 0.5, 100.0))
        set_param(f'MSCV_{self.i}_OPENING_ORDERED', self.mscv)

        # Primary pump: hill-climb on steam output while the deficit is large,
        # otherwise sit at the fixed 50% baseline.
        self.sweeping = power_error > self.PUMP_SWEEP_THRESHOLD
        if self.sweeping:
            self._pump_hill_cycle += 1
            if self._pump_hill_cycle >= 12:
                self._pump_hill_cycle = 0
                if self._pump_hill_steam is not None:
                    # Steam dropped since the last move: reverse direction.
                    if steam_out < self._pump_hill_steam - 0.2:
                        self._pump_hill_dir = -self._pump_hill_dir
                self._pump_hill_steam = steam_out
                self.prim_pump = float(np.clip(self.prim_pump + self._pump_hill_dir,
                                               15.0, 90.0))
                set_param(f'COOLANT_CORE_CIRCULATION_PUMP_{self.i}_ORDERED_SPEED',
                          self.prim_pump)
        else:
            if self.prim_pump != 50.0:
                self.prim_pump = 50.0
                set_param(f'COOLANT_CORE_CIRCULATION_PUMP_{self.i}_ORDERED_SPEED',
                          self.prim_pump)
            self._pump_hill_cycle = 0
            self._pump_hill_steam = None

        # Secondary pump: feedforward from steam output plus level correction,
        # low-pass filtered (30% of the remaining gap per cycle).
        sec_ff = steam_out / 2.0
        level = s[f'COOLANT_SEC_{self.i}_LIQUID_VOLUME']
        level_error = self.sec_level_target - level
        sec_corr = self.sec_pid.step(level_error, dt)
        sec_target = float(np.clip(sec_ff + sec_corr, 5.0, 100.0))
        self.sec_pump += 0.3 * (sec_target - self.sec_pump)
        set_param(f'COOLANT_SEC_CIRCULATION_PUMP_{self.i}_ORDERED_SPEED', self.sec_pump)

        # Bypass: re-order closed if it has drifted open.
        if s[f'STEAM_TURBINE_{self.i}_BYPASS_ACTUAL'] > 1.0:
            set_param(f'STEAM_TURBINE_{self.i}_BYPASS_ORDERED', 0.0)

        return power_kw, power_error, steam_out, level, level_error


# ---------------------------------------------------------------------------
# Global controller state
# ---------------------------------------------------------------------------

TEMP_MAX = 410.0
ROD_INTERVAL = 6
# (abs temperature error threshold, max rod step) — first matching tier wins.
ROD_TIERS = [
    (3.0, 0.1),
    (8.0, 0.4),
    (15.0, 0.8),
    (float('inf'), 1.2),
]

# Fall back to 85.0 only when no reading is available; a reading of 0 is valid.
_rod_actual = nucon.get('ROD_BANK_POS_0_ACTUAL')
rod_pos = 85.0 if _rod_actual is None else float(_rod_actual)
rod_cycle = 0
rod_integral = 0.0

train_controllers = {t: TrainController(t, targets[t]) for t in args.trains}

core_params = [
    'CORE_TEMP',
    'ROD_BANK_POS_0_ACTUAL',
    'CORE_STATE_CRITICALITY',
    'VACUUM_RETENTION_TANK_VOLUME',
    'CONDENSER_VOLUME',
    'CONDENSER_VAPOR_VOLUME',
    'POWER_DEMAND_MW',
    'CORE_PRIMARY_CIRCUIT_COOLING_TANK_VOLUME',  # pressurizer water volume
    'COOLANT_CORE_PRIMARY_LOOP_LEVEL',           # overall primary loop fill %
    'FREIGHT_PUMP_FEEDWATER_ACTIVE',
]

RETENTION_MAX = 40_000.0
RETENTION_HI = 0.75 * RETENTION_MAX
RETENTION_MID = 0.50 * RETENTION_MAX

# ---------------------------------------------------------------------------
# Pressurizer / primary circuit constants
# ---------------------------------------------------------------------------

PRSR_VALVE = 'Valvula_Pressurizer_Spray'
# CORE_PRIMARY_CIRCUIT_COOLING_TANK_VOLUME is the pressurizer water volume.
# Observed: 106030 = 60% → max ≈ 176717
PRSR_VOL_MAX = 176_717.0
PRSR_LEVEL_LO = 50.0     # % — open spray valve below this
PRSR_LEVEL_CLOSE = 60.0  # % — close spray valve once level recovers
PRSR_LEVEL_HI = 70.0     # % — op range high (informational)
PRIM_FILL_LO = 80.0      # % — start feedwater pump below this (uses COOLANT_CORE_PRIMARY_LOOP_LEVEL)
PRIM_FILL_HI = 90.0      # % — stop feedwater pump above this

# Initialise aux state from live game values so restarts are seamless.
_init = read_state([
    'VACUUM_RETENTION_TANK_VOLUME',
    'STEAM_EJECTOR_CONDENSER_RETURN_VALVE_ACTUAL',
    'CONDENSER_VOLUME',
    'CONDENSER_VAPOR_VOLUME',
    'FREIGHT_PUMP_CONDENSER_ACTIVE',
])

_ret_vol_init = _init.get('VACUUM_RETENTION_TANK_VOLUME', 0.0)
_ret_valve_init = _init.get('STEAM_EJECTOR_CONDENSER_RETURN_VALVE_ACTUAL', 0.0)
ret_valve = _ret_valve_init
# Resume draining only if the valve is already open and the tank is above the stop level.
ret_draining = (_ret_valve_init > 0.5 and _ret_vol_init > RETENTION_MID)
if _ret_valve_init > 0.5 and not ret_draining:
    set_param('STEAM_EJECTOR_CONDENSER_RETURN_VALVE', 0.0)
    ret_valve = 0.0
ret_prev_vol = _ret_vol_init

_cond_vol_init = _init.get('CONDENSER_VOLUME', 0.0)
_cond_vap_init = _init.get('CONDENSER_VAPOR_VOLUME', 0.0)
_cond_tot_init = _cond_vol_init + _cond_vap_init
_cond_pct_init = (_cond_vol_init / _cond_tot_init * 100.0) if _cond_tot_init > 0 else 0.0
_cond_pump_init = bool(_init.get('FREIGHT_PUMP_CONDENSER_ACTIVE', False))
if _cond_pump_init and _cond_pct_init >= 50.0:
    nucon.set(nucon._parameters['FREIGHT_PUMP_CONDENSER_SWITCH'], False)
    cond_pump_on = False
elif not _cond_pump_init and _cond_pct_init < 45.0:
    nucon.set(nucon._parameters['FREIGHT_PUMP_CONDENSER_SWITCH'], True)
    cond_pump_on = True
else:
    cond_pump_on = _cond_pump_init

# Pressurizer spray valve — init from live state
_prsr_live = read_state(['CORE_PRIMARY_CIRCUIT_COOLING_TANK_VOLUME',
                         'COOLANT_CORE_PRIMARY_LOOP_LEVEL',
                         'FREIGHT_PUMP_FEEDWATER_ACTIVE'])
_prsr_level = (_prsr_live.get('CORE_PRIMARY_CIRCUIT_COOLING_TANK_VOLUME', PRSR_VOL_MAX * 0.6)
               / PRSR_VOL_MAX * 100.0)
_prsr_valve = nucon.get_valve(PRSR_VALVE)
_prsr_open = _prsr_valve.get('IsOpened', False) or _prsr_valve.get('Value', 0) > 50
prsr_spraying = _prsr_open and _prsr_level < PRSR_LEVEL_CLOSE
if _prsr_open and not prsr_spraying:
    nucon.close_valve(PRSR_VALVE)
feedwater_on = bool(_prsr_live.get('FREIGHT_PUMP_FEEDWATER_ACTIVE', False))
# ---------------------------------------------------------------------------
# TUI helpers
# ---------------------------------------------------------------------------

def _bar(pct, width=18):
    """Return a *width*-character bar filled in proportion to *pct* (clamped to 0-100)."""
    pct = max(0.0, min(100.0, pct))
    filled = int(pct / 100.0 * width)
    return '█' * filled + '░' * (width - filled)


def _safe_addstr(scr, row, col, text, attr=0):
    """Draw *text* at (row, col), clipped to the window; never raises curses.error."""
    H, W = scr.getmaxyx()
    if row < 0 or row >= H:
        return
    if col < 0:
        # Drop the part of the text that lies left of the window.
        text = text[-col:]
        col = 0
    if col >= W:
        return
    text = text[:W - col]
    try:
        scr.addstr(row, col, text, attr)
    except curses.error:
        # Writing into the bottom-right cell raises even though the text is drawn.
        pass


def _hline(scr, row, char='─'):
    """Draw a horizontal rule across *row* if the row is inside the window."""
    H, W = scr.getmaxyx()
    if 0 <= row < H:
        _safe_addstr(scr, row, 0, char * (W - 1))


# ---------------------------------------------------------------------------
# Main TUI loop
# ---------------------------------------------------------------------------

def run_controller(stdscr):
    """Run the control loop and the TUI until the user quits.

    Each cycle reads the plant state, runs rod, grid-follow, per-train and
    auxiliary control, redraws the screen, then polls the keyboard until the
    cycle deadline (args.dt divided by the game's simulation speed).

    Args:
        stdscr: the curses window supplied by curses.wrapper().
    """
    global rod_pos, rod_cycle, rod_integral
    global ret_valve, ret_draining, ret_prev_vol, cond_pump_on
    global prsr_spraying, feedwater_on
    global train_controllers, targets

    curses.curs_set(0)
    stdscr.nodelay(True)
    curses.start_color()
    curses.use_default_colors()
    curses.init_pair(1, curses.COLOR_GREEN, -1)    # good / normal
    curses.init_pair(2, curses.COLOR_YELLOW, -1)   # warning
    curses.init_pair(3, curses.COLOR_RED, -1)      # alarm
    curses.init_pair(4, curses.COLOR_CYAN, -1)     # selected
    curses.init_pair(5, curses.COLOR_WHITE, curses.COLOR_BLUE)  # title bar

    GREEN = curses.color_pair(1)
    YELLOW = curses.color_pair(2)
    RED = curses.color_pair(3)
    CYAN = curses.color_pair(4)
    TITLE = curses.color_pair(5)
    BOLD = curses.A_BOLD
    REV = curses.A_REVERSE

    SELECTION_ORDER = [0, 1, 2, 3, 4]  # 0=core 1/2/3=trains 4=grid
    selected_train = args.trains[0] if args.trains else 1
    temp_setpoint = args.temp_setpoint  # mutable; adjustable from TUI
    temp_auto = True  # auto-adjust setpoint to meet total power demand
    grid_follow = args.grid_follow
    # Per-train power caps: manual max each train should carry (used for proportional distribution)
    grid_caps = {t: tc.target_kw for t, tc in train_controllers.items()}
    cycle = 0
    train_data = {}

    # display state — updated each control cycle, read by draw() at any time
    disp = dict(s={}, dynamic_setpoint=temp_setpoint, temp_auto=temp_auto,
                criticality=0.0,
                ret_pct=0.0, ret_draining=False, ret_valve=0.0,
                cond_pct=0.0, cond_pump_on=False,
                prsr_level=_prsr_level, prsr_spraying=prsr_spraying,
                prim_level=_prsr_live.get('COOLANT_CORE_PRIMARY_LOOP_LEVEL', 100.0),
                feedwater_on=feedwater_on,
                grid_follow=grid_follow, grid_demand_kw=0.0)

    def rebuild_all_params():
        """Return the core parameter ids plus those of every controlled train."""
        p = list(core_params)
        for tc in train_controllers.values():
            p += tc.params()
        return p

    all_params = rebuild_all_params()

    def handle_key(key):
        """Apply one key press; return True when the user asked to quit."""
        nonlocal selected_train, all_params, temp_setpoint, temp_auto, grid_follow, grid_caps
        if key in (ord('q'), 27):
            return True  # signal quit
        # Direct selection by number
        elif key in (ord('0'), ord('1'), ord('2'), ord('3')):
            selected_train = key - ord('0')
        elif key == ord('g'):
            selected_train = 4
        # Up/Down cycle through selections
        elif key == curses.KEY_UP:
            idx = SELECTION_ORDER.index(selected_train) if selected_train in SELECTION_ORDER else 0
            selected_train = SELECTION_ORDER[(idx - 1) % len(SELECTION_ORDER)]
        elif key == curses.KEY_DOWN:
            idx = SELECTION_ORDER.index(selected_train) if selected_train in SELECTION_ORDER else 0
            selected_train = SELECTION_ORDER[(idx + 1) % len(SELECTION_ORDER)]
        # Right/+ increase   Left/- decrease
        elif key in (ord('+'), ord('='), curses.KEY_RIGHT):
            if selected_train == 0 and not temp_auto:
                temp_setpoint = min(round(temp_setpoint / 5.0) * 5.0 + 5.0, 410.0)
            elif selected_train == 4:
                args.grid_buffer = min(args.grid_buffer + 1.0, 100.0)
            elif selected_train in train_controllers:
                if grid_follow:
                    # In grid-follow mode the keys edit the cap, not the live target.
                    cur = grid_caps.get(selected_train,
                                        train_controllers[selected_train].target_kw)
                    grid_caps[selected_train] = min(round(cur / 5_000) * 5_000 + 5_000,
                                                    100_000)  # snap+step
                else:
                    tc = train_controllers[selected_train]
                    tc.target_kw = min(round(tc.target_kw / 5_000) * 5_000 + 5_000, 100_000)
                    targets[selected_train] = tc.target_kw
                    grid_caps[selected_train] = tc.target_kw
            elif selected_train in (1, 2, 3):
                # Train not yet controlled: add it with a 5 MW starting target.
                targets[selected_train] = 5_000
                train_controllers[selected_train] = TrainController(selected_train, 5_000)
                grid_caps[selected_train] = 5_000
                all_params = rebuild_all_params()
        elif key in (ord('-'), curses.KEY_LEFT):
            if selected_train == 0 and not temp_auto:
                temp_setpoint = max(round(temp_setpoint / 5.0) * 5.0 - 5.0, 250.0)
            elif selected_train == 4:
                args.grid_buffer = max(args.grid_buffer - 1.0, 0.0)
            elif selected_train in train_controllers:
                if grid_follow:
                    cur = grid_caps.get(selected_train,
                                        train_controllers[selected_train].target_kw)
                    grid_caps[selected_train] = max(round(cur / 5_000) * 5_000 - 5_000, 0)
                else:
                    tc = train_controllers[selected_train]
                    tc.target_kw = max(round(tc.target_kw / 5_000) * 5_000 - 5_000, 0)
                    targets[selected_train] = tc.target_kw
                    grid_caps[selected_train] = tc.target_kw
        elif key == ord('d'):
            if selected_train == 0:
                temp_auto = not temp_auto
                disp['temp_auto'] = temp_auto
            elif selected_train == 4:
                grid_follow = not grid_follow
                disp['grid_follow'] = grid_follow
            elif selected_train in train_controllers:
                del train_controllers[selected_train]
                grid_caps.pop(selected_train, None)
                # Drop the last measurements too, so a removed train's stale
                # power error cannot keep driving the auto temperature setpoint.
                train_data.pop(selected_train, None)
                if selected_train in targets:
                    del targets[selected_train]
                all_params = rebuild_all_params()
                selected_train = next(iter(train_controllers), 0)
        return False

    def draw():
        """Redraw the whole screen from `disp`; does nothing before the first cycle."""
        s = disp['s']
        dynamic_setpoint = disp['dynamic_setpoint']
        criticality = disp['criticality']
        ret_pct = disp['ret_pct']
        ret_draining = disp['ret_draining']
        ret_valve = disp['ret_valve']
        cond_pct = disp['cond_pct']
        cond_pump_on = disp['cond_pump_on']
        if not s:
            return
        stdscr.erase()
        H, W = stdscr.getmaxyx()
        row = 0

        title = f" NUCLEARES CONTROLLER ─ Cycle {cycle:5d} ─ dt={args.dt:.0f}s "
        _safe_addstr(stdscr, row, 0, title.ljust(W - 1), TITLE | BOLD)
        row += 1
        _hline(stdscr, row)
        row += 1

        # ---- Core row ----
        core_sel = (selected_train == 0)
        core_attr = CYAN | BOLD if core_sel else BOLD
        temp_auto_ = disp['temp_auto']
        core_temp = s.get('CORE_TEMP', 0.0)
        temp_color = RED if core_temp > 370 else YELLOW if core_temp > 355 else GREEN
        scram_str = ' !! SCRAM !!' if core_temp > TEMP_MAX else ''
        auto_str = 'AUTO' if temp_auto_ else 'MAN '
        auto_color = GREEN if temp_auto_ else YELLOW
        _safe_addstr(stdscr, row, 2, '◆ CORE' + (' ◀' if core_sel else ''), core_attr)
        _safe_addstr(stdscr, row, 10, f'[{auto_str}]', auto_color | BOLD)
        _safe_addstr(stdscr, row, 16, 'Temp: ', BOLD)
        _safe_addstr(stdscr, row, 22, f'{core_temp:6.1f}°C', temp_color | BOLD)
        sp_color = RED if dynamic_setpoint < 306 or dynamic_setpoint > 360 else 0
        _safe_addstr(stdscr, row, 32, 'sp=', 0)
        _safe_addstr(stdscr, row, 35, f'{dynamic_setpoint:.0f}°C', sp_color | BOLD)
        _safe_addstr(stdscr, row, 40,
                     f' Rod: {s.get("ROD_BANK_POS_0_ACTUAL", 0):5.1f} '
                     f'Crit: {criticality:+.3f}{scram_str}')
        row += 1

        # ---- Train rows ----
        for t in (1, 2, 3):
            _hline(stdscr, row)
            row += 1
            is_sel = (t == selected_train)
            is_active = (t in train_controllers)
            tc = train_controllers.get(t)
            sel_attr = CYAN | BOLD if is_sel else 0
            label = f'◆ TRAIN {t}' + (' ◀' if is_sel else '')
            _safe_addstr(stdscr, row, 2, label, sel_attr | BOLD)
            if is_active and t in train_data:
                power_kw, power_error, steam_out, level, level_error = train_data[t]
                pwr_pct = power_kw / tc.target_kw * 100.0 if tc.target_kw > 0 else 0.0
                pwr_color = (GREEN if abs(power_error) < 2000
                             else YELLOW if abs(power_error) < 8000 else RED)
                cap = grid_caps.get(t, tc.target_kw)
                gf = disp['grid_follow']
                # Show "target/cap" only when grid-following has pulled the target off the cap.
                tgt_str = (f'tgt={tc.target_kw/1000:.1f}/{cap/1000:.0f}MW'
                           if gf and abs(tc.target_kw - cap) > 500
                           else f'tgt={tc.target_kw/1000:.0f}MW')
                _safe_addstr(stdscr, row, 16, 'Power: ', BOLD)
                _safe_addstr(stdscr, row, 23, f'{power_kw/1000:5.1f} MW', pwr_color | BOLD)
                _safe_addstr(stdscr, row, 32,
                             f'[{_bar(pwr_pct, 14)}] {power_error/1000:+5.1f}MW {tgt_str}')
                row += 1
                sweep_ind = '~' if tc.sweeping else ' '
                _safe_addstr(stdscr, row, 16,
                             f'Steam: {steam_out:5.1f} MSCV: {tc.mscv:4.1f} '
                             f'Prim: {tc.prim_pump:3.0f}%{sweep_ind} Sec: {tc.sec_pump:3.0f}% '
                             f'Lvl: {level:.0f} (Δ{level_error:+.0f})')
            elif not is_active:
                # Right/+ adds the train; Up only moves the selection.
                hint = ' (+/→ to add)' if is_sel else ''
                _safe_addstr(stdscr, row, 16, f'not controlled{hint}',
                             YELLOW if is_sel else 0)
            row += 1

        # ---- Grid row ----
        _hline(stdscr, row)
        row += 1
        gf = disp['grid_follow']
        gdkw = disp['grid_demand_kw']
        total_cap = sum(grid_caps.get(t, tc.target_kw) for t, tc in train_controllers.items())
        grid_sel = (selected_train == 4)
        grid_attr = CYAN | BOLD if grid_sel else BOLD
        gf_color = GREEN | BOLD if gf else (CYAN | BOLD if grid_sel else 0)
        _safe_addstr(stdscr, row, 2, '◆ GRID' + (' ◀' if grid_sel else ''), grid_attr)
        _safe_addstr(stdscr, row, 16, f'Demand: {gdkw/1000:5.1f} MW', BOLD)
        if gf:
            target_total = gdkw + args.grid_buffer * 1000.0
            _safe_addstr(stdscr, row, 34,
                         f' AUTO buf={args.grid_buffer:.0f}MW '
                         f'→{target_total/1000:.1f}/{total_cap/1000:.0f}MW total',
                         gf_color)
        else:
            _safe_addstr(stdscr, row, 34,
                         f' off buf={args.grid_buffer:.0f}MW cap={total_cap/1000:.0f}MW',
                         gf_color)
        row += 1

        # ---- Auxiliary rows ----
        _hline(stdscr, row)
        row += 1
        ret_color = RED if ret_pct > 75 else YELLOW if ret_pct > 60 else GREEN
        _safe_addstr(stdscr, row, 2, '◆ RETENTION TANK ', BOLD)
        _safe_addstr(stdscr, row, 20, f'[{_bar(ret_pct, 20)}]', ret_color)
        _safe_addstr(stdscr, row, 43, f' {ret_pct:4.0f}%')
        _safe_addstr(stdscr, row, 49,
                     f' DRAINING valve={ret_valve:.0f}%' if ret_draining else ' OK',
                     YELLOW if ret_draining else GREEN)
        row += 1

        cond_color = RED if cond_pct < 25 else YELLOW if cond_pct < 40 else GREEN
        _safe_addstr(stdscr, row, 2, '◆ CONDENSER FILL ', BOLD)
        _safe_addstr(stdscr, row, 20, f'[{_bar(cond_pct, 20)}]', cond_color)
        _safe_addstr(stdscr, row, 43, f' {cond_pct:4.0f}%')
        _safe_addstr(stdscr, row, 49, ' PUMP ON' if cond_pump_on else ' OK',
                     YELLOW if cond_pump_on else GREEN)
        row += 1

        prsr_level_ = disp['prsr_level']
        prsr_spray_ = disp['prsr_spraying']
        feedwater_ = disp['feedwater_on']
        prsr_color = (RED if prsr_level_ < 40 or prsr_level_ > 80
                      else YELLOW if prsr_level_ < PRSR_LEVEL_LO or prsr_level_ > PRSR_LEVEL_HI
                      else GREEN)
        _safe_addstr(stdscr, row, 2, '◆ PRESSURIZER ', BOLD)
        _safe_addstr(stdscr, row, 20, f'[{_bar(prsr_level_, 20)}]', prsr_color)
        _safe_addstr(stdscr, row, 43, f' {prsr_level_:4.1f}%')
        _safe_addstr(stdscr, row, 49, ' SPRAY ON' if prsr_spray_ else ' OK',
                     YELLOW if prsr_spray_ else GREEN)
        row += 1

        prim_level_ = disp.get('prim_level', 100.0)
        prim_color = RED if prim_level_ < 70 else YELLOW if prim_level_ < PRIM_FILL_LO else GREEN
        _safe_addstr(stdscr, row, 2, '◆ PRIMARY VESSEL ', BOLD)
        _safe_addstr(stdscr, row, 20, f'[{_bar(prim_level_, 20)}]', prim_color)
        _safe_addstr(stdscr, row, 43, f' {prim_level_:4.1f}%')
        _safe_addstr(stdscr, row, 49, ' FW PUMP ON' if feedwater_ else ' OK',
                     YELLOW if feedwater_ else GREEN)
        row += 1

        # ---- Footer: key hints depend on the current selection ----
        if selected_train == 0:
            adj_hint = (f'←/→ sp {disp["dynamic_setpoint"]:.0f}°C±5'
                        if not disp['temp_auto']
                        else f'sp={disp["dynamic_setpoint"]:.0f}°C (auto)')
            d_hint = f' [d] auto {"OFF" if disp["temp_auto"] else "ON"}'
        elif selected_train == 4:
            adj_hint = f'←/→ buf {args.grid_buffer:.0f}MW±1'
            d_hint = ' [d] toggle auto'
        elif disp['grid_follow']:
            cap = grid_caps.get(selected_train, 0)
            adj_hint = f'←/→ max {cap/1000:.0f}MW±5'
            d_hint = ' [d] remove'
        else:
            adj_hint = '←/→ target ±5MW'
            d_hint = ' [d] remove'
        _safe_addstr(stdscr, H - 1, 0,
                     f' [↑↓] select [0-3/g] jump {adj_hint}{d_hint} [q] quit '.ljust(W - 1),
                     REV)
        stdscr.refresh()

    while True:
        t0 = time.time()
        s = read_state(all_params)
        cycle += 1

        # ---- Rod control ----
        temp_error = s['CORE_TEMP'] - temp_setpoint
        criticality = s.get('CORE_STATE_CRITICALITY', 0.0)
        rod_cycle += 1
        scram = s['CORE_TEMP'] > TEMP_MAX
        if scram:
            # Over-temperature: order the rod bank to 100 immediately.
            rod_pos = 100.0
            set_param('ROD_BANK_POS_0_ORDERED', rod_pos)
        else:
            urgent = temp_error > 5.0 or criticality > 0.3
            # Act every ROD_INTERVAL cycles, or on any cycle when urgent.
            if urgent or rod_cycle >= ROD_INTERVAL:
                if rod_cycle >= ROD_INTERVAL:
                    rod_cycle = 0
                abs_err = abs(temp_error)
                max_step = next(lim for thresh, lim in ROD_TIERS if abs_err <= thresh)
                # Off-schedule urgent moves are limited to small steps.
                if urgent and rod_cycle != 0:
                    max_step = min(max_step, 0.25)
                if not urgent:
                    rod_integral = float(np.clip(rod_integral + 0.002 * temp_error, -3.0, 3.0))
                else:
                    rod_integral *= 0.5
                raw_delta = 0.04 * temp_error + 1.0 * criticality + rod_integral
                rod_delta = float(np.clip(raw_delta, -max_step, max_step))
                rod_pos = float(np.clip(s['ROD_BANK_POS_0_ACTUAL'] + rod_delta, 0.0, 100.0))
                set_param('ROD_BANK_POS_0_ORDERED', rod_pos)

        # ---- Grid-demand following ----
        grid_demand_kw = s.get('POWER_DEMAND_MW', 0.0) * 1000.0
        if grid_follow and train_controllers:
            total_target_kw = grid_demand_kw + args.grid_buffer * 1000.0
            # Distribute proportionally to each train's manual cap; never exceed cap
            total_cap = sum(grid_caps.get(t, tc.target_kw)
                            for t, tc in train_controllers.items())
            if total_cap > 0:
                for t, tc in train_controllers.items():
                    cap = grid_caps.get(t, tc.target_kw)
                    share = total_target_kw * (cap / total_cap)
                    tc.target_kw = float(np.clip(share, 0.0, cap))

        # ---- Per-train control ----
        for t, tc in train_controllers.items():
            train_data[t] = tc.step(s, args.dt)

        # Over-temperature pump override. Applied after the per-train step,
        # because step() resets a non-sweeping primary pump to 50% and would
        # otherwise cancel the override in the same cycle.
        if scram:
            for tc in train_controllers.values():
                tc.prim_pump = 90.0
                set_param(f'COOLANT_CORE_CIRCULATION_PUMP_{tc.i}_ORDERED_SPEED', 90.0)

        # ---- Auto temp setpoint ----
        if temp_auto and train_controllers:
            # Sum of power errors over the trains currently under control only.
            total_error = sum(train_data[t][1] for t in train_controllers)
            sp_delta = float(np.clip(total_error * 0.00002, -0.5, 0.5))
            temp_setpoint = float(np.clip(temp_setpoint + sp_delta, 306.0, 360.0))

        # ---- Aux: retention tank ----
        ret_vol = s.get('VACUUM_RETENTION_TANK_VOLUME', 0.0)
        ret_pct = ret_vol / RETENTION_MAX * 100.0
        if ret_draining and ret_vol <= RETENTION_MID:
            ret_draining = False
            ret_valve = 0.0
            set_param('STEAM_EJECTOR_CONDENSER_RETURN_VALVE', 0.0)
        elif ret_vol > RETENTION_HI:
            ret_draining = True
            # Open one more step while the volume is not dropping by at least 50 per cycle.
            if ret_prev_vol is not None and ret_vol >= ret_prev_vol - 50.0:
                ret_valve = min(ret_valve + 1.0, 50.0)
            set_param('STEAM_EJECTOR_CONDENSER_RETURN_VALVE', ret_valve)
        elif ret_draining:
            set_param('STEAM_EJECTOR_CONDENSER_RETURN_VALVE', ret_valve)
        ret_prev_vol = ret_vol

        # ---- Aux: condenser fill ----
        cond_vol = s.get('CONDENSER_VOLUME', 0.0)
        cond_vap = s.get('CONDENSER_VAPOR_VOLUME', 0.0)
        cond_tot = cond_vol + cond_vap
        cond_pct = (cond_vol / cond_tot * 100.0) if cond_tot > 0 else 0.0
        if not cond_pump_on and cond_pct < 45.0:
            cond_pump_on = True
            nucon.set(nucon._parameters['FREIGHT_PUMP_CONDENSER_SWITCH'], True)
        elif cond_pump_on and cond_pct >= 50.0:
            cond_pump_on = False
            nucon.set(nucon._parameters['FREIGHT_PUMP_CONDENSER_SWITCH'], False)

        # ---- Aux: pressurizer spray valve (level 50-70%) ----
        prsr_level = (s.get('CORE_PRIMARY_CIRCUIT_COOLING_TANK_VOLUME', PRSR_VOL_MAX * 0.6)
                      / PRSR_VOL_MAX * 100.0)
        if not prsr_spraying and prsr_level < PRSR_LEVEL_LO:
            prsr_spraying = True
            nucon.open_valve(PRSR_VALVE)
        elif prsr_spraying and prsr_level >= PRSR_LEVEL_CLOSE:
            prsr_spraying = False
            nucon.close_valve(PRSR_VALVE)
        elif not prsr_spraying:
            # Valve should be at rest — power off actuator if it's reached closed position
            _vs = nucon.get_valve(PRSR_VALVE)
            if _vs.get('IsClosed') and _vs.get('Actuator') != 'OFF':
                nucon.off_valve(PRSR_VALVE)

        # ---- Aux: primary circuit feedwater (overall loop fill > 80%) ----
        prim_level = s.get('COOLANT_CORE_PRIMARY_LOOP_LEVEL', 100.0)
        if not feedwater_on and prim_level < PRIM_FILL_LO:
            feedwater_on = True
            nucon.set(nucon._parameters['FREIGHT_PUMP_FEEDWATER_SWITCH'], True)
        elif feedwater_on and prim_level >= PRIM_FILL_HI:
            feedwater_on = False
            nucon.set(nucon._parameters['FREIGHT_PUMP_FEEDWATER_SWITCH'], False)

        # ---- Update display state and redraw ----
        disp.update(s=s, dynamic_setpoint=temp_setpoint, temp_auto=temp_auto,
                    criticality=criticality,
                    ret_pct=ret_pct, ret_draining=ret_draining, ret_valve=ret_valve,
                    cond_pct=cond_pct, cond_pump_on=cond_pump_on,
                    prsr_level=prsr_level, prsr_spraying=prsr_spraying,
                    prim_level=prim_level, feedwater_on=feedwater_on,
                    grid_follow=grid_follow, grid_demand_kw=grid_demand_kw)
        draw()

        # ---- Poll input every 50 ms for the rest of the cycle; redraw after each key ----
        sim_speed = nucon.GAME_SIM_SPEED.value or 1.0
        deadline = t0 + args.dt / sim_speed
        stdscr.timeout(50)
        while time.time() < deadline:
            key = stdscr.getch()
            if key == -1:
                continue
            if handle_key(key):
                return
            disp['dynamic_setpoint'] = temp_setpoint
            disp['temp_auto'] = temp_auto
            disp['grid_follow'] = grid_follow
            draw()
        stdscr.timeout(-1)


curses.wrapper(run_controller)