Compare commits

..

No commits in common. "e665a457dc9ba8c3642f9c9788441f65c028f194" and "b0a2ac75749624bc3246d5fd4205089973c09cc4" have entirely different histories.

7 changed files with 280 additions and 885 deletions

View File

@ -1 +1 @@
from nucon.core import * from nucon.core import *

View File

@ -1,20 +1,28 @@
from enum import Enum from enum import Enum, IntEnum
from typing import Union, Dict, Type, List, Optional, Any, Iterator, Tuple
import requests import requests
import random from typing import Union, Dict, Type, List, Optional, Any
class NuconConfig:
base_url = "http://localhost:8785/"
dummy_mode = False
class ParameterEnum(Enum): class ParameterEnum(Enum):
@classmethod @classmethod
def _missing_(cls, value: Any) -> Union['ParameterEnum', None]: def _missing_(cls, value: Any) -> Union['ParameterEnum', None]:
if isinstance(value, str): if isinstance(value, str):
if value.lower() == 'true': # Handle boolean-like strings
if value.lower() in ('true'):
return cls(True) return cls(True)
elif value.lower() == 'false': elif value.lower() in ('false'):
return cls(False) return cls(False)
# Try to convert to int for int-based enums
try: try:
return cls(int(value)) return cls(int(value))
except ValueError: except ValueError:
pass pass
# If we can't handle the value, let the default Enum behavior take over
return None return None
class PumpStatus(ParameterEnum): class PumpStatus(ParameterEnum):
@ -53,208 +61,214 @@ CoreState = str
CoolantCoreState = str CoolantCoreState = str
RodsState = str RodsState = str
class NuconParameter: #class CoreState(ParameterEnum):
def __init__(self, nucon: 'Nucon', id: str, param_type: Type, is_writable: bool, min_val: Optional[Union[int, float]] = None, max_val: Optional[Union[int, float]] = None): # REACTIVE = 'REACTIVO'
self.nucon = nucon #
# def __bool__(self):
# return self.value == 'REACTIVO'
#class CoolantCoreState(ParameterEnum):
# CIRCULATING = 'CIRCULANDO'
#
# def __bool__(self):
# return self.value == 'CIRCULANDO'
class Nucon(Enum):
# Core
CORE_TEMP = ("CORE_TEMP", float, False)
CORE_TEMP_OPERATIVE = ("CORE_TEMP_OPERATIVE", float, False)
CORE_TEMP_MAX = ("CORE_TEMP_MAX", float, False)
CORE_TEMP_MIN = ("CORE_TEMP_MIN", float, False)
CORE_TEMP_RESIDUAL = ("CORE_TEMP_RESIDUAL", bool, False)
CORE_PRESSURE = ("CORE_PRESSURE", float, False)
CORE_PRESSURE_MAX = ("CORE_PRESSURE_MAX", float, False)
CORE_PRESSURE_OPERATIVE = ("CORE_PRESSURE_OPERATIVE", float, False)
CORE_INTEGRITY = ("CORE_INTEGRITY", float, False)
CORE_WEAR = ("CORE_WEAR", float, False)
CORE_STATE = ("CORE_STATE", CoreState, False)
CORE_STATE_CRITICALITY = ("CORE_STATE_CRITICALITY", float, False)
CORE_CRITICAL_MASS_REACHED = ("CORE_CRITICAL_MASS_REACHED", bool, False)
CORE_CRITICAL_MASS_REACHED_COUNTER = ("CORE_CRITICAL_MASS_REACHED_COUNTER", int, False)
CORE_IMMINENT_FUSION = ("CORE_IMMINENT_FUSION", bool, False)
CORE_READY_FOR_START = ("CORE_READY_FOR_START", bool, False)
CORE_STEAM_PRESENT = ("CORE_STEAM_PRESENT", bool, False)
CORE_HIGH_STEAM_PRESENT = ("CORE_HIGH_STEAM_PRESENT", bool, False)
# Time
TIME = ("TIME", str, False)
TIME_STAMP = ("TIME_STAMP", str, False)
# Coolant Core
COOLANT_CORE_STATE = ("COOLANT_CORE_STATE", CoolantCoreState, False)
COOLANT_CORE_PRESSURE = ("COOLANT_CORE_PRESSURE", float, False)
COOLANT_CORE_MAX_PRESSURE = ("COOLANT_CORE_MAX_PRESSURE", float, False)
COOLANT_CORE_VESSEL_TEMPERATURE = ("COOLANT_CORE_VESSEL_TEMPERATURE", float, False)
COOLANT_CORE_QUANTITY_IN_VESSEL = ("COOLANT_CORE_QUANTITY_IN_VESSEL", float, False)
COOLANT_CORE_PRIMARY_LOOP_LEVEL = ("COOLANT_CORE_PRIMARY_LOOP_LEVEL", float, False)
COOLANT_CORE_FLOW_SPEED = ("COOLANT_CORE_FLOW_SPEED", float, False)
COOLANT_CORE_FLOW_ORDERED_SPEED = ("COOLANT_CORE_FLOW_ORDERED_SPEED", float, False)
COOLANT_CORE_FLOW_REACHED_SPEED = ("COOLANT_CORE_FLOW_REACHED_SPEED", bool, False)
COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT", int, False)
COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT", int, False)
# Circulation Pumps
COOLANT_CORE_CIRCULATION_PUMP_0_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_0_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_1_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_2_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_SPEED", float, False)
# Rods
RODS_STATUS = ("RODS_STATUS", RodsState, False)
RODS_MOVEMENT_SPEED = ("RODS_MOVEMENT_SPEED", float, False)
RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE = ("RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE", bool, False)
RODS_DEFORMED = ("RODS_DEFORMED", bool, False)
RODS_TEMPERATURE = ("RODS_TEMPERATURE", float, False)
RODS_MAX_TEMPERATURE = ("RODS_MAX_TEMPERATURE", float, False)
RODS_POS_ORDERED = ("RODS_POS_ORDERED", float, True)
RODS_POS_ACTUAL = ("RODS_POS_ACTUAL", float, False)
RODS_POS_REACHED = ("RODS_POS_REACHED", bool, False)
RODS_QUANTITY = ("RODS_QUANTITY", int, False)
RODS_ALIGNED = ("RODS_ALIGNED", bool, False)
# Generators
GENERATOR_0_KW = ("GENERATOR_0_KW", float, False)
GENERATOR_1_KW = ("GENERATOR_1_KW", float, False)
GENERATOR_2_KW = ("GENERATOR_2_KW", float, False)
GENERATOR_0_V = ("GENERATOR_0_V", float, False)
GENERATOR_1_V = ("GENERATOR_1_V", float, False)
GENERATOR_2_V = ("GENERATOR_2_V", float, False)
GENERATOR_0_A = ("GENERATOR_0_A", float, False)
GENERATOR_1_A = ("GENERATOR_1_A", float, False)
GENERATOR_2_A = ("GENERATOR_2_A", float, False)
GENERATOR_0_HERTZ = ("GENERATOR_0_HERTZ", float, False)
GENERATOR_1_HERTZ = ("GENERATOR_1_HERTZ", float, False)
GENERATOR_2_HERTZ = ("GENERATOR_2_HERTZ", float, False)
GENERATOR_0_BREAKER = ("GENERATOR_0_BREAKER", BreakerStatus, False)
GENERATOR_1_BREAKER = ("GENERATOR_1_BREAKER", BreakerStatus, False)
GENERATOR_2_BREAKER = ("GENERATOR_2_BREAKER", BreakerStatus, False)
# Steam Turbines
STEAM_TURBINE_0_RPM = ("STEAM_TURBINE_0_RPM", float, False)
STEAM_TURBINE_1_RPM = ("STEAM_TURBINE_1_RPM", float, False)
STEAM_TURBINE_2_RPM = ("STEAM_TURBINE_2_RPM", float, False)
STEAM_TURBINE_0_TEMPERATURE = ("STEAM_TURBINE_0_TEMPERATURE", float, False)
STEAM_TURBINE_1_TEMPERATURE = ("STEAM_TURBINE_1_TEMPERATURE", float, False)
STEAM_TURBINE_2_TEMPERATURE = ("STEAM_TURBINE_2_TEMPERATURE", float, False)
STEAM_TURBINE_0_PRESSURE = ("STEAM_TURBINE_0_PRESSURE", float, False)
STEAM_TURBINE_1_PRESSURE = ("STEAM_TURBINE_1_PRESSURE", float, False)
STEAM_TURBINE_2_PRESSURE = ("STEAM_TURBINE_2_PRESSURE", float, False)
def __init__(self, id: str, param_type: Type, is_writable: bool, min_val: Optional[Union[int, float]] = None, max_val: Optional[Union[int, float]] = None):
self.id = id self.id = id
self.param_type = param_type self.param_type = param_type
self.is_writable = is_writable self.is_writable = is_writable
self.min_val = min_val self.min_val = min_val
self.max_val = max_val self.max_val = max_val
@property def __repr__(self) -> str:
def enum_type(self) -> Type[Enum]: type_repr = repr(self.param_type)
return self.param_type if issubclass(self.param_type, Enum) else None return (f"<{self.__class__.__name__}.{self.name}: "
f"{{'value': {repr(self.value)}, 'type': {type_repr}, "
def check_in_range(self, value: Union[int, float, Enum], raise_on_oob: bool = False) -> bool: f"'writable': {self.is_writable}}}>"
if self.enum_type: )
if not isinstance(value, self.enum_type):
if raise_on_oob:
raise ValueError(f"Value {value} is not a valid {self.enum_type.__name__}")
return False
return True
if self.min_val is not None and value < self.min_val:
if raise_on_oob:
raise ValueError(f"Value {value} is below the minimum allowed value {self.min_val}")
return False
if self.max_val is not None and value > self.max_val:
if raise_on_oob:
raise ValueError(f"Value {value} is above the maximum allowed value {self.max_val}")
return False
return True
@property
def value(self):
return self.nucon.get(self)
@value.setter
def value(self, new_value):
self.nucon.set(self, new_value)
def read(self):
return self.value
def write(self, new_value, force=False):
self.nucon.set(self, new_value, force)
def __repr__(self):
return f"NuconParameter(id='{self.id}', value={self.value}, type={self.param_type.__name__}, writable={self.is_writable})"
def __str__(self): def __str__(self):
return self.id return self.id
class Nucon: @property
def __init__(self, host: str = 'localhost', port: int = 8786): def enum_type(self) -> Type[Enum]:
self.base_url = f'http://{host}:{port}/' return self.param_type if issubclass(self.param_type, Enum) else None
self.dummy_mode = False
self._parameters = self._create_parameters()
def _create_parameters(self) -> Dict[str, NuconParameter]: @property
param_values = { def value(self) -> Union[float, int, bool, str, Enum]:
'CORE_TEMP': (float, False, 0, 1000), return self.read()
'CORE_TEMP_OPERATIVE': (float, False),
'CORE_TEMP_MAX': (float, False),
'CORE_TEMP_MIN': (float, False),
'CORE_TEMP_RESIDUAL': (bool, False),
'CORE_PRESSURE': (float, False),
'CORE_PRESSURE_MAX': (float, False),
'CORE_PRESSURE_OPERATIVE': (float, False),
'CORE_INTEGRITY': (float, False),
'CORE_WEAR': (float, False),
'CORE_STATE': (CoreState, False),
'CORE_STATE_CRITICALITY': (float, False),
'CORE_CRITICAL_MASS_REACHED': (bool, False),
'CORE_CRITICAL_MASS_REACHED_COUNTER': (int, False),
'CORE_IMMINENT_FUSION': (bool, False),
'CORE_READY_FOR_START': (bool, False),
'CORE_STEAM_PRESENT': (bool, False),
'CORE_HIGH_STEAM_PRESENT': (bool, False),
'TIME': (str, False),
'TIME_STAMP': (str, False),
'COOLANT_CORE_STATE': (CoolantCoreState, False),
'COOLANT_CORE_PRESSURE': (float, False),
'COOLANT_CORE_MAX_PRESSURE': (float, False),
'COOLANT_CORE_VESSEL_TEMPERATURE': (float, False),
'COOLANT_CORE_QUANTITY_IN_VESSEL': (float, False),
'COOLANT_CORE_PRIMARY_LOOP_LEVEL': (float, False),
'COOLANT_CORE_FLOW_SPEED': (float, False),
'COOLANT_CORE_FLOW_ORDERED_SPEED': (float, False),
'COOLANT_CORE_FLOW_REACHED_SPEED': (bool, False),
'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': (int, False),
'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': (int, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': (PumpStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': (PumpStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': (PumpStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': (PumpDryStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': (PumpDryStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': (PumpDryStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': (PumpOverloadStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': (PumpOverloadStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': (PumpOverloadStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': (float, False),
'RODS_STATUS': (RodsState, False),
'RODS_MOVEMENT_SPEED': (float, False),
'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': (bool, False),
'RODS_DEFORMED': (bool, False),
'RODS_TEMPERATURE': (float, False),
'RODS_MAX_TEMPERATURE': (float, False),
'RODS_POS_ORDERED': (float, True),
'RODS_POS_ACTUAL': (float, False),
'RODS_POS_REACHED': (bool, False),
'RODS_QUANTITY': (int, False),
'RODS_ALIGNED': (bool, False),
'GENERATOR_0_KW': (float, False),
'GENERATOR_1_KW': (float, False),
'GENERATOR_2_KW': (float, False),
'GENERATOR_0_V': (float, False),
'GENERATOR_1_V': (float, False),
'GENERATOR_2_V': (float, False),
'GENERATOR_0_A': (float, False),
'GENERATOR_1_A': (float, False),
'GENERATOR_2_A': (float, False),
'GENERATOR_0_HERTZ': (float, False),
'GENERATOR_1_HERTZ': (float, False),
'GENERATOR_2_HERTZ': (float, False),
'GENERATOR_0_BREAKER': (BreakerStatus, False),
'GENERATOR_1_BREAKER': (BreakerStatus, False),
'GENERATOR_2_BREAKER': (BreakerStatus, False),
'STEAM_TURBINE_0_RPM': (float, False),
'STEAM_TURBINE_1_RPM': (float, False),
'STEAM_TURBINE_2_RPM': (float, False),
'STEAM_TURBINE_0_TEMPERATURE': (float, False),
'STEAM_TURBINE_1_TEMPERATURE': (float, False),
'STEAM_TURBINE_2_TEMPERATURE': (float, False),
'STEAM_TURBINE_0_PRESSURE': (float, False),
'STEAM_TURBINE_1_PRESSURE': (float, False),
'STEAM_TURBINE_2_PRESSURE': (float, False),
}
return { def read(self) -> Union[float, int, bool, str, Enum]:
name: NuconParameter(self, name, *values) return Nucon.get(self)
for name, values in param_values.items()
}
def get(self, parameter: Union[str, NuconParameter]) -> Union[float, int, bool, str, Enum]: @value.setter
def value(self, new_value: Union[float, int, bool, str, Enum]) -> None:
self.write(new_value)
def write(self, new_value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
Nucon.set(self, new_value, force)
def check_in_range(self, value: Union[int, float], raise_on_oob=False) -> None:
if self.min_val is not None and value < self.min_val:
if raise_on_oob:
raise ValueError(f"Value {value} is below the minimum allowed value {self.min_val} for {self.name}")
return False
if self.max_val is not None and value > self.max_val:
if raise_on_oob:
raise ValueError(f"Value {value} is above the maximum allowed value {self.max_val} for {self.name}")
return False
return True
@classmethod
def set_base_url(cls, url: str) -> None:
NuconConfig.base_url = url
@classmethod
def set_dummy_mode(cls, dummy_mode: bool) -> None:
NuconConfig.dummy_mode = dummy_mode
@classmethod
def get_all_readable(cls) -> List['Nucon']:
return list(cls) # All parameters are readable
@classmethod
def get_all_writable(cls) -> List['Nucon']:
return [param for param in cls if param.is_writable]
@classmethod
def get(cls, parameter: Union['Nucon', str]) -> Union[float, int, bool, str, ParameterEnum]:
if isinstance(parameter, str): if isinstance(parameter, str):
parameter = self._parameters[parameter] parameter = cls[parameter]
if self.dummy_mode:
return self._get_dummy_value(parameter)
value = self._query(parameter) if NuconConfig.dummy_mode:
return cls._get_dummy_value(parameter)
if parameter.enum_type:
value = cls._query(parameter.name)
if parameter.enum_type and issubclass(parameter.enum_type, ParameterEnum):
return parameter.enum_type(value) return parameter.enum_type(value)
elif parameter.param_type == float:
return float(value)
elif parameter.param_type == int:
return int(value)
elif parameter.param_type == bool: elif parameter.param_type == bool:
if isinstance(value, str): if value.lower() in ('true'):
if value.lower() not in ('true', 'false'): return True
raise ValueError(f"Invalid boolean value: {value}") elif value.lower() in ('false'):
return value.lower() == 'true' return False
else: else:
raise ValueError(f"Expected string for boolean parameter, got {type(value)}") raise ValueError(f"Invalid boolean value: {value}. Expected 'TRUE' or 'FALSE' (case insensitive).")
else: else:
return parameter.param_type(value) return value
def set(self, parameter: Union[str, NuconParameter], value: Union[float, int, bool, str, Enum], force: bool = False) -> None: @classmethod
if isinstance(parameter, str): def _query(cls, parameter_name: str) -> str:
parameter = self._parameters[parameter] response = requests.get(NuconConfig.base_url, params={"variable": parameter_name})
if not force and not parameter.is_writable:
raise ValueError(f"Parameter {parameter} is not writable")
if not force:
parameter.check_in_range(value, raise_on_oob=True)
if parameter.enum_type and isinstance(value, parameter.enum_type):
value = value.value
if self.dummy_mode:
print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter} to {value}")
return
self._set_value(parameter, str(value))
def _query(self, parameter: NuconParameter) -> str:
response = requests.get(self.base_url, params={"variable": parameter.id})
if response.status_code != 200: if response.status_code != 200:
raise Exception(f"Failed to query parameter {parameter.id}. Status code: {response.status_code}") raise Exception(f"Failed to query parameter {parameter_name}. Status code: {response.status_code}")
return response.text.strip() return response.text.strip()
def _set_value(self, parameter: NuconParameter, value: str) -> None: @classmethod
response = requests.post(self.base_url, params={"variable": parameter.id, "value": value}) def _get_dummy_value(cls, parameter: 'Nucon') -> Union[float, int, bool, str, Enum]:
if response.status_code != 200:
raise Exception(f"Failed to set parameter {parameter.id}. Status code: {response.status_code}")
def _get_dummy_value(self, parameter: NuconParameter) -> Union[float, int, bool, str, Enum]:
if parameter.enum_type: if parameter.enum_type:
return next(iter(parameter.enum_type)) return next(iter(parameter.enum_type))
elif parameter.param_type == float: elif parameter.param_type == float:
@ -264,52 +278,69 @@ class Nucon:
return 3.14 return 3.14
elif parameter.param_type == int: elif parameter.param_type == int:
if parameter.max_val is not None and parameter.min_val is not None: if parameter.max_val is not None and parameter.min_val is not None:
return (parameter.max_val - parameter.min_val) // 2 + parameter.min_val return (parameter.max_val - parameter.min_val) / 2 + parameter.min_val
else: else:
return 42 return 42
elif parameter.param_type == bool: elif parameter.param_type == bool:
return random.choice([True, False]) return False
else: else:
return "dummy" return "dummy"
def get_multiple_iter(self, parameters: List[Union[str, NuconParameter]]) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]: @classmethod
for param in parameters: def get_multiple(cls, parameters: List['Nucon']) -> Dict['Nucon', Union[float, int, bool, str, Enum]]:
if isinstance(param, str): return {param: cls.get(param) for param in parameters}
param_name = param
param_obj = self._parameters[param]
else:
param_name = next(name for name, p in self._parameters.items() if p is param)
param_obj = param
yield param_name, self.get(param_obj)
def get_multiple(self, parameters: List[Union[str, NuconParameter]]) -> Dict[str, Union[float, int, bool, str, Enum]]: @classmethod
return dict(self.get_multiple_iter(parameters)) def get_all(cls) -> Dict['Nucon', Union[float, int, bool, str, Enum]]:
return cls.get_multiple(list(cls))
def get_all_iter(self) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]: @classmethod
return self.get_multiple_iter(self._parameters.keys()) def set(cls, parameter: Union['Nucon', str], value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
if isinstance(parameter, str):
parameter = next((param for param in cls if param.id == parameter), None)
if parameter is None:
raise ValueError(f"No parameter found with id '{parameter}'")
def get_all(self) -> Dict[str, Union[float, int, bool, str, Enum]]: if not force and not parameter.is_writable:
return dict(self.get_all_iter()) raise ValueError(f"Parameter {parameter.name} is not writable")
def get_all_readable(self) -> List[NuconParameter]: if not force:
return {name: param for name, param in self._parameters.items()} parameter.check_in_range(value, raise_on_oob=True)
def get_all_writable(self) -> List[NuconParameter]: if parameter.enum_type and isinstance(value, parameter.enum_type):
return {name: param for name, param in self._parameters.items() if param.is_writable} value = value.value
def set_dummy_mode(self, dummy_mode: bool) -> None: if NuconConfig.dummy_mode:
self.dummy_mode = dummy_mode print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter.name} to {value}")
return
def __getattr__(self, name): response = requests.post(NuconConfig.base_url, params={"variable": parameter.name, "value": str(value)})
if name in self._parameters:
return self._parameters[name] if response.status_code != 200:
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") raise Exception(f"Failed to set parameter {parameter.name}. Status code: {response.status_code}")
def __getitem__(self, key): # Example usage
return self.__getattr__(key) if __name__ == "__main__":
# Enable dummy mode for testing
Nucon.set_dummy_mode(True)
def __dir__(self): # Get a single parameter
return list(super().__dir__()) + list(self._parameters.keys()) core_temp = Nucon.CORE_TEMP.value
print(f"Core Temperature: {core_temp}")
def __len__(self): # Get a parameter with an enum
return len(self._parameters) pump_status = Nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value
print(f"Pump 0 Status: {pump_status}")
# Set a parameter with an enum
try:
Nucon.GENERATOR_0_BREAKER.value = BreakerStatus.OPEN
print(f"Successfully set GENERATOR_0_BREAKER to {Nucon.GENERATOR_0_BREAKER.value}")
except ValueError as e:
print(f"Error: {e}")
# Get all parameters
all_params = Nucon.get_all()
print("All parameters:")
for param, value in all_params.items():
print(f"{param.name}: {value}")

View File

@ -1,174 +0,0 @@
import numpy as np
import time
import torch
import torch.nn as nn
import torch.optim as optim
import random
from enum import Enum
from nucon import Nucon
import pickle
import os
from typing import Union, Tuple, List
Actors = {
'random': lambda nucon: lambda obs: {param.id: random.uniform(param.min_val, param.max_val) if param.min_val is not None and param.max_val is not None else 0 for param in nucon.get_all_writable().values()},
'null': lambda nucon: lambda obs: {},
}
class ReactorDynamicsNet(nn.Module):
def __init__(self, input_dim, output_dim):
super(ReactorDynamicsNet, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_dim + 1, 128), # +1 for time_delta
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, output_dim)
)
def forward(self, state, time_delta):
x = torch.cat([state, time_delta], dim=-1)
return self.network(x)
class ReactorDynamicsModel(nn.Module):
def __init__(self, input_params: List[str], output_params: List[str]):
super(ReactorDynamicsModel, self).__init__()
self.input_params = input_params
self.output_params = output_params
input_dim = len(input_params)
output_dim = len(output_params)
self.net = ReactorDynamicsNet(input_dim, output_dim)
def _state_dict_to_tensor(self, state_dict):
return torch.tensor([state_dict[p] for p in self.input_params], dtype=torch.float32)
def _tensor_to_state_dict(self, tensor):
return {p: tensor[i].item() for i, p in enumerate(self.output_params)}
def forward(self, state_dict, time_delta):
state_tensor = self._state_dict_to_tensor(state_dict).unsqueeze(0)
time_delta_tensor = torch.tensor([time_delta], dtype=torch.float32).unsqueeze(0)
predicted_tensor = self.net(state_tensor, time_delta_tensor)
return self._tensor_to_state_dict(predicted_tensor.squeeze(0))
class NuconModelLearner:
def __init__(self, nucon=None, actor='null', dataset_path='nucon_dataset.pkl', time_delta: Union[float, Tuple[float, float]] = 0.1):
self.nucon = Nucon() if nucon is None else nucon
self.actor = Actors[actor](self.nucon) if actor in Actors else actor
self.dataset = self.load_dataset(dataset_path) or []
self.dataset_path = dataset_path
self.readable_params = list(self.nucon.get_all_readable().keys())
self.non_writable_params = [param.id for param in self.nucon.get_all_readable().values() if not param.is_writable]
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
self.optimizer = optim.Adam(self.model.parameters())
if isinstance(time_delta, (int, float)):
self.time_delta = lambda: time_delta
elif isinstance(time_delta, tuple) and len(time_delta) == 2:
self.time_delta = lambda: random.uniform(*time_delta)
else:
raise ValueError("time_delta must be a float or a tuple of two floats")
def _get_state(self):
state = {}
for param_id, param in self.nucon.get_all_readable().items():
value = self.nucon.get(param)
if isinstance(value, Enum):
value = value.value
state[param_id] = value
return state
def collect_data(self, num_steps):
state = self._get_state()
for _ in range(num_steps):
action = self.actor(state)
start_time = time.time()
for param_id, value in action.items():
self.nucon.set(param_id, value)
time_delta = self.time_delta()
time.sleep(time_delta)
next_state = self._get_state()
self.dataset.append((state, action, next_state, time_delta))
state = next_state
self.save_dataset()
def refine_dataset(self, error_threshold):
refined_data = []
for state, action, next_state, time_delta in self.dataset:
predicted_next_state = self.model(state, time_delta)
error = sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params)
if error > error_threshold:
refined_data.append((state, action, next_state, time_delta))
self.dataset = refined_data
self.save_dataset()
def train_model(self, batch_size=32, num_epochs=10, test_split=0.2):
random.shuffle(self.dataset)
split_idx = int(len(self.dataset) * (1 - test_split))
train_data = self.dataset[:split_idx]
test_data = self.dataset[split_idx:]
for epoch in range(num_epochs):
train_loss = self._train_epoch(train_data, batch_size)
test_loss = self._test_epoch(test_data)
print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
def _train_epoch(self, data, batch_size):
total_loss = 0
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
states, _, next_states, time_deltas = zip(*batch)
loss = 0
for state, next_state, time_delta in zip(states, next_states, time_deltas):
predicted_next_state = self.model(state, time_delta)
loss += sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params)
loss /= len(batch)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
total_loss += loss.item()
return total_loss / (len(data) // batch_size)
def _test_epoch(self, data):
total_loss = 0
with torch.no_grad():
for state, _, next_state, time_delta in data:
predicted_next_state = self.model(state, time_delta)
loss = sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params)
total_loss += loss
return total_loss / len(data)
def save_model(self, path):
torch.save(self.model.state_dict(), path)
def load_model(self, path):
self.model.load_state_dict(torch.load(path))
def save_dataset(self, path=None):
path = path or self.dataset_path
with open(path, 'wb') as f:
pickle.dump(self.dataset, f)
def load_dataset(self, path=None):
path = path or self.dataset_path
if os.path.exists(path):
with open(path, 'rb') as f:
return pickle.load(f)
return None
def merge_datasets(self, other_dataset_path):
other_dataset = self.load_dataset(other_dataset_path)
if other_dataset:
self.dataset.extend(other_dataset)
self.save_dataset()

View File

@ -3,7 +3,7 @@ from gymnasium import spaces
import numpy as np import numpy as np
import time import time
from typing import Dict, Any from typing import Dict, Any
from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus from .core import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus
Objectives = { Objectives = {
"null": lambda obs: 0, "null": lambda obs: 0,
@ -13,16 +13,12 @@ Objectives = {
Parameterized_Objectives = { Parameterized_Objectives = {
"target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2), "target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2),
"target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2),
"temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2),
"temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2),
"constant": lambda constant: lambda obs: constant,
} }
class NuconEnv(gym.Env): class NuconEnv(gym.Env):
metadata = {'render_modes': ['human']} metadata = {'render_modes': ['human']}
def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5, objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0): def __init__(self, render_mode=None, seconds_per_step=5, objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0):
super().__init__() super().__init__()
self.render_mode = render_mode self.render_mode = render_mode
@ -30,30 +26,22 @@ class NuconEnv(gym.Env):
if objective_weights is None: if objective_weights is None:
objective_weights = [1.0 for objective in objectives] objective_weights = [1.0 for objective in objectives]
self.objective_weights = objective_weights self.objective_weights = objective_weights
self.terminate_above = terminate_above self.terminate_at = terminate_at
self.simulator = simulator
if nucon is None:
if simulator:
nucon = Nucon(port=simulator.port)
else:
nucon = Nucon()
self.nucon = nucon
# Define observation space # Define observation space
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)} obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
for param_id, param in self.nucon.get_all_readable().items(): for param in Nucon.get_all_readable():
if param.param_type == float: if param.param_type == float:
obs_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32) obs_spaces[param.id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == int: elif param.param_type == int:
if param.min_val is not None and param.max_val is not None: if param.min_val is not None and param.max_val is not None:
obs_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32) obs_spaces[param.id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32)
else: else:
obs_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32) obs_spaces[param.id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == bool: elif param.param_type == bool:
obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32) obs_spaces[param.id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
elif issubclass(param.param_type, Enum): elif issubclass(param.param_type, Enum):
obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32) obs_spaces[param.id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
else: else:
raise ValueError(f"Unsupported observation parameter type: {param.param_type}") raise ValueError(f"Unsupported observation parameter type: {param.param_type}")
@ -61,26 +49,23 @@ class NuconEnv(gym.Env):
# Define action space # Define action space
action_spaces = {} action_spaces = {}
for param_id, param in self.nucon.get_all_writable().items(): for param in Nucon.get_all_writable():
if param.param_type == float: if param.param_type == float:
action_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32) action_spaces[param.id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == int: elif param.param_type == int:
if param.min_val is not None and param.max_val is not None: if param.min_val is not None and param.max_val is not None:
action_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32) action_spaces[param.id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32)
else: else:
action_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32) action_spaces[param.id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == bool: elif param.param_type == bool:
action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32) action_spaces[param.id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
elif issubclass(param.param_type, Enum): elif issubclass(param.param_type, Enum):
action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32) action_spaces[param.id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
else: else:
raise ValueError(f"Unsupported action parameter type: {param.param_type}") raise ValueError(f"Unsupported action parameter type: {param.param_type}")
self.action_space = spaces.Dict(action_spaces) self.action_space = spaces.Dict(action_spaces)
self.objectives = []
self.terminators = []
for objective in objectives: for objective in objectives:
if objective in Objectives: if objective in Objectives:
self.objectives.append(Objectives[objective]) self.objectives.append(Objectives[objective])
@ -99,11 +84,11 @@ class NuconEnv(gym.Env):
def _get_obs(self): def _get_obs(self):
obs = {} obs = {}
for param_id, param in self.nucon.get_all_readable().items(): for param in Nucon.get_all_readable():
value = self.nucon.get(param_id) value = Nucon.get(param)
if isinstance(value, Enum): if isinstance(value, Enum):
value = value.value value = value.value
obs[param_id] = value obs[param.id] = value
obs["EPISODE_TIME"] = self._total_steps * self.seconds_per_step obs["EPISODE_TIME"] = self._total_steps * self.seconds_per_step
return obs return obs
@ -127,12 +112,12 @@ class NuconEnv(gym.Env):
def step(self, action): def step(self, action):
# Apply the action to the Nucon system # Apply the action to the Nucon system
for param_id, value in action.items(): for param_id, value in action.items():
param = next(p for p in self.nucon if p.id == param_id) param = next(p for p in Nucon if p.id == param_id)
if issubclass(param.param_type, Enum): if issubclass(param.param_type, Enum):
value = param.param_type(value) value = param.param_type(value)
if param.min_val is not None and param.max_val is not None: if param.min_val is not None and param.max_val is not None:
value = np.clip(value, param.min_val, param.max_val) value = np.clip(value, param.min_val, param.max_val)
self.nucon.set(param, value) Nucon.set(param, value)
observation = self._get_obs() observation = self._get_obs()
terminated = np.sum([terminator(observation) for terminator in self.terminators]) > self.terminate_above terminated = np.sum([terminator(observation) for terminator in self.terminators]) > self.terminate_above
@ -141,10 +126,7 @@ class NuconEnv(gym.Env):
reward = sum(obj for obj in info['objectives_weighted'].values()) reward = sum(obj for obj in info['objectives_weighted'].values())
self._total_steps += 1 self._total_steps += 1
if self.simulator: time.sleep(self.seconds_per_step)
self.simulator.update(self.seconds_per_step)
else:
time.sleep(self.seconds_per_step)
return observation, reward, terminated, truncated, info return observation, reward, terminated, truncated, info
def render(self): def render(self):
@ -166,7 +148,6 @@ class NuconEnv(gym.Env):
def _unflatten_observation(self, flat_observation): def _unflatten_observation(self, flat_observation):
return {k: v.reshape(1, -1) for k, v in self.observation_space.items()} return {k: v.reshape(1, -1) for k, v in self.observation_space.items()}
def register_nucon_envs(): def register_nucon_envs():
gym.register( gym.register(
id='Nucon-max_power-v0', id='Nucon-max_power-v0',
@ -174,14 +155,9 @@ def register_nucon_envs():
kwargs={'seconds_per_step': 5, 'objectives': ['max_power']} kwargs={'seconds_per_step': 5, 'objectives': ['max_power']}
) )
gym.register( gym.register(
id='Nucon-target_temperature_350-v0', id='Nucon-target_temperature_600-v0',
entry_point='nucon.rl:NuconEnv', entry_point='nucon.rl:NuconEnv',
kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['target_temperature'](goal_temp=350)]} kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['target_temperature'](goal_temp=600)]}
)
gym.register(
id='Nucon-safe_max_power-v0',
entry_point='nucon.rl:NuconEnv',
kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]}
) )
register_nucon_envs() register_nucon_envs()

View File

@ -1,315 +0,0 @@
import random
from typing import Dict, Union, Any, Tuple, List
from enum import Enum
from flask import Flask, request, jsonify
from nucon import Nucon, ParameterEnum, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
import threading
import torch
from nucon.model import ReactorDynamicsModel
class OperatingState(Enum):
# Tuple indicates a range of values, while list indicates a set of possible values
OFFLINE = {
'CORE_TEMP': (18.0, 22.0),
'CORE_TEMP_OPERATIVE': [306.0],
'CORE_TEMP_MAX': [1000.0],
'CORE_TEMP_MIN': [0.0],
'CORE_TEMP_RESIDUAL': [False],
'CORE_PRESSURE': (0.9, 1.1),
'CORE_PRESSURE_MAX': [1.5],
'CORE_PRESSURE_OPERATIVE': [1.0],
'CORE_INTEGRITY': [100.0],
'CORE_WEAR': [0.0],
'CORE_STATE': ['OFFLINE'],
'CORE_STATE_CRITICALITY': [0.0],
'CORE_CRITICAL_MASS_REACHED': [False],
'CORE_CRITICAL_MASS_REACHED_COUNTER': [0],
'CORE_IMMINENT_FUSION': [False],
'CORE_READY_FOR_START': [False],
'CORE_STEAM_PRESENT': [False],
'CORE_HIGH_STEAM_PRESENT': [False],
'TIME': ['00:00:00'],
'TIME_STAMP': ['1970-01-01 00:00:00'],
'COOLANT_CORE_STATE': ['INACTIVE'],
'COOLANT_CORE_PRESSURE': (0.9, 1.1),
'COOLANT_CORE_MAX_PRESSURE': [1.5],
'COOLANT_CORE_VESSEL_TEMPERATURE': (18.0, 22.0),
'COOLANT_CORE_QUANTITY_IN_VESSEL': [0.0],
'COOLANT_CORE_PRIMARY_LOOP_LEVEL': [0.0],
'COOLANT_CORE_FLOW_SPEED': [0.0],
'COOLANT_CORE_FLOW_ORDERED_SPEED': [0.0],
'COOLANT_CORE_FLOW_REACHED_SPEED': [False],
'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': [3],
'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': [2],
'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': [PumpStatus.INACTIVE],
'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': [PumpStatus.INACTIVE],
'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': [PumpStatus.INACTIVE],
'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': [0.0],
'RODS_STATUS': ['INACTIVE'],
'RODS_MOVEMENT_SPEED': [0.0],
'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': [False],
'RODS_DEFORMED': [False],
'RODS_TEMPERATURE': (18.0, 22.0),
'RODS_MAX_TEMPERATURE': [1000.0],
'RODS_POS_ORDERED': [0.0],
'RODS_POS_ACTUAL': [0.0],
'RODS_POS_REACHED': [True],
'RODS_QUANTITY': [100],
'RODS_ALIGNED': [True],
'GENERATOR_0_KW': [0.0],
'GENERATOR_1_KW': [0.0],
'GENERATOR_2_KW': [0.0],
'GENERATOR_0_V': [0.0],
'GENERATOR_1_V': [0.0],
'GENERATOR_2_V': [0.0],
'GENERATOR_0_A': [0.0],
'GENERATOR_1_A': [0.0],
'GENERATOR_2_A': [0.0],
'GENERATOR_0_HERTZ': [0.0],
'GENERATOR_1_HERTZ': [0.0],
'GENERATOR_2_HERTZ': [0.0],
'GENERATOR_0_BREAKER': [BreakerStatus.OPEN],
'GENERATOR_1_BREAKER': [BreakerStatus.OPEN],
'GENERATOR_2_BREAKER': [BreakerStatus.OPEN],
'STEAM_TURBINE_0_RPM': [0.0],
'STEAM_TURBINE_1_RPM': [0.0],
'STEAM_TURBINE_2_RPM': [0.0],
'STEAM_TURBINE_0_TEMPERATURE': (18.0, 22.0),
'STEAM_TURBINE_1_TEMPERATURE': (18.0, 22.0),
'STEAM_TURBINE_2_TEMPERATURE': (18.0, 22.0),
'STEAM_TURBINE_0_PRESSURE': (0.9, 1.1),
'STEAM_TURBINE_1_PRESSURE': (0.9, 1.1),
'STEAM_TURBINE_2_PRESSURE': (0.9, 1.1),
}
NOMINAL = {
'CORE_TEMP': (290.0, 370.0),
'CORE_TEMP_OPERATIVE': [306.0],
'CORE_TEMP_MAX': [1000.0],
'CORE_TEMP_MIN': [0.0],
'CORE_TEMP_RESIDUAL': [False],
'CORE_PRESSURE': (14.5, 15.5),
'CORE_PRESSURE_MAX': [16.0],
'CORE_PRESSURE_OPERATIVE': [15.0],
'CORE_INTEGRITY': (99.0, 100.0),
'CORE_WEAR': (0.0, 1.0),
'CORE_STATE': ['ACTIVE'],
'CORE_STATE_CRITICALITY': (0.9, 1.1),
'CORE_CRITICAL_MASS_REACHED': [True],
'CORE_CRITICAL_MASS_REACHED_COUNTER': [1],
'CORE_IMMINENT_FUSION': [False],
'CORE_READY_FOR_START': [True],
'CORE_STEAM_PRESENT': [True],
'CORE_HIGH_STEAM_PRESENT': [False],
'TIME': ['00:00:00'],
'TIME_STAMP': ['1970-01-01 00:00:00'],
'COOLANT_CORE_STATE': ['ACTIVE'],
'COOLANT_CORE_PRESSURE': (13.5, 14.5),
'COOLANT_CORE_MAX_PRESSURE': [15.0],
'COOLANT_CORE_VESSEL_TEMPERATURE': (270.0, 290.0),
'COOLANT_CORE_QUANTITY_IN_VESSEL': (95.0, 100.0),
'COOLANT_CORE_PRIMARY_LOOP_LEVEL': (95.0, 100.0),
'COOLANT_CORE_FLOW_SPEED': (9.5, 10.5),
'COOLANT_CORE_FLOW_ORDERED_SPEED': [10.0],
'COOLANT_CORE_FLOW_REACHED_SPEED': [True],
'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': [3],
'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': [3],
'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': [PumpStatus.ACTIVE_SPEED_REACHED],
'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': [PumpStatus.ACTIVE_SPEED_REACHED],
'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': [PumpStatus.ACTIVE_SPEED_REACHED],
'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': (95.0, 100.0),
'RODS_STATUS': ['ACTIVE'],
'RODS_MOVEMENT_SPEED': (0.9, 1.1),
'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': [False],
'RODS_DEFORMED': [False],
'RODS_TEMPERATURE': (290.0, 310.0),
'RODS_MAX_TEMPERATURE': [1000.0],
'RODS_POS_ORDERED': [50.0],
'RODS_POS_ACTUAL': (49.5, 50.5),
'RODS_POS_REACHED': [True],
'RODS_QUANTITY': [100],
'RODS_ALIGNED': [True],
'GENERATOR_0_KW': (950.0, 1050.0),
'GENERATOR_1_KW': (950.0, 1050.0),
'GENERATOR_2_KW': (950.0, 1050.0),
'GENERATOR_0_V': (380.0, 420.0),
'GENERATOR_1_V': (380.0, 420.0),
'GENERATOR_2_V': (380.0, 420.0),
'GENERATOR_0_A': (2375.0, 2625.0),
'GENERATOR_1_A': (2375.0, 2625.0),
'GENERATOR_2_A': (2375.0, 2625.0),
'GENERATOR_0_HERTZ': (49.5, 50.5),
'GENERATOR_1_HERTZ': (49.5, 50.5),
'GENERATOR_2_HERTZ': (49.5, 50.5),
'GENERATOR_0_BREAKER': [BreakerStatus.CLOSED],
'GENERATOR_1_BREAKER': [BreakerStatus.CLOSED],
'GENERATOR_2_BREAKER': [BreakerStatus.CLOSED],
'STEAM_TURBINE_0_RPM': (2950.0, 3050.0),
'STEAM_TURBINE_1_RPM': (2950.0, 3050.0),
'STEAM_TURBINE_2_RPM': (2950.0, 3050.0),
'STEAM_TURBINE_0_TEMPERATURE': (270.0, 290.0),
'STEAM_TURBINE_1_TEMPERATURE': (270.0, 290.0),
'STEAM_TURBINE_2_TEMPERATURE': (270.0, 290.0),
'STEAM_TURBINE_0_PRESSURE': (13.5, 14.5),
'STEAM_TURBINE_1_PRESSURE': (13.5, 14.5),
'STEAM_TURBINE_2_PRESSURE': (13.5, 14.5),
}
class NuconSimulator:
class Parameters:
def __init__(self, nucon: Nucon):
for param_name in nucon.get_all_readable():
setattr(self, param_name, None)
def __init__(self, host: str = 'localhost', port: int = 8786):
self._nucon = Nucon()
self.parameters = self.Parameters(self._nucon)
self.time = 0.0
self.allow_all_writes = False
self.set_state(OperatingState.OFFLINE)
self.model = None
self.readable_params = list(self._nucon.get_all_readable().keys())
self.non_writable_params = [name for name, param in self._nucon.get_all_readable().items() if not param.is_writable]
self._run(host, port)
def get(self, parameter: Union[str, Any]) -> Any:
if isinstance(parameter, str):
return getattr(self.parameters, parameter)
return getattr(self.parameters, parameter.id)
def set(self, parameter: Union[str, Any], value: Any, force: bool = False) -> None:
if isinstance(parameter, str):
param_obj = self._nucon[parameter]
else:
param_obj = parameter
if not param_obj.is_writable and not force and not self.allow_all_writes:
raise ValueError(f"Parameter {param_obj.id} is not writable")
# Convert value to the correct type
try:
if param_obj.enum_type:
if isinstance(value, str):
value = param_obj.enum_type[value.upper()]
elif isinstance(value, int):
value = param_obj.enum_type(value)
elif not isinstance(value, param_obj.enum_type):
raise ValueError(f"Invalid enum value for {param_obj.id}")
else:
value = param_obj.param_type(value)
except (ValueError, KeyError):
raise ValueError(f"Invalid type for parameter {param_obj.id}. Expected {param_obj.param_type}")
# Check range if not forced
if not force and param_obj.min_val is not None and param_obj.max_val is not None:
if not param_obj.min_val <= value <= param_obj.max_val:
raise ValueError(f"Value {value} is out of range for parameter {param_obj.id}. "
f"Valid range: [{param_obj.min_val}, {param_obj.max_val}]")
setattr(self.parameters, param_obj.id, value)
def set_allow_all_writes(self, allow: bool) -> None:
self.allow_all_writes = allow
def update(self, time_step: float) -> None:
self._update_reactor_state(time_step)
self.time += time_step
def load_model(self, model_path: str) -> None:
try:
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
self.model.load_state_dict(torch.load(model_path))
self.model.eval() # Set the model to evaluation mode
print(f"Model loaded successfully from {model_path}")
except Exception as e:
print(f"Error loading model: {str(e)}")
self.model = None
def _update_reactor_state(self, time_step: float) -> None:
if not self.model:
raise ValueError("Model not set. Please load a model using load_model() method.")
state = {}
for param in self.readable_params:
value = self.get(param)
if isinstance(value, Enum):
value = value.value
state[param] = value
# Use the model to predict the next state
with torch.no_grad():
next_state = self.model(state, time_step)
# Update the simulator's state
for param, value in next_state.items():
self.set(param, value)
def set_state(self, state: OperatingState) -> None:
self._sample_parameters_from_state(state)
def _sample_parameters_from_state(self, state: OperatingState) -> None:
for param_name, value_spec in state.value.items():
param = self._nucon.get_all_readable()[param_name]
if isinstance(value_spec, tuple):
value = random.uniform(*value_spec)
elif isinstance(value_spec, list):
value = random.choice(value_spec)
else:
raise ValueError(f"Invalid value specification for parameter {param_name}")
self.set(param, value, force=True)
def _run(self, port: int = 8786, host: str = 'localhost', debug: bool = False):
app = Flask(__name__)
@app.route('/', methods=['GET'])
def get_parameter():
variable = request.args.get('variable')
if not variable:
return jsonify({"error": "No variable specified"}), 400
try:
value = self.get(variable)
return str(value), 200
except KeyError:
return jsonify({"error": f"Unknown variable: {variable}"}), 404
@app.route('/', methods=['POST'])
def set_parameter():
variable = request.args.get('variable')
value = request.args.get('value')
if not variable or value is None:
return jsonify({"error": "Both variable and value must be specified"}), 400
try:
self.set(variable, value)
return "OK", 200
except ValueError as e:
return jsonify({"error": str(e)}), 400
except KeyError:
return jsonify({"error": f"Unknown variable: {variable}"}), 404
def run_simulator(host='localhost', port=8786, debug=False):
app.run(host=host, port=port, debug=debug)
threading.Thread(target=run_simulator, args=(host, port, debug), daemon=True).start()

View File

@ -1,50 +1,41 @@
import pytest import pytest
import warnings import warnings
from nucon import Nucon, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus from nucon import Nucon, NuconConfig, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
WARN_FLOAT_COULD_BE_INT = False WARN_FLOAT_COULD_BE_INT = False
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def nucon_setup(): def nucon_setup():
Nucon.set_dummy_mode(False) Nucon.set_dummy_mode(False) # Assume the game is running
Nucon.set_base_url("http://localhost:8785/") Nucon.set_base_url("http://localhost:8785/")
yield
Nucon.set_dummy_mode(True)
def test_read_all_parameters(nucon_setup): def test_read_all_parameters(nucon_setup):
all_params = Nucon.get_all() all_params = Nucon.get_all()
assert len(all_params) == len(Nucon) assert len(all_params) == len(Nucon)
for param, value in all_params.items(): for param, value in all_params.items():
assert isinstance(value, param.param_type), f"Parameter {param.id} has incorrect type. Expected {param.param_type}, got {type(value)}" assert isinstance(value, param.param_type), f"Parameter {param.name} has incorrect type. Expected {param.param_type}, got {type(value)}"
if param.param_type == float and value.is_integer() and WARN_FLOAT_COULD_BE_INT: if param.param_type == float and value.is_integer() and WARN_FLOAT_COULD_BE_INT:
warnings.warn(f"Parameter {param.id} is a float but has an integer value: {value}") warnings.warn(f"Parameter {param.name} is a float but has an integer value: {value}")
if param.param_type == str:
try:
float(value)
raise ValueError(f"Parameter {param.id} is a string that looks like a number: {value}")
except ValueError:
pass
try:
bool(value.lower())
raise ValueError(f"Parameter {param.id} is a string that looks like a boolean: {value}")
except ValueError:
pass
def test_write_writable_parameters(nucon_setup): def test_write_writable_parameters(nucon_setup):
writable_params = Nucon.get_all_writable() writable_params = Nucon.get_all_writable()
for param in writable_params: for param in writable_params:
current_value = param.value current_value = param.value
param.value = current_value param.value = current_value
assert param.value == current_value, f"Failed to write to parameter {param.id}" assert param.value == current_value, f"Failed to write to parameter {param.name}"
def test_non_writable_parameters(nucon_setup): def test_non_writable_parameters(nucon_setup):
non_writable_params = [param for param in Nucon if not param.is_writable] non_writable_params = [param for param in Nucon if not param.is_writable]
for param in non_writable_params: for param in non_writable_params:
# Test that normal set raises an error # Test that normal set raises an error
with pytest.raises(ValueError, match=f"Parameter {param.id} is not writable"): with pytest.raises(ValueError, match=f"Parameter {param.name} is not writable"):
param.value = param.value # Attempt to write the current value param.value = param.value # Attempt to write the current value
# Test that force_set is refused by the webserver # Test that force_set is refused by the webserver
current_value = param.value current_value = param.value
with pytest.raises(Exception, match=f"Failed to set parameter {param.id}"): with pytest.raises(Exception, match=f"Failed to set parameter {param.name}"):
Nucon.set(param, current_value, force=True) Nucon.set(param, current_value, force=True)
def test_enum_parameters(nucon_setup): def test_enum_parameters(nucon_setup):

View File

@ -1,114 +0,0 @@
import pytest
import time
from nucon import Nucon, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
from nucon.sim import NuconSimulator, OperatingState
@pytest.fixture(scope="module")
def simulator_setup():
simulator = NuconSimulator(port=8786)
simulator.run()
time.sleep(1) # Give the simulator time to start
nucon = Nucon(port=8786)
return simulator, nucon
def test_simulator_initialization(simulator_setup):
simulator, nucon = simulator_setup
assert simulator is not None
assert nucon is not None
def test_read_all_parameters(simulator_setup):
_, nucon = simulator_setup
all_params = nucon.get_all()
assert len(all_params) == len(nucon)
for param_id, value in all_params.items():
param = nucon[param_id]
assert isinstance(value, param.param_type), f"Parameter {param.id} has incorrect type. Expected {param.param_type}, got {type(value)}"
def test_write_writable_parameters(simulator_setup):
_, nucon = simulator_setup
writable_params = nucon.get_all_writable()
for param in writable_params.values():
current_value = param.value
if param.param_type == float:
new_value = current_value + 1.0
elif param.param_type == int:
new_value = current_value + 1
elif param.param_type == bool:
new_value = not current_value
elif issubclass(param.param_type, Enum):
new_value = list(param.param_type)[0]
else:
continue # Skip if we can't determine a new value
param.value = new_value
assert param.value == new_value, f"Failed to write to parameter {param.id}"
def test_non_writable_parameters(simulator_setup):
_, nucon = simulator_setup
non_writable_params = [param for param in nucon.get_all_readable().values() if not param.is_writable]
for param in non_writable_params:
with pytest.raises(ValueError, match=f"Parameter {param.id} is not writable"):
param.value = param.value # Attempt to write the current value
def test_enum_parameters(simulator_setup):
_, nucon = simulator_setup
pump_status = nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value
assert isinstance(pump_status, PumpStatus)
dry_status = nucon.COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS.value
assert isinstance(dry_status, PumpDryStatus)
overload_status = nucon.COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS.value
assert isinstance(overload_status, PumpOverloadStatus)
breaker_status = nucon.GENERATOR_0_BREAKER.value
assert isinstance(breaker_status, BreakerStatus)
def test_custom_truthy_values(simulator_setup):
_, nucon = simulator_setup
assert bool(nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value) == (nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value in [PumpStatus.ACTIVE_NO_SPEED_REACHED, PumpStatus.ACTIVE_SPEED_REACHED])
assert bool(nucon.COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS.value) == (nucon.COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS.value == PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID)
assert bool(nucon.COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS.value) == (nucon.COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS.value == PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD)
assert bool(nucon.GENERATOR_0_BREAKER.value) == (nucon.GENERATOR_0_BREAKER.value == BreakerStatus.OPEN)
def test_get_multiple_parameters(simulator_setup):
_, nucon = simulator_setup
params_to_get = [nucon.CORE_TEMP, nucon.CORE_PRESSURE, nucon.RODS_POS_ACTUAL]
multiple_params = nucon.get_multiple(params_to_get)
assert len(multiple_params) == len(params_to_get)
for param_id, value in multiple_params.items():
assert isinstance(value, nucon[param_id].param_type)
def test_simulator_update(simulator_setup):
simulator, nucon = simulator_setup
initial_temp = nucon.CORE_TEMP.value
simulator.update(10) # Update for 10 seconds
new_temp = nucon.CORE_TEMP.value
assert new_temp != initial_temp, "Core temperature should change after update"
def test_set_operating_state(simulator_setup):
simulator, nucon = simulator_setup
simulator._set_state(OperatingState.NOMINAL)
time.sleep(1) # Give the simulator time to update
assert nucon.CORE_STATE.value == 'NOMINAL'
assert 290 <= nucon.CORE_TEMP.value <= 370
def test_allow_all_writes(simulator_setup):
simulator, nucon = simulator_setup
simulator.set_allow_all_writes(True)
non_writable_param = next(param for param in nucon.get_all_readable().values() if not param.is_writable)
current_value = non_writable_param.value
new_value = current_value + 1 if isinstance(current_value, (int, float)) else not current_value
non_writable_param.value = new_value
assert non_writable_param.value == new_value, f"Failed to write to non-writable parameter {non_writable_param.id} when allow_all_writes is True"
def test_simulator_consistency(simulator_setup):
simulator, nucon = simulator_setup
for _ in range(10):
simulator.update(1)
all_params = nucon.get_all()
for param_id, value in all_params.items():
assert value == getattr(simulator.parameters, param_id), f"Inconsistency found for parameter {param_id}"
if __name__ == "__main__":
pytest.main()