Compare commits

..

No commits in common. "e665a457dc9ba8c3642f9c9788441f65c028f194" and "b0a2ac75749624bc3246d5fd4205089973c09cc4" have entirely different histories.

7 changed files with 280 additions and 885 deletions

View File

@ -1 +1 @@
from nucon.core import *
from nucon.core import *

View File

@ -1,20 +1,28 @@
from enum import Enum
from typing import Union, Dict, Type, List, Optional, Any, Iterator, Tuple
from enum import Enum, IntEnum
import requests
import random
from typing import Union, Dict, Type, List, Optional, Any
class NuconConfig:
base_url = "http://localhost:8785/"
dummy_mode = False
class ParameterEnum(Enum):
@classmethod
def _missing_(cls, value: Any) -> Union['ParameterEnum', None]:
if isinstance(value, str):
if value.lower() == 'true':
# Handle boolean-like strings
if value.lower() in ('true'):
return cls(True)
elif value.lower() == 'false':
elif value.lower() in ('false'):
return cls(False)
# Try to convert to int for int-based enums
try:
return cls(int(value))
except ValueError:
pass
# If we can't handle the value, let the default Enum behavior take over
return None
class PumpStatus(ParameterEnum):
@ -53,208 +61,214 @@ CoreState = str
CoolantCoreState = str
RodsState = str
class NuconParameter:
def __init__(self, nucon: 'Nucon', id: str, param_type: Type, is_writable: bool, min_val: Optional[Union[int, float]] = None, max_val: Optional[Union[int, float]] = None):
self.nucon = nucon
#class CoreState(ParameterEnum):
# REACTIVE = 'REACTIVO'
#
# def __bool__(self):
# return self.value == 'REACTIVO'
#class CoolantCoreState(ParameterEnum):
# CIRCULATING = 'CIRCULANDO'
#
# def __bool__(self):
# return self.value == 'CIRCULANDO'
class Nucon(Enum):
# Core
CORE_TEMP = ("CORE_TEMP", float, False)
CORE_TEMP_OPERATIVE = ("CORE_TEMP_OPERATIVE", float, False)
CORE_TEMP_MAX = ("CORE_TEMP_MAX", float, False)
CORE_TEMP_MIN = ("CORE_TEMP_MIN", float, False)
CORE_TEMP_RESIDUAL = ("CORE_TEMP_RESIDUAL", bool, False)
CORE_PRESSURE = ("CORE_PRESSURE", float, False)
CORE_PRESSURE_MAX = ("CORE_PRESSURE_MAX", float, False)
CORE_PRESSURE_OPERATIVE = ("CORE_PRESSURE_OPERATIVE", float, False)
CORE_INTEGRITY = ("CORE_INTEGRITY", float, False)
CORE_WEAR = ("CORE_WEAR", float, False)
CORE_STATE = ("CORE_STATE", CoreState, False)
CORE_STATE_CRITICALITY = ("CORE_STATE_CRITICALITY", float, False)
CORE_CRITICAL_MASS_REACHED = ("CORE_CRITICAL_MASS_REACHED", bool, False)
CORE_CRITICAL_MASS_REACHED_COUNTER = ("CORE_CRITICAL_MASS_REACHED_COUNTER", int, False)
CORE_IMMINENT_FUSION = ("CORE_IMMINENT_FUSION", bool, False)
CORE_READY_FOR_START = ("CORE_READY_FOR_START", bool, False)
CORE_STEAM_PRESENT = ("CORE_STEAM_PRESENT", bool, False)
CORE_HIGH_STEAM_PRESENT = ("CORE_HIGH_STEAM_PRESENT", bool, False)
# Time
TIME = ("TIME", str, False)
TIME_STAMP = ("TIME_STAMP", str, False)
# Coolant Core
COOLANT_CORE_STATE = ("COOLANT_CORE_STATE", CoolantCoreState, False)
COOLANT_CORE_PRESSURE = ("COOLANT_CORE_PRESSURE", float, False)
COOLANT_CORE_MAX_PRESSURE = ("COOLANT_CORE_MAX_PRESSURE", float, False)
COOLANT_CORE_VESSEL_TEMPERATURE = ("COOLANT_CORE_VESSEL_TEMPERATURE", float, False)
COOLANT_CORE_QUANTITY_IN_VESSEL = ("COOLANT_CORE_QUANTITY_IN_VESSEL", float, False)
COOLANT_CORE_PRIMARY_LOOP_LEVEL = ("COOLANT_CORE_PRIMARY_LOOP_LEVEL", float, False)
COOLANT_CORE_FLOW_SPEED = ("COOLANT_CORE_FLOW_SPEED", float, False)
COOLANT_CORE_FLOW_ORDERED_SPEED = ("COOLANT_CORE_FLOW_ORDERED_SPEED", float, False)
COOLANT_CORE_FLOW_REACHED_SPEED = ("COOLANT_CORE_FLOW_REACHED_SPEED", bool, False)
COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT", int, False)
COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT", int, False)
# Circulation Pumps
COOLANT_CORE_CIRCULATION_PUMP_0_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_0_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_1_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_2_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_SPEED", float, False)
# Rods
RODS_STATUS = ("RODS_STATUS", RodsState, False)
RODS_MOVEMENT_SPEED = ("RODS_MOVEMENT_SPEED", float, False)
RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE = ("RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE", bool, False)
RODS_DEFORMED = ("RODS_DEFORMED", bool, False)
RODS_TEMPERATURE = ("RODS_TEMPERATURE", float, False)
RODS_MAX_TEMPERATURE = ("RODS_MAX_TEMPERATURE", float, False)
RODS_POS_ORDERED = ("RODS_POS_ORDERED", float, True)
RODS_POS_ACTUAL = ("RODS_POS_ACTUAL", float, False)
RODS_POS_REACHED = ("RODS_POS_REACHED", bool, False)
RODS_QUANTITY = ("RODS_QUANTITY", int, False)
RODS_ALIGNED = ("RODS_ALIGNED", bool, False)
# Generators
GENERATOR_0_KW = ("GENERATOR_0_KW", float, False)
GENERATOR_1_KW = ("GENERATOR_1_KW", float, False)
GENERATOR_2_KW = ("GENERATOR_2_KW", float, False)
GENERATOR_0_V = ("GENERATOR_0_V", float, False)
GENERATOR_1_V = ("GENERATOR_1_V", float, False)
GENERATOR_2_V = ("GENERATOR_2_V", float, False)
GENERATOR_0_A = ("GENERATOR_0_A", float, False)
GENERATOR_1_A = ("GENERATOR_1_A", float, False)
GENERATOR_2_A = ("GENERATOR_2_A", float, False)
GENERATOR_0_HERTZ = ("GENERATOR_0_HERTZ", float, False)
GENERATOR_1_HERTZ = ("GENERATOR_1_HERTZ", float, False)
GENERATOR_2_HERTZ = ("GENERATOR_2_HERTZ", float, False)
GENERATOR_0_BREAKER = ("GENERATOR_0_BREAKER", BreakerStatus, False)
GENERATOR_1_BREAKER = ("GENERATOR_1_BREAKER", BreakerStatus, False)
GENERATOR_2_BREAKER = ("GENERATOR_2_BREAKER", BreakerStatus, False)
# Steam Turbines
STEAM_TURBINE_0_RPM = ("STEAM_TURBINE_0_RPM", float, False)
STEAM_TURBINE_1_RPM = ("STEAM_TURBINE_1_RPM", float, False)
STEAM_TURBINE_2_RPM = ("STEAM_TURBINE_2_RPM", float, False)
STEAM_TURBINE_0_TEMPERATURE = ("STEAM_TURBINE_0_TEMPERATURE", float, False)
STEAM_TURBINE_1_TEMPERATURE = ("STEAM_TURBINE_1_TEMPERATURE", float, False)
STEAM_TURBINE_2_TEMPERATURE = ("STEAM_TURBINE_2_TEMPERATURE", float, False)
STEAM_TURBINE_0_PRESSURE = ("STEAM_TURBINE_0_PRESSURE", float, False)
STEAM_TURBINE_1_PRESSURE = ("STEAM_TURBINE_1_PRESSURE", float, False)
STEAM_TURBINE_2_PRESSURE = ("STEAM_TURBINE_2_PRESSURE", float, False)
def __init__(self, id: str, param_type: Type, is_writable: bool, min_val: Optional[Union[int, float]] = None, max_val: Optional[Union[int, float]] = None):
self.id = id
self.param_type = param_type
self.is_writable = is_writable
self.min_val = min_val
self.max_val = max_val
@property
def enum_type(self) -> Type[Enum]:
return self.param_type if issubclass(self.param_type, Enum) else None
def check_in_range(self, value: Union[int, float, Enum], raise_on_oob: bool = False) -> bool:
if self.enum_type:
if not isinstance(value, self.enum_type):
if raise_on_oob:
raise ValueError(f"Value {value} is not a valid {self.enum_type.__name__}")
return False
return True
if self.min_val is not None and value < self.min_val:
if raise_on_oob:
raise ValueError(f"Value {value} is below the minimum allowed value {self.min_val}")
return False
if self.max_val is not None and value > self.max_val:
if raise_on_oob:
raise ValueError(f"Value {value} is above the maximum allowed value {self.max_val}")
return False
return True
@property
def value(self):
return self.nucon.get(self)
@value.setter
def value(self, new_value):
self.nucon.set(self, new_value)
def read(self):
return self.value
def write(self, new_value, force=False):
self.nucon.set(self, new_value, force)
def __repr__(self):
return f"NuconParameter(id='{self.id}', value={self.value}, type={self.param_type.__name__}, writable={self.is_writable})"
def __repr__(self) -> str:
type_repr = repr(self.param_type)
return (f"<{self.__class__.__name__}.{self.name}: "
f"{{'value': {repr(self.value)}, 'type': {type_repr}, "
f"'writable': {self.is_writable}}}>"
)
def __str__(self):
return self.id
class Nucon:
def __init__(self, host: str = 'localhost', port: int = 8786):
self.base_url = f'http://{host}:{port}/'
self.dummy_mode = False
self._parameters = self._create_parameters()
@property
def enum_type(self) -> Type[Enum]:
return self.param_type if issubclass(self.param_type, Enum) else None
def _create_parameters(self) -> Dict[str, NuconParameter]:
param_values = {
'CORE_TEMP': (float, False, 0, 1000),
'CORE_TEMP_OPERATIVE': (float, False),
'CORE_TEMP_MAX': (float, False),
'CORE_TEMP_MIN': (float, False),
'CORE_TEMP_RESIDUAL': (bool, False),
'CORE_PRESSURE': (float, False),
'CORE_PRESSURE_MAX': (float, False),
'CORE_PRESSURE_OPERATIVE': (float, False),
'CORE_INTEGRITY': (float, False),
'CORE_WEAR': (float, False),
'CORE_STATE': (CoreState, False),
'CORE_STATE_CRITICALITY': (float, False),
'CORE_CRITICAL_MASS_REACHED': (bool, False),
'CORE_CRITICAL_MASS_REACHED_COUNTER': (int, False),
'CORE_IMMINENT_FUSION': (bool, False),
'CORE_READY_FOR_START': (bool, False),
'CORE_STEAM_PRESENT': (bool, False),
'CORE_HIGH_STEAM_PRESENT': (bool, False),
'TIME': (str, False),
'TIME_STAMP': (str, False),
'COOLANT_CORE_STATE': (CoolantCoreState, False),
'COOLANT_CORE_PRESSURE': (float, False),
'COOLANT_CORE_MAX_PRESSURE': (float, False),
'COOLANT_CORE_VESSEL_TEMPERATURE': (float, False),
'COOLANT_CORE_QUANTITY_IN_VESSEL': (float, False),
'COOLANT_CORE_PRIMARY_LOOP_LEVEL': (float, False),
'COOLANT_CORE_FLOW_SPEED': (float, False),
'COOLANT_CORE_FLOW_ORDERED_SPEED': (float, False),
'COOLANT_CORE_FLOW_REACHED_SPEED': (bool, False),
'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': (int, False),
'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': (int, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': (PumpStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': (PumpStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': (PumpStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': (PumpDryStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': (PumpDryStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': (PumpDryStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': (PumpOverloadStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': (PumpOverloadStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': (PumpOverloadStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': (float, False),
'RODS_STATUS': (RodsState, False),
'RODS_MOVEMENT_SPEED': (float, False),
'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': (bool, False),
'RODS_DEFORMED': (bool, False),
'RODS_TEMPERATURE': (float, False),
'RODS_MAX_TEMPERATURE': (float, False),
'RODS_POS_ORDERED': (float, True),
'RODS_POS_ACTUAL': (float, False),
'RODS_POS_REACHED': (bool, False),
'RODS_QUANTITY': (int, False),
'RODS_ALIGNED': (bool, False),
'GENERATOR_0_KW': (float, False),
'GENERATOR_1_KW': (float, False),
'GENERATOR_2_KW': (float, False),
'GENERATOR_0_V': (float, False),
'GENERATOR_1_V': (float, False),
'GENERATOR_2_V': (float, False),
'GENERATOR_0_A': (float, False),
'GENERATOR_1_A': (float, False),
'GENERATOR_2_A': (float, False),
'GENERATOR_0_HERTZ': (float, False),
'GENERATOR_1_HERTZ': (float, False),
'GENERATOR_2_HERTZ': (float, False),
'GENERATOR_0_BREAKER': (BreakerStatus, False),
'GENERATOR_1_BREAKER': (BreakerStatus, False),
'GENERATOR_2_BREAKER': (BreakerStatus, False),
'STEAM_TURBINE_0_RPM': (float, False),
'STEAM_TURBINE_1_RPM': (float, False),
'STEAM_TURBINE_2_RPM': (float, False),
'STEAM_TURBINE_0_TEMPERATURE': (float, False),
'STEAM_TURBINE_1_TEMPERATURE': (float, False),
'STEAM_TURBINE_2_TEMPERATURE': (float, False),
'STEAM_TURBINE_0_PRESSURE': (float, False),
'STEAM_TURBINE_1_PRESSURE': (float, False),
'STEAM_TURBINE_2_PRESSURE': (float, False),
}
@property
def value(self) -> Union[float, int, bool, str, Enum]:
return self.read()
return {
name: NuconParameter(self, name, *values)
for name, values in param_values.items()
}
def read(self) -> Union[float, int, bool, str, Enum]:
return Nucon.get(self)
def get(self, parameter: Union[str, NuconParameter]) -> Union[float, int, bool, str, Enum]:
@value.setter
def value(self, new_value: Union[float, int, bool, str, Enum]) -> None:
self.write(new_value)
def write(self, new_value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
Nucon.set(self, new_value, force)
def check_in_range(self, value: Union[int, float], raise_on_oob=False) -> None:
if self.min_val is not None and value < self.min_val:
if raise_on_oob:
raise ValueError(f"Value {value} is below the minimum allowed value {self.min_val} for {self.name}")
return False
if self.max_val is not None and value > self.max_val:
if raise_on_oob:
raise ValueError(f"Value {value} is above the maximum allowed value {self.max_val} for {self.name}")
return False
return True
@classmethod
def set_base_url(cls, url: str) -> None:
NuconConfig.base_url = url
@classmethod
def set_dummy_mode(cls, dummy_mode: bool) -> None:
NuconConfig.dummy_mode = dummy_mode
@classmethod
def get_all_readable(cls) -> List['Nucon']:
return list(cls) # All parameters are readable
@classmethod
def get_all_writable(cls) -> List['Nucon']:
return [param for param in cls if param.is_writable]
@classmethod
def get(cls, parameter: Union['Nucon', str]) -> Union[float, int, bool, str, ParameterEnum]:
if isinstance(parameter, str):
parameter = self._parameters[parameter]
if self.dummy_mode:
return self._get_dummy_value(parameter)
parameter = cls[parameter]
value = self._query(parameter)
if parameter.enum_type:
if NuconConfig.dummy_mode:
return cls._get_dummy_value(parameter)
value = cls._query(parameter.name)
if parameter.enum_type and issubclass(parameter.enum_type, ParameterEnum):
return parameter.enum_type(value)
elif parameter.param_type == float:
return float(value)
elif parameter.param_type == int:
return int(value)
elif parameter.param_type == bool:
if isinstance(value, str):
if value.lower() not in ('true', 'false'):
raise ValueError(f"Invalid boolean value: {value}")
return value.lower() == 'true'
if value.lower() in ('true'):
return True
elif value.lower() in ('false'):
return False
else:
raise ValueError(f"Expected string for boolean parameter, got {type(value)}")
raise ValueError(f"Invalid boolean value: {value}. Expected 'TRUE' or 'FALSE' (case insensitive).")
else:
return parameter.param_type(value)
return value
def set(self, parameter: Union[str, NuconParameter], value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
if isinstance(parameter, str):
parameter = self._parameters[parameter]
if not force and not parameter.is_writable:
raise ValueError(f"Parameter {parameter} is not writable")
if not force:
parameter.check_in_range(value, raise_on_oob=True)
if parameter.enum_type and isinstance(value, parameter.enum_type):
value = value.value
if self.dummy_mode:
print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter} to {value}")
return
self._set_value(parameter, str(value))
def _query(self, parameter: NuconParameter) -> str:
response = requests.get(self.base_url, params={"variable": parameter.id})
@classmethod
def _query(cls, parameter_name: str) -> str:
response = requests.get(NuconConfig.base_url, params={"variable": parameter_name})
if response.status_code != 200:
raise Exception(f"Failed to query parameter {parameter.id}. Status code: {response.status_code}")
raise Exception(f"Failed to query parameter {parameter_name}. Status code: {response.status_code}")
return response.text.strip()
def _set_value(self, parameter: NuconParameter, value: str) -> None:
response = requests.post(self.base_url, params={"variable": parameter.id, "value": value})
if response.status_code != 200:
raise Exception(f"Failed to set parameter {parameter.id}. Status code: {response.status_code}")
def _get_dummy_value(self, parameter: NuconParameter) -> Union[float, int, bool, str, Enum]:
@classmethod
def _get_dummy_value(cls, parameter: 'Nucon') -> Union[float, int, bool, str, Enum]:
if parameter.enum_type:
return next(iter(parameter.enum_type))
elif parameter.param_type == float:
@ -264,52 +278,69 @@ class Nucon:
return 3.14
elif parameter.param_type == int:
if parameter.max_val is not None and parameter.min_val is not None:
return (parameter.max_val - parameter.min_val) // 2 + parameter.min_val
return (parameter.max_val - parameter.min_val) / 2 + parameter.min_val
else:
return 42
elif parameter.param_type == bool:
return random.choice([True, False])
return False
else:
return "dummy"
def get_multiple_iter(self, parameters: List[Union[str, NuconParameter]]) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]:
for param in parameters:
if isinstance(param, str):
param_name = param
param_obj = self._parameters[param]
else:
param_name = next(name for name, p in self._parameters.items() if p is param)
param_obj = param
yield param_name, self.get(param_obj)
@classmethod
def get_multiple(cls, parameters: List['Nucon']) -> Dict['Nucon', Union[float, int, bool, str, Enum]]:
return {param: cls.get(param) for param in parameters}
def get_multiple(self, parameters: List[Union[str, NuconParameter]]) -> Dict[str, Union[float, int, bool, str, Enum]]:
return dict(self.get_multiple_iter(parameters))
@classmethod
def get_all(cls) -> Dict['Nucon', Union[float, int, bool, str, Enum]]:
return cls.get_multiple(list(cls))
def get_all_iter(self) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]:
return self.get_multiple_iter(self._parameters.keys())
@classmethod
def set(cls, parameter: Union['Nucon', str], value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
if isinstance(parameter, str):
parameter = next((param for param in cls if param.id == parameter), None)
if parameter is None:
raise ValueError(f"No parameter found with id '{parameter}'")
def get_all(self) -> Dict[str, Union[float, int, bool, str, Enum]]:
return dict(self.get_all_iter())
if not force and not parameter.is_writable:
raise ValueError(f"Parameter {parameter.name} is not writable")
def get_all_readable(self) -> List[NuconParameter]:
return {name: param for name, param in self._parameters.items()}
if not force:
parameter.check_in_range(value, raise_on_oob=True)
def get_all_writable(self) -> List[NuconParameter]:
return {name: param for name, param in self._parameters.items() if param.is_writable}
if parameter.enum_type and isinstance(value, parameter.enum_type):
value = value.value
def set_dummy_mode(self, dummy_mode: bool) -> None:
self.dummy_mode = dummy_mode
if NuconConfig.dummy_mode:
print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter.name} to {value}")
return
def __getattr__(self, name):
if name in self._parameters:
return self._parameters[name]
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
response = requests.post(NuconConfig.base_url, params={"variable": parameter.name, "value": str(value)})
if response.status_code != 200:
raise Exception(f"Failed to set parameter {parameter.name}. Status code: {response.status_code}")
def __getitem__(self, key):
return self.__getattr__(key)
# Example usage
if __name__ == "__main__":
# Enable dummy mode for testing
Nucon.set_dummy_mode(True)
def __dir__(self):
return list(super().__dir__()) + list(self._parameters.keys())
# Get a single parameter
core_temp = Nucon.CORE_TEMP.value
print(f"Core Temperature: {core_temp}")
def __len__(self):
return len(self._parameters)
# Get a parameter with an enum
pump_status = Nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value
print(f"Pump 0 Status: {pump_status}")
# Set a parameter with an enum
try:
Nucon.GENERATOR_0_BREAKER.value = BreakerStatus.OPEN
print(f"Successfully set GENERATOR_0_BREAKER to {Nucon.GENERATOR_0_BREAKER.value}")
except ValueError as e:
print(f"Error: {e}")
# Get all parameters
all_params = Nucon.get_all()
print("All parameters:")
for param, value in all_params.items():
print(f"{param.name}: {value}")

View File

@ -1,174 +0,0 @@
import numpy as np
import time
import torch
import torch.nn as nn
import torch.optim as optim
import random
from enum import Enum
from nucon import Nucon
import pickle
import os
from typing import Union, Tuple, List
Actors = {
'random': lambda nucon: lambda obs: {param.id: random.uniform(param.min_val, param.max_val) if param.min_val is not None and param.max_val is not None else 0 for param in nucon.get_all_writable().values()},
'null': lambda nucon: lambda obs: {},
}
class ReactorDynamicsNet(nn.Module):
def __init__(self, input_dim, output_dim):
super(ReactorDynamicsNet, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_dim + 1, 128), # +1 for time_delta
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, output_dim)
)
def forward(self, state, time_delta):
x = torch.cat([state, time_delta], dim=-1)
return self.network(x)
class ReactorDynamicsModel(nn.Module):
def __init__(self, input_params: List[str], output_params: List[str]):
super(ReactorDynamicsModel, self).__init__()
self.input_params = input_params
self.output_params = output_params
input_dim = len(input_params)
output_dim = len(output_params)
self.net = ReactorDynamicsNet(input_dim, output_dim)
def _state_dict_to_tensor(self, state_dict):
return torch.tensor([state_dict[p] for p in self.input_params], dtype=torch.float32)
def _tensor_to_state_dict(self, tensor):
return {p: tensor[i].item() for i, p in enumerate(self.output_params)}
def forward(self, state_dict, time_delta):
state_tensor = self._state_dict_to_tensor(state_dict).unsqueeze(0)
time_delta_tensor = torch.tensor([time_delta], dtype=torch.float32).unsqueeze(0)
predicted_tensor = self.net(state_tensor, time_delta_tensor)
return self._tensor_to_state_dict(predicted_tensor.squeeze(0))
class NuconModelLearner:
def __init__(self, nucon=None, actor='null', dataset_path='nucon_dataset.pkl', time_delta: Union[float, Tuple[float, float]] = 0.1):
self.nucon = Nucon() if nucon is None else nucon
self.actor = Actors[actor](self.nucon) if actor in Actors else actor
self.dataset = self.load_dataset(dataset_path) or []
self.dataset_path = dataset_path
self.readable_params = list(self.nucon.get_all_readable().keys())
self.non_writable_params = [param.id for param in self.nucon.get_all_readable().values() if not param.is_writable]
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
self.optimizer = optim.Adam(self.model.parameters())
if isinstance(time_delta, (int, float)):
self.time_delta = lambda: time_delta
elif isinstance(time_delta, tuple) and len(time_delta) == 2:
self.time_delta = lambda: random.uniform(*time_delta)
else:
raise ValueError("time_delta must be a float or a tuple of two floats")
def _get_state(self):
state = {}
for param_id, param in self.nucon.get_all_readable().items():
value = self.nucon.get(param)
if isinstance(value, Enum):
value = value.value
state[param_id] = value
return state
def collect_data(self, num_steps):
state = self._get_state()
for _ in range(num_steps):
action = self.actor(state)
start_time = time.time()
for param_id, value in action.items():
self.nucon.set(param_id, value)
time_delta = self.time_delta()
time.sleep(time_delta)
next_state = self._get_state()
self.dataset.append((state, action, next_state, time_delta))
state = next_state
self.save_dataset()
def refine_dataset(self, error_threshold):
refined_data = []
for state, action, next_state, time_delta in self.dataset:
predicted_next_state = self.model(state, time_delta)
error = sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params)
if error > error_threshold:
refined_data.append((state, action, next_state, time_delta))
self.dataset = refined_data
self.save_dataset()
def train_model(self, batch_size=32, num_epochs=10, test_split=0.2):
random.shuffle(self.dataset)
split_idx = int(len(self.dataset) * (1 - test_split))
train_data = self.dataset[:split_idx]
test_data = self.dataset[split_idx:]
for epoch in range(num_epochs):
train_loss = self._train_epoch(train_data, batch_size)
test_loss = self._test_epoch(test_data)
print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
def _train_epoch(self, data, batch_size):
total_loss = 0
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
states, _, next_states, time_deltas = zip(*batch)
loss = 0
for state, next_state, time_delta in zip(states, next_states, time_deltas):
predicted_next_state = self.model(state, time_delta)
loss += sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params)
loss /= len(batch)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
total_loss += loss.item()
return total_loss / (len(data) // batch_size)
def _test_epoch(self, data):
total_loss = 0
with torch.no_grad():
for state, _, next_state, time_delta in data:
predicted_next_state = self.model(state, time_delta)
loss = sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params)
total_loss += loss
return total_loss / len(data)
def save_model(self, path):
torch.save(self.model.state_dict(), path)
def load_model(self, path):
self.model.load_state_dict(torch.load(path))
def save_dataset(self, path=None):
path = path or self.dataset_path
with open(path, 'wb') as f:
pickle.dump(self.dataset, f)
def load_dataset(self, path=None):
path = path or self.dataset_path
if os.path.exists(path):
with open(path, 'rb') as f:
return pickle.load(f)
return None
def merge_datasets(self, other_dataset_path):
other_dataset = self.load_dataset(other_dataset_path)
if other_dataset:
self.dataset.extend(other_dataset)
self.save_dataset()

View File

@ -3,7 +3,7 @@ from gymnasium import spaces
import numpy as np
import time
from typing import Dict, Any
from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus
from .core import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus
Objectives = {
"null": lambda obs: 0,
@ -13,16 +13,12 @@ Objectives = {
Parameterized_Objectives = {
"target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2),
"target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2),
"temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2),
"temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2),
"constant": lambda constant: lambda obs: constant,
}
class NuconEnv(gym.Env):
metadata = {'render_modes': ['human']}
def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5, objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0):
def __init__(self, render_mode=None, seconds_per_step=5, objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0):
super().__init__()
self.render_mode = render_mode
@ -30,30 +26,22 @@ class NuconEnv(gym.Env):
if objective_weights is None:
objective_weights = [1.0 for objective in objectives]
self.objective_weights = objective_weights
self.terminate_above = terminate_above
self.simulator = simulator
if nucon is None:
if simulator:
nucon = Nucon(port=simulator.port)
else:
nucon = Nucon()
self.nucon = nucon
self.terminate_at = terminate_at
# Define observation space
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
for param_id, param in self.nucon.get_all_readable().items():
for param in Nucon.get_all_readable():
if param.param_type == float:
obs_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32)
obs_spaces[param.id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == int:
if param.min_val is not None and param.max_val is not None:
obs_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32)
obs_spaces[param.id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32)
else:
obs_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
obs_spaces[param.id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == bool:
obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
obs_spaces[param.id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
elif issubclass(param.param_type, Enum):
obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
obs_spaces[param.id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
else:
raise ValueError(f"Unsupported observation parameter type: {param.param_type}")
@ -61,26 +49,23 @@ class NuconEnv(gym.Env):
# Define action space
action_spaces = {}
for param_id, param in self.nucon.get_all_writable().items():
for param in Nucon.get_all_writable():
if param.param_type == float:
action_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32)
action_spaces[param.id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == int:
if param.min_val is not None and param.max_val is not None:
action_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32)
action_spaces[param.id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32)
else:
action_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
action_spaces[param.id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == bool:
action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
action_spaces[param.id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
elif issubclass(param.param_type, Enum):
action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
action_spaces[param.id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
else:
raise ValueError(f"Unsupported action parameter type: {param.param_type}")
self.action_space = spaces.Dict(action_spaces)
self.objectives = []
self.terminators = []
for objective in objectives:
if objective in Objectives:
self.objectives.append(Objectives[objective])
@ -99,11 +84,11 @@ class NuconEnv(gym.Env):
def _get_obs(self):
obs = {}
for param_id, param in self.nucon.get_all_readable().items():
value = self.nucon.get(param_id)
for param in Nucon.get_all_readable():
value = Nucon.get(param)
if isinstance(value, Enum):
value = value.value
obs[param_id] = value
obs[param.id] = value
obs["EPISODE_TIME"] = self._total_steps * self.seconds_per_step
return obs
@ -127,12 +112,12 @@ class NuconEnv(gym.Env):
def step(self, action):
# Apply the action to the Nucon system
for param_id, value in action.items():
param = next(p for p in self.nucon if p.id == param_id)
param = next(p for p in Nucon if p.id == param_id)
if issubclass(param.param_type, Enum):
value = param.param_type(value)
if param.min_val is not None and param.max_val is not None:
value = np.clip(value, param.min_val, param.max_val)
self.nucon.set(param, value)
Nucon.set(param, value)
observation = self._get_obs()
terminated = np.sum([terminator(observation) for terminator in self.terminators]) > self.terminate_above
@ -141,10 +126,7 @@ class NuconEnv(gym.Env):
reward = sum(obj for obj in info['objectives_weighted'].values())
self._total_steps += 1
if self.simulator:
self.simulator.update(self.seconds_per_step)
else:
time.sleep(self.seconds_per_step)
time.sleep(self.seconds_per_step)
return observation, reward, terminated, truncated, info
def render(self):
@ -166,7 +148,6 @@ class NuconEnv(gym.Env):
def _unflatten_observation(self, flat_observation):
return {k: v.reshape(1, -1) for k, v in self.observation_space.items()}
def register_nucon_envs():
gym.register(
id='Nucon-max_power-v0',
@ -174,14 +155,9 @@ def register_nucon_envs():
kwargs={'seconds_per_step': 5, 'objectives': ['max_power']}
)
gym.register(
id='Nucon-target_temperature_350-v0',
id='Nucon-target_temperature_600-v0',
entry_point='nucon.rl:NuconEnv',
kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['target_temperature'](goal_temp=350)]}
)
gym.register(
id='Nucon-safe_max_power-v0',
entry_point='nucon.rl:NuconEnv',
kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]}
kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['target_temperature'](goal_temp=600)]}
)
register_nucon_envs()

View File

@ -1,315 +0,0 @@
import random
from typing import Dict, Union, Any, Tuple, List
from enum import Enum
from flask import Flask, request, jsonify
from nucon import Nucon, ParameterEnum, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
import threading
import torch
from nucon.model import ReactorDynamicsModel
class OperatingState(Enum):
# Tuple indicates a range of values, while list indicates a set of possible values
OFFLINE = {
'CORE_TEMP': (18.0, 22.0),
'CORE_TEMP_OPERATIVE': [306.0],
'CORE_TEMP_MAX': [1000.0],
'CORE_TEMP_MIN': [0.0],
'CORE_TEMP_RESIDUAL': [False],
'CORE_PRESSURE': (0.9, 1.1),
'CORE_PRESSURE_MAX': [1.5],
'CORE_PRESSURE_OPERATIVE': [1.0],
'CORE_INTEGRITY': [100.0],
'CORE_WEAR': [0.0],
'CORE_STATE': ['OFFLINE'],
'CORE_STATE_CRITICALITY': [0.0],
'CORE_CRITICAL_MASS_REACHED': [False],
'CORE_CRITICAL_MASS_REACHED_COUNTER': [0],
'CORE_IMMINENT_FUSION': [False],
'CORE_READY_FOR_START': [False],
'CORE_STEAM_PRESENT': [False],
'CORE_HIGH_STEAM_PRESENT': [False],
'TIME': ['00:00:00'],
'TIME_STAMP': ['1970-01-01 00:00:00'],
'COOLANT_CORE_STATE': ['INACTIVE'],
'COOLANT_CORE_PRESSURE': (0.9, 1.1),
'COOLANT_CORE_MAX_PRESSURE': [1.5],
'COOLANT_CORE_VESSEL_TEMPERATURE': (18.0, 22.0),
'COOLANT_CORE_QUANTITY_IN_VESSEL': [0.0],
'COOLANT_CORE_PRIMARY_LOOP_LEVEL': [0.0],
'COOLANT_CORE_FLOW_SPEED': [0.0],
'COOLANT_CORE_FLOW_ORDERED_SPEED': [0.0],
'COOLANT_CORE_FLOW_REACHED_SPEED': [False],
'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': [3],
'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': [2],
'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': [PumpStatus.INACTIVE],
'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': [PumpStatus.INACTIVE],
'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': [PumpStatus.INACTIVE],
'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': [0.0],
'RODS_STATUS': ['INACTIVE'],
'RODS_MOVEMENT_SPEED': [0.0],
'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': [False],
'RODS_DEFORMED': [False],
'RODS_TEMPERATURE': (18.0, 22.0),
'RODS_MAX_TEMPERATURE': [1000.0],
'RODS_POS_ORDERED': [0.0],
'RODS_POS_ACTUAL': [0.0],
'RODS_POS_REACHED': [True],
'RODS_QUANTITY': [100],
'RODS_ALIGNED': [True],
'GENERATOR_0_KW': [0.0],
'GENERATOR_1_KW': [0.0],
'GENERATOR_2_KW': [0.0],
'GENERATOR_0_V': [0.0],
'GENERATOR_1_V': [0.0],
'GENERATOR_2_V': [0.0],
'GENERATOR_0_A': [0.0],
'GENERATOR_1_A': [0.0],
'GENERATOR_2_A': [0.0],
'GENERATOR_0_HERTZ': [0.0],
'GENERATOR_1_HERTZ': [0.0],
'GENERATOR_2_HERTZ': [0.0],
'GENERATOR_0_BREAKER': [BreakerStatus.OPEN],
'GENERATOR_1_BREAKER': [BreakerStatus.OPEN],
'GENERATOR_2_BREAKER': [BreakerStatus.OPEN],
'STEAM_TURBINE_0_RPM': [0.0],
'STEAM_TURBINE_1_RPM': [0.0],
'STEAM_TURBINE_2_RPM': [0.0],
'STEAM_TURBINE_0_TEMPERATURE': (18.0, 22.0),
'STEAM_TURBINE_1_TEMPERATURE': (18.0, 22.0),
'STEAM_TURBINE_2_TEMPERATURE': (18.0, 22.0),
'STEAM_TURBINE_0_PRESSURE': (0.9, 1.1),
'STEAM_TURBINE_1_PRESSURE': (0.9, 1.1),
'STEAM_TURBINE_2_PRESSURE': (0.9, 1.1),
}
NOMINAL = {
'CORE_TEMP': (290.0, 370.0),
'CORE_TEMP_OPERATIVE': [306.0],
'CORE_TEMP_MAX': [1000.0],
'CORE_TEMP_MIN': [0.0],
'CORE_TEMP_RESIDUAL': [False],
'CORE_PRESSURE': (14.5, 15.5),
'CORE_PRESSURE_MAX': [16.0],
'CORE_PRESSURE_OPERATIVE': [15.0],
'CORE_INTEGRITY': (99.0, 100.0),
'CORE_WEAR': (0.0, 1.0),
'CORE_STATE': ['ACTIVE'],
'CORE_STATE_CRITICALITY': (0.9, 1.1),
'CORE_CRITICAL_MASS_REACHED': [True],
'CORE_CRITICAL_MASS_REACHED_COUNTER': [1],
'CORE_IMMINENT_FUSION': [False],
'CORE_READY_FOR_START': [True],
'CORE_STEAM_PRESENT': [True],
'CORE_HIGH_STEAM_PRESENT': [False],
'TIME': ['00:00:00'],
'TIME_STAMP': ['1970-01-01 00:00:00'],
'COOLANT_CORE_STATE': ['ACTIVE'],
'COOLANT_CORE_PRESSURE': (13.5, 14.5),
'COOLANT_CORE_MAX_PRESSURE': [15.0],
'COOLANT_CORE_VESSEL_TEMPERATURE': (270.0, 290.0),
'COOLANT_CORE_QUANTITY_IN_VESSEL': (95.0, 100.0),
'COOLANT_CORE_PRIMARY_LOOP_LEVEL': (95.0, 100.0),
'COOLANT_CORE_FLOW_SPEED': (9.5, 10.5),
'COOLANT_CORE_FLOW_ORDERED_SPEED': [10.0],
'COOLANT_CORE_FLOW_REACHED_SPEED': [True],
'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': [3],
'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': [3],
'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': [PumpStatus.ACTIVE_SPEED_REACHED],
'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': [PumpStatus.ACTIVE_SPEED_REACHED],
'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': [PumpStatus.ACTIVE_SPEED_REACHED],
'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': (95.0, 100.0),
'RODS_STATUS': ['ACTIVE'],
'RODS_MOVEMENT_SPEED': (0.9, 1.1),
'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': [False],
'RODS_DEFORMED': [False],
'RODS_TEMPERATURE': (290.0, 310.0),
'RODS_MAX_TEMPERATURE': [1000.0],
'RODS_POS_ORDERED': [50.0],
'RODS_POS_ACTUAL': (49.5, 50.5),
'RODS_POS_REACHED': [True],
'RODS_QUANTITY': [100],
'RODS_ALIGNED': [True],
'GENERATOR_0_KW': (950.0, 1050.0),
'GENERATOR_1_KW': (950.0, 1050.0),
'GENERATOR_2_KW': (950.0, 1050.0),
'GENERATOR_0_V': (380.0, 420.0),
'GENERATOR_1_V': (380.0, 420.0),
'GENERATOR_2_V': (380.0, 420.0),
'GENERATOR_0_A': (2375.0, 2625.0),
'GENERATOR_1_A': (2375.0, 2625.0),
'GENERATOR_2_A': (2375.0, 2625.0),
'GENERATOR_0_HERTZ': (49.5, 50.5),
'GENERATOR_1_HERTZ': (49.5, 50.5),
'GENERATOR_2_HERTZ': (49.5, 50.5),
'GENERATOR_0_BREAKER': [BreakerStatus.CLOSED],
'GENERATOR_1_BREAKER': [BreakerStatus.CLOSED],
'GENERATOR_2_BREAKER': [BreakerStatus.CLOSED],
'STEAM_TURBINE_0_RPM': (2950.0, 3050.0),
'STEAM_TURBINE_1_RPM': (2950.0, 3050.0),
'STEAM_TURBINE_2_RPM': (2950.0, 3050.0),
'STEAM_TURBINE_0_TEMPERATURE': (270.0, 290.0),
'STEAM_TURBINE_1_TEMPERATURE': (270.0, 290.0),
'STEAM_TURBINE_2_TEMPERATURE': (270.0, 290.0),
'STEAM_TURBINE_0_PRESSURE': (13.5, 14.5),
'STEAM_TURBINE_1_PRESSURE': (13.5, 14.5),
'STEAM_TURBINE_2_PRESSURE': (13.5, 14.5),
}
class NuconSimulator:
class Parameters:
def __init__(self, nucon: Nucon):
for param_name in nucon.get_all_readable():
setattr(self, param_name, None)
def __init__(self, host: str = 'localhost', port: int = 8786):
self._nucon = Nucon()
self.parameters = self.Parameters(self._nucon)
self.time = 0.0
self.allow_all_writes = False
self.set_state(OperatingState.OFFLINE)
self.model = None
self.readable_params = list(self._nucon.get_all_readable().keys())
self.non_writable_params = [name for name, param in self._nucon.get_all_readable().items() if not param.is_writable]
self._run(host, port)
def get(self, parameter: Union[str, Any]) -> Any:
if isinstance(parameter, str):
return getattr(self.parameters, parameter)
return getattr(self.parameters, parameter.id)
def set(self, parameter: Union[str, Any], value: Any, force: bool = False) -> None:
if isinstance(parameter, str):
param_obj = self._nucon[parameter]
else:
param_obj = parameter
if not param_obj.is_writable and not force and not self.allow_all_writes:
raise ValueError(f"Parameter {param_obj.id} is not writable")
# Convert value to the correct type
try:
if param_obj.enum_type:
if isinstance(value, str):
value = param_obj.enum_type[value.upper()]
elif isinstance(value, int):
value = param_obj.enum_type(value)
elif not isinstance(value, param_obj.enum_type):
raise ValueError(f"Invalid enum value for {param_obj.id}")
else:
value = param_obj.param_type(value)
except (ValueError, KeyError):
raise ValueError(f"Invalid type for parameter {param_obj.id}. Expected {param_obj.param_type}")
# Check range if not forced
if not force and param_obj.min_val is not None and param_obj.max_val is not None:
if not param_obj.min_val <= value <= param_obj.max_val:
raise ValueError(f"Value {value} is out of range for parameter {param_obj.id}. "
f"Valid range: [{param_obj.min_val}, {param_obj.max_val}]")
setattr(self.parameters, param_obj.id, value)
def set_allow_all_writes(self, allow: bool) -> None:
self.allow_all_writes = allow
def update(self, time_step: float) -> None:
self._update_reactor_state(time_step)
self.time += time_step
def load_model(self, model_path: str) -> None:
try:
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
self.model.load_state_dict(torch.load(model_path))
self.model.eval() # Set the model to evaluation mode
print(f"Model loaded successfully from {model_path}")
except Exception as e:
print(f"Error loading model: {str(e)}")
self.model = None
def _update_reactor_state(self, time_step: float) -> None:
if not self.model:
raise ValueError("Model not set. Please load a model using load_model() method.")
state = {}
for param in self.readable_params:
value = self.get(param)
if isinstance(value, Enum):
value = value.value
state[param] = value
# Use the model to predict the next state
with torch.no_grad():
next_state = self.model(state, time_step)
# Update the simulator's state
for param, value in next_state.items():
self.set(param, value)
def set_state(self, state: OperatingState) -> None:
self._sample_parameters_from_state(state)
def _sample_parameters_from_state(self, state: OperatingState) -> None:
for param_name, value_spec in state.value.items():
param = self._nucon.get_all_readable()[param_name]
if isinstance(value_spec, tuple):
value = random.uniform(*value_spec)
elif isinstance(value_spec, list):
value = random.choice(value_spec)
else:
raise ValueError(f"Invalid value specification for parameter {param_name}")
self.set(param, value, force=True)
def _run(self, port: int = 8786, host: str = 'localhost', debug: bool = False):
app = Flask(__name__)
@app.route('/', methods=['GET'])
def get_parameter():
variable = request.args.get('variable')
if not variable:
return jsonify({"error": "No variable specified"}), 400
try:
value = self.get(variable)
return str(value), 200
except KeyError:
return jsonify({"error": f"Unknown variable: {variable}"}), 404
@app.route('/', methods=['POST'])
def set_parameter():
variable = request.args.get('variable')
value = request.args.get('value')
if not variable or value is None:
return jsonify({"error": "Both variable and value must be specified"}), 400
try:
self.set(variable, value)
return "OK", 200
except ValueError as e:
return jsonify({"error": str(e)}), 400
except KeyError:
return jsonify({"error": f"Unknown variable: {variable}"}), 404
def run_simulator(host='localhost', port=8786, debug=False):
app.run(host=host, port=port, debug=debug)
threading.Thread(target=run_simulator, args=(host, port, debug), daemon=True).start()

View File

@ -1,50 +1,41 @@
import pytest
import warnings
from nucon import Nucon, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
from nucon import Nucon, NuconConfig, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
WARN_FLOAT_COULD_BE_INT = False
@pytest.fixture(scope="module")
def nucon_setup():
Nucon.set_dummy_mode(False)
Nucon.set_dummy_mode(False) # Assume the game is running
Nucon.set_base_url("http://localhost:8785/")
yield
Nucon.set_dummy_mode(True)
def test_read_all_parameters(nucon_setup):
all_params = Nucon.get_all()
assert len(all_params) == len(Nucon)
for param, value in all_params.items():
assert isinstance(value, param.param_type), f"Parameter {param.id} has incorrect type. Expected {param.param_type}, got {type(value)}"
assert isinstance(value, param.param_type), f"Parameter {param.name} has incorrect type. Expected {param.param_type}, got {type(value)}"
if param.param_type == float and value.is_integer() and WARN_FLOAT_COULD_BE_INT:
warnings.warn(f"Parameter {param.id} is a float but has an integer value: {value}")
if param.param_type == str:
try:
float(value)
raise ValueError(f"Parameter {param.id} is a string that looks like a number: {value}")
except ValueError:
pass
try:
bool(value.lower())
raise ValueError(f"Parameter {param.id} is a string that looks like a boolean: {value}")
except ValueError:
pass
warnings.warn(f"Parameter {param.name} is a float but has an integer value: {value}")
def test_write_writable_parameters(nucon_setup):
writable_params = Nucon.get_all_writable()
for param in writable_params:
current_value = param.value
param.value = current_value
assert param.value == current_value, f"Failed to write to parameter {param.id}"
assert param.value == current_value, f"Failed to write to parameter {param.name}"
def test_non_writable_parameters(nucon_setup):
non_writable_params = [param for param in Nucon if not param.is_writable]
for param in non_writable_params:
# Test that normal set raises an error
with pytest.raises(ValueError, match=f"Parameter {param.id} is not writable"):
with pytest.raises(ValueError, match=f"Parameter {param.name} is not writable"):
param.value = param.value # Attempt to write the current value
# Test that force_set is refused by the webserver
current_value = param.value
with pytest.raises(Exception, match=f"Failed to set parameter {param.id}"):
with pytest.raises(Exception, match=f"Failed to set parameter {param.name}"):
Nucon.set(param, current_value, force=True)
def test_enum_parameters(nucon_setup):

View File

@ -1,114 +0,0 @@
import pytest
import time
from nucon import Nucon, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
from nucon.sim import NuconSimulator, OperatingState
@pytest.fixture(scope="module")
def simulator_setup():
simulator = NuconSimulator(port=8786)
simulator.run()
time.sleep(1) # Give the simulator time to start
nucon = Nucon(port=8786)
return simulator, nucon
def test_simulator_initialization(simulator_setup):
simulator, nucon = simulator_setup
assert simulator is not None
assert nucon is not None
def test_read_all_parameters(simulator_setup):
_, nucon = simulator_setup
all_params = nucon.get_all()
assert len(all_params) == len(nucon)
for param_id, value in all_params.items():
param = nucon[param_id]
assert isinstance(value, param.param_type), f"Parameter {param.id} has incorrect type. Expected {param.param_type}, got {type(value)}"
def test_write_writable_parameters(simulator_setup):
_, nucon = simulator_setup
writable_params = nucon.get_all_writable()
for param in writable_params.values():
current_value = param.value
if param.param_type == float:
new_value = current_value + 1.0
elif param.param_type == int:
new_value = current_value + 1
elif param.param_type == bool:
new_value = not current_value
elif issubclass(param.param_type, Enum):
new_value = list(param.param_type)[0]
else:
continue # Skip if we can't determine a new value
param.value = new_value
assert param.value == new_value, f"Failed to write to parameter {param.id}"
def test_non_writable_parameters(simulator_setup):
_, nucon = simulator_setup
non_writable_params = [param for param in nucon.get_all_readable().values() if not param.is_writable]
for param in non_writable_params:
with pytest.raises(ValueError, match=f"Parameter {param.id} is not writable"):
param.value = param.value # Attempt to write the current value
def test_enum_parameters(simulator_setup):
_, nucon = simulator_setup
pump_status = nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value
assert isinstance(pump_status, PumpStatus)
dry_status = nucon.COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS.value
assert isinstance(dry_status, PumpDryStatus)
overload_status = nucon.COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS.value
assert isinstance(overload_status, PumpOverloadStatus)
breaker_status = nucon.GENERATOR_0_BREAKER.value
assert isinstance(breaker_status, BreakerStatus)
def test_custom_truthy_values(simulator_setup):
_, nucon = simulator_setup
assert bool(nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value) == (nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value in [PumpStatus.ACTIVE_NO_SPEED_REACHED, PumpStatus.ACTIVE_SPEED_REACHED])
assert bool(nucon.COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS.value) == (nucon.COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS.value == PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID)
assert bool(nucon.COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS.value) == (nucon.COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS.value == PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD)
assert bool(nucon.GENERATOR_0_BREAKER.value) == (nucon.GENERATOR_0_BREAKER.value == BreakerStatus.OPEN)
def test_get_multiple_parameters(simulator_setup):
_, nucon = simulator_setup
params_to_get = [nucon.CORE_TEMP, nucon.CORE_PRESSURE, nucon.RODS_POS_ACTUAL]
multiple_params = nucon.get_multiple(params_to_get)
assert len(multiple_params) == len(params_to_get)
for param_id, value in multiple_params.items():
assert isinstance(value, nucon[param_id].param_type)
def test_simulator_update(simulator_setup):
simulator, nucon = simulator_setup
initial_temp = nucon.CORE_TEMP.value
simulator.update(10) # Update for 10 seconds
new_temp = nucon.CORE_TEMP.value
assert new_temp != initial_temp, "Core temperature should change after update"
def test_set_operating_state(simulator_setup):
simulator, nucon = simulator_setup
simulator._set_state(OperatingState.NOMINAL)
time.sleep(1) # Give the simulator time to update
assert nucon.CORE_STATE.value == 'NOMINAL'
assert 290 <= nucon.CORE_TEMP.value <= 370
def test_allow_all_writes(simulator_setup):
simulator, nucon = simulator_setup
simulator.set_allow_all_writes(True)
non_writable_param = next(param for param in nucon.get_all_readable().values() if not param.is_writable)
current_value = non_writable_param.value
new_value = current_value + 1 if isinstance(current_value, (int, float)) else not current_value
non_writable_param.value = new_value
assert non_writable_param.value == new_value, f"Failed to write to non-writable parameter {non_writable_param.id} when allow_all_writes is True"
def test_simulator_consistency(simulator_setup):
simulator, nucon = simulator_setup
for _ in range(10):
simulator.update(1)
all_params = nucon.get_all()
for param_id, value in all_params.items():
assert value == getattr(simulator.parameters, param_id), f"Inconsistency found for parameter {param_id}"
if __name__ == "__main__":
pytest.main()