Compare commits

...

6 Commits

Author SHA1 Message Date
e665a457dc Extended Test Suite 2024-10-03 21:57:08 +02:00
03da3415c8 Updated __init__ 2024-10-03 21:56:46 +02:00
4c3ad983fc Morer objectives and fixes 2024-10-03 21:56:27 +02:00
33b5db2f57 Fixes and Updates 2024-10-03 21:56:16 +02:00
60cd44cc9e Implemenetd Model Learning 2024-10-03 21:55:59 +02:00
132c47ff21 Implemented Simulator 2024-10-03 21:55:44 +02:00
7 changed files with 876 additions and 271 deletions

View File

@ -1,28 +1,20 @@
from enum import Enum, IntEnum from enum import Enum
from typing import Union, Dict, Type, List, Optional, Any, Iterator, Tuple
import requests import requests
from typing import Union, Dict, Type, List, Optional, Any import random
class NuconConfig:
base_url = "http://localhost:8785/"
dummy_mode = False
class ParameterEnum(Enum): class ParameterEnum(Enum):
@classmethod @classmethod
def _missing_(cls, value: Any) -> Union['ParameterEnum', None]: def _missing_(cls, value: Any) -> Union['ParameterEnum', None]:
if isinstance(value, str): if isinstance(value, str):
# Handle boolean-like strings if value.lower() == 'true':
if value.lower() in ('true'):
return cls(True) return cls(True)
elif value.lower() in ('false'): elif value.lower() == 'false':
return cls(False) return cls(False)
# Try to convert to int for int-based enums
try: try:
return cls(int(value)) return cls(int(value))
except ValueError: except ValueError:
pass pass
# If we can't handle the value, let the default Enum behavior take over
return None return None
class PumpStatus(ParameterEnum): class PumpStatus(ParameterEnum):
@ -61,214 +53,208 @@ CoreState = str
CoolantCoreState = str CoolantCoreState = str
RodsState = str RodsState = str
#class CoreState(ParameterEnum): class NuconParameter:
# REACTIVE = 'REACTIVO' def __init__(self, nucon: 'Nucon', id: str, param_type: Type, is_writable: bool, min_val: Optional[Union[int, float]] = None, max_val: Optional[Union[int, float]] = None):
# self.nucon = nucon
# def __bool__(self):
# return self.value == 'REACTIVO'
#class CoolantCoreState(ParameterEnum):
# CIRCULATING = 'CIRCULANDO'
#
# def __bool__(self):
# return self.value == 'CIRCULANDO'
class Nucon(Enum):
# Core
CORE_TEMP = ("CORE_TEMP", float, False)
CORE_TEMP_OPERATIVE = ("CORE_TEMP_OPERATIVE", float, False)
CORE_TEMP_MAX = ("CORE_TEMP_MAX", float, False)
CORE_TEMP_MIN = ("CORE_TEMP_MIN", float, False)
CORE_TEMP_RESIDUAL = ("CORE_TEMP_RESIDUAL", bool, False)
CORE_PRESSURE = ("CORE_PRESSURE", float, False)
CORE_PRESSURE_MAX = ("CORE_PRESSURE_MAX", float, False)
CORE_PRESSURE_OPERATIVE = ("CORE_PRESSURE_OPERATIVE", float, False)
CORE_INTEGRITY = ("CORE_INTEGRITY", float, False)
CORE_WEAR = ("CORE_WEAR", float, False)
CORE_STATE = ("CORE_STATE", CoreState, False)
CORE_STATE_CRITICALITY = ("CORE_STATE_CRITICALITY", float, False)
CORE_CRITICAL_MASS_REACHED = ("CORE_CRITICAL_MASS_REACHED", bool, False)
CORE_CRITICAL_MASS_REACHED_COUNTER = ("CORE_CRITICAL_MASS_REACHED_COUNTER", int, False)
CORE_IMMINENT_FUSION = ("CORE_IMMINENT_FUSION", bool, False)
CORE_READY_FOR_START = ("CORE_READY_FOR_START", bool, False)
CORE_STEAM_PRESENT = ("CORE_STEAM_PRESENT", bool, False)
CORE_HIGH_STEAM_PRESENT = ("CORE_HIGH_STEAM_PRESENT", bool, False)
# Time
TIME = ("TIME", str, False)
TIME_STAMP = ("TIME_STAMP", str, False)
# Coolant Core
COOLANT_CORE_STATE = ("COOLANT_CORE_STATE", CoolantCoreState, False)
COOLANT_CORE_PRESSURE = ("COOLANT_CORE_PRESSURE", float, False)
COOLANT_CORE_MAX_PRESSURE = ("COOLANT_CORE_MAX_PRESSURE", float, False)
COOLANT_CORE_VESSEL_TEMPERATURE = ("COOLANT_CORE_VESSEL_TEMPERATURE", float, False)
COOLANT_CORE_QUANTITY_IN_VESSEL = ("COOLANT_CORE_QUANTITY_IN_VESSEL", float, False)
COOLANT_CORE_PRIMARY_LOOP_LEVEL = ("COOLANT_CORE_PRIMARY_LOOP_LEVEL", float, False)
COOLANT_CORE_FLOW_SPEED = ("COOLANT_CORE_FLOW_SPEED", float, False)
COOLANT_CORE_FLOW_ORDERED_SPEED = ("COOLANT_CORE_FLOW_ORDERED_SPEED", float, False)
COOLANT_CORE_FLOW_REACHED_SPEED = ("COOLANT_CORE_FLOW_REACHED_SPEED", bool, False)
COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT", int, False)
COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT", int, False)
# Circulation Pumps
COOLANT_CORE_CIRCULATION_PUMP_0_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_0_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_1_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_2_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_SPEED", float, False)
# Rods
RODS_STATUS = ("RODS_STATUS", RodsState, False)
RODS_MOVEMENT_SPEED = ("RODS_MOVEMENT_SPEED", float, False)
RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE = ("RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE", bool, False)
RODS_DEFORMED = ("RODS_DEFORMED", bool, False)
RODS_TEMPERATURE = ("RODS_TEMPERATURE", float, False)
RODS_MAX_TEMPERATURE = ("RODS_MAX_TEMPERATURE", float, False)
RODS_POS_ORDERED = ("RODS_POS_ORDERED", float, True)
RODS_POS_ACTUAL = ("RODS_POS_ACTUAL", float, False)
RODS_POS_REACHED = ("RODS_POS_REACHED", bool, False)
RODS_QUANTITY = ("RODS_QUANTITY", int, False)
RODS_ALIGNED = ("RODS_ALIGNED", bool, False)
# Generators
GENERATOR_0_KW = ("GENERATOR_0_KW", float, False)
GENERATOR_1_KW = ("GENERATOR_1_KW", float, False)
GENERATOR_2_KW = ("GENERATOR_2_KW", float, False)
GENERATOR_0_V = ("GENERATOR_0_V", float, False)
GENERATOR_1_V = ("GENERATOR_1_V", float, False)
GENERATOR_2_V = ("GENERATOR_2_V", float, False)
GENERATOR_0_A = ("GENERATOR_0_A", float, False)
GENERATOR_1_A = ("GENERATOR_1_A", float, False)
GENERATOR_2_A = ("GENERATOR_2_A", float, False)
GENERATOR_0_HERTZ = ("GENERATOR_0_HERTZ", float, False)
GENERATOR_1_HERTZ = ("GENERATOR_1_HERTZ", float, False)
GENERATOR_2_HERTZ = ("GENERATOR_2_HERTZ", float, False)
GENERATOR_0_BREAKER = ("GENERATOR_0_BREAKER", BreakerStatus, False)
GENERATOR_1_BREAKER = ("GENERATOR_1_BREAKER", BreakerStatus, False)
GENERATOR_2_BREAKER = ("GENERATOR_2_BREAKER", BreakerStatus, False)
# Steam Turbines
STEAM_TURBINE_0_RPM = ("STEAM_TURBINE_0_RPM", float, False)
STEAM_TURBINE_1_RPM = ("STEAM_TURBINE_1_RPM", float, False)
STEAM_TURBINE_2_RPM = ("STEAM_TURBINE_2_RPM", float, False)
STEAM_TURBINE_0_TEMPERATURE = ("STEAM_TURBINE_0_TEMPERATURE", float, False)
STEAM_TURBINE_1_TEMPERATURE = ("STEAM_TURBINE_1_TEMPERATURE", float, False)
STEAM_TURBINE_2_TEMPERATURE = ("STEAM_TURBINE_2_TEMPERATURE", float, False)
STEAM_TURBINE_0_PRESSURE = ("STEAM_TURBINE_0_PRESSURE", float, False)
STEAM_TURBINE_1_PRESSURE = ("STEAM_TURBINE_1_PRESSURE", float, False)
STEAM_TURBINE_2_PRESSURE = ("STEAM_TURBINE_2_PRESSURE", float, False)
def __init__(self, id: str, param_type: Type, is_writable: bool, min_val: Optional[Union[int, float]] = None, max_val: Optional[Union[int, float]] = None):
self.id = id self.id = id
self.param_type = param_type self.param_type = param_type
self.is_writable = is_writable self.is_writable = is_writable
self.min_val = min_val self.min_val = min_val
self.max_val = max_val self.max_val = max_val
def __repr__(self) -> str:
type_repr = repr(self.param_type)
return (f"<{self.__class__.__name__}.{self.name}: "
f"{{'value': {repr(self.value)}, 'type': {type_repr}, "
f"'writable': {self.is_writable}}}>"
)
def __str__(self):
return self.id
@property @property
def enum_type(self) -> Type[Enum]: def enum_type(self) -> Type[Enum]:
return self.param_type if issubclass(self.param_type, Enum) else None return self.param_type if issubclass(self.param_type, Enum) else None
@property def check_in_range(self, value: Union[int, float, Enum], raise_on_oob: bool = False) -> bool:
def value(self) -> Union[float, int, bool, str, Enum]: if self.enum_type:
return self.read() if not isinstance(value, self.enum_type):
if raise_on_oob:
raise ValueError(f"Value {value} is not a valid {self.enum_type.__name__}")
return False
return True
def read(self) -> Union[float, int, bool, str, Enum]:
return Nucon.get(self)
@value.setter
def value(self, new_value: Union[float, int, bool, str, Enum]) -> None:
self.write(new_value)
def write(self, new_value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
Nucon.set(self, new_value, force)
def check_in_range(self, value: Union[int, float], raise_on_oob=False) -> None:
if self.min_val is not None and value < self.min_val: if self.min_val is not None and value < self.min_val:
if raise_on_oob: if raise_on_oob:
raise ValueError(f"Value {value} is below the minimum allowed value {self.min_val} for {self.name}") raise ValueError(f"Value {value} is below the minimum allowed value {self.min_val}")
return False return False
if self.max_val is not None and value > self.max_val: if self.max_val is not None and value > self.max_val:
if raise_on_oob: if raise_on_oob:
raise ValueError(f"Value {value} is above the maximum allowed value {self.max_val} for {self.name}") raise ValueError(f"Value {value} is above the maximum allowed value {self.max_val}")
return False return False
return True return True
@classmethod @property
def set_base_url(cls, url: str) -> None: def value(self):
NuconConfig.base_url = url return self.nucon.get(self)
@classmethod @value.setter
def set_dummy_mode(cls, dummy_mode: bool) -> None: def value(self, new_value):
NuconConfig.dummy_mode = dummy_mode self.nucon.set(self, new_value)
@classmethod def read(self):
def get_all_readable(cls) -> List['Nucon']: return self.value
return list(cls) # All parameters are readable
@classmethod def write(self, new_value, force=False):
def get_all_writable(cls) -> List['Nucon']: self.nucon.set(self, new_value, force)
return [param for param in cls if param.is_writable]
@classmethod def __repr__(self):
def get(cls, parameter: Union['Nucon', str]) -> Union[float, int, bool, str, ParameterEnum]: return f"NuconParameter(id='{self.id}', value={self.value}, type={self.param_type.__name__}, writable={self.is_writable})"
def __str__(self):
return self.id
class Nucon:
def __init__(self, host: str = 'localhost', port: int = 8786):
self.base_url = f'http://{host}:{port}/'
self.dummy_mode = False
self._parameters = self._create_parameters()
def _create_parameters(self) -> Dict[str, NuconParameter]:
param_values = {
'CORE_TEMP': (float, False, 0, 1000),
'CORE_TEMP_OPERATIVE': (float, False),
'CORE_TEMP_MAX': (float, False),
'CORE_TEMP_MIN': (float, False),
'CORE_TEMP_RESIDUAL': (bool, False),
'CORE_PRESSURE': (float, False),
'CORE_PRESSURE_MAX': (float, False),
'CORE_PRESSURE_OPERATIVE': (float, False),
'CORE_INTEGRITY': (float, False),
'CORE_WEAR': (float, False),
'CORE_STATE': (CoreState, False),
'CORE_STATE_CRITICALITY': (float, False),
'CORE_CRITICAL_MASS_REACHED': (bool, False),
'CORE_CRITICAL_MASS_REACHED_COUNTER': (int, False),
'CORE_IMMINENT_FUSION': (bool, False),
'CORE_READY_FOR_START': (bool, False),
'CORE_STEAM_PRESENT': (bool, False),
'CORE_HIGH_STEAM_PRESENT': (bool, False),
'TIME': (str, False),
'TIME_STAMP': (str, False),
'COOLANT_CORE_STATE': (CoolantCoreState, False),
'COOLANT_CORE_PRESSURE': (float, False),
'COOLANT_CORE_MAX_PRESSURE': (float, False),
'COOLANT_CORE_VESSEL_TEMPERATURE': (float, False),
'COOLANT_CORE_QUANTITY_IN_VESSEL': (float, False),
'COOLANT_CORE_PRIMARY_LOOP_LEVEL': (float, False),
'COOLANT_CORE_FLOW_SPEED': (float, False),
'COOLANT_CORE_FLOW_ORDERED_SPEED': (float, False),
'COOLANT_CORE_FLOW_REACHED_SPEED': (bool, False),
'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': (int, False),
'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': (int, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': (PumpStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': (PumpStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': (PumpStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': (PumpDryStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': (PumpDryStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': (PumpDryStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': (PumpOverloadStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': (PumpOverloadStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': (PumpOverloadStatus, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': (float, False),
'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': (float, False),
'RODS_STATUS': (RodsState, False),
'RODS_MOVEMENT_SPEED': (float, False),
'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': (bool, False),
'RODS_DEFORMED': (bool, False),
'RODS_TEMPERATURE': (float, False),
'RODS_MAX_TEMPERATURE': (float, False),
'RODS_POS_ORDERED': (float, True),
'RODS_POS_ACTUAL': (float, False),
'RODS_POS_REACHED': (bool, False),
'RODS_QUANTITY': (int, False),
'RODS_ALIGNED': (bool, False),
'GENERATOR_0_KW': (float, False),
'GENERATOR_1_KW': (float, False),
'GENERATOR_2_KW': (float, False),
'GENERATOR_0_V': (float, False),
'GENERATOR_1_V': (float, False),
'GENERATOR_2_V': (float, False),
'GENERATOR_0_A': (float, False),
'GENERATOR_1_A': (float, False),
'GENERATOR_2_A': (float, False),
'GENERATOR_0_HERTZ': (float, False),
'GENERATOR_1_HERTZ': (float, False),
'GENERATOR_2_HERTZ': (float, False),
'GENERATOR_0_BREAKER': (BreakerStatus, False),
'GENERATOR_1_BREAKER': (BreakerStatus, False),
'GENERATOR_2_BREAKER': (BreakerStatus, False),
'STEAM_TURBINE_0_RPM': (float, False),
'STEAM_TURBINE_1_RPM': (float, False),
'STEAM_TURBINE_2_RPM': (float, False),
'STEAM_TURBINE_0_TEMPERATURE': (float, False),
'STEAM_TURBINE_1_TEMPERATURE': (float, False),
'STEAM_TURBINE_2_TEMPERATURE': (float, False),
'STEAM_TURBINE_0_PRESSURE': (float, False),
'STEAM_TURBINE_1_PRESSURE': (float, False),
'STEAM_TURBINE_2_PRESSURE': (float, False),
}
return {
name: NuconParameter(self, name, *values)
for name, values in param_values.items()
}
def get(self, parameter: Union[str, NuconParameter]) -> Union[float, int, bool, str, Enum]:
if isinstance(parameter, str): if isinstance(parameter, str):
parameter = cls[parameter] parameter = self._parameters[parameter]
if NuconConfig.dummy_mode: if self.dummy_mode:
return cls._get_dummy_value(parameter) return self._get_dummy_value(parameter)
value = cls._query(parameter.name) value = self._query(parameter)
if parameter.enum_type and issubclass(parameter.enum_type, ParameterEnum): if parameter.enum_type:
return parameter.enum_type(value) return parameter.enum_type(value)
elif parameter.param_type == float:
return float(value)
elif parameter.param_type == int:
return int(value)
elif parameter.param_type == bool: elif parameter.param_type == bool:
if value.lower() in ('true'): if isinstance(value, str):
return True if value.lower() not in ('true', 'false'):
elif value.lower() in ('false'): raise ValueError(f"Invalid boolean value: {value}")
return False return value.lower() == 'true'
else: else:
raise ValueError(f"Invalid boolean value: {value}. Expected 'TRUE' or 'FALSE' (case insensitive).") raise ValueError(f"Expected string for boolean parameter, got {type(value)}")
else: else:
return value return parameter.param_type(value)
@classmethod def set(self, parameter: Union[str, NuconParameter], value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
def _query(cls, parameter_name: str) -> str: if isinstance(parameter, str):
response = requests.get(NuconConfig.base_url, params={"variable": parameter_name}) parameter = self._parameters[parameter]
if not force and not parameter.is_writable:
raise ValueError(f"Parameter {parameter} is not writable")
if not force:
parameter.check_in_range(value, raise_on_oob=True)
if parameter.enum_type and isinstance(value, parameter.enum_type):
value = value.value
if self.dummy_mode:
print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter} to {value}")
return
self._set_value(parameter, str(value))
def _query(self, parameter: NuconParameter) -> str:
response = requests.get(self.base_url, params={"variable": parameter.id})
if response.status_code != 200: if response.status_code != 200:
raise Exception(f"Failed to query parameter {parameter_name}. Status code: {response.status_code}") raise Exception(f"Failed to query parameter {parameter.id}. Status code: {response.status_code}")
return response.text.strip() return response.text.strip()
@classmethod def _set_value(self, parameter: NuconParameter, value: str) -> None:
def _get_dummy_value(cls, parameter: 'Nucon') -> Union[float, int, bool, str, Enum]: response = requests.post(self.base_url, params={"variable": parameter.id, "value": value})
if response.status_code != 200:
raise Exception(f"Failed to set parameter {parameter.id}. Status code: {response.status_code}")
def _get_dummy_value(self, parameter: NuconParameter) -> Union[float, int, bool, str, Enum]:
if parameter.enum_type: if parameter.enum_type:
return next(iter(parameter.enum_type)) return next(iter(parameter.enum_type))
elif parameter.param_type == float: elif parameter.param_type == float:
@ -278,69 +264,52 @@ class Nucon(Enum):
return 3.14 return 3.14
elif parameter.param_type == int: elif parameter.param_type == int:
if parameter.max_val is not None and parameter.min_val is not None: if parameter.max_val is not None and parameter.min_val is not None:
return (parameter.max_val - parameter.min_val) / 2 + parameter.min_val return (parameter.max_val - parameter.min_val) // 2 + parameter.min_val
else: else:
return 42 return 42
elif parameter.param_type == bool: elif parameter.param_type == bool:
return False return random.choice([True, False])
else: else:
return "dummy" return "dummy"
@classmethod def get_multiple_iter(self, parameters: List[Union[str, NuconParameter]]) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]:
def get_multiple(cls, parameters: List['Nucon']) -> Dict['Nucon', Union[float, int, bool, str, Enum]]: for param in parameters:
return {param: cls.get(param) for param in parameters} if isinstance(param, str):
param_name = param
param_obj = self._parameters[param]
else:
param_name = next(name for name, p in self._parameters.items() if p is param)
param_obj = param
yield param_name, self.get(param_obj)
@classmethod def get_multiple(self, parameters: List[Union[str, NuconParameter]]) -> Dict[str, Union[float, int, bool, str, Enum]]:
def get_all(cls) -> Dict['Nucon', Union[float, int, bool, str, Enum]]: return dict(self.get_multiple_iter(parameters))
return cls.get_multiple(list(cls))
@classmethod def get_all_iter(self) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]:
def set(cls, parameter: Union['Nucon', str], value: Union[float, int, bool, str, Enum], force: bool = False) -> None: return self.get_multiple_iter(self._parameters.keys())
if isinstance(parameter, str):
parameter = next((param for param in cls if param.id == parameter), None)
if parameter is None:
raise ValueError(f"No parameter found with id '{parameter}'")
if not force and not parameter.is_writable: def get_all(self) -> Dict[str, Union[float, int, bool, str, Enum]]:
raise ValueError(f"Parameter {parameter.name} is not writable") return dict(self.get_all_iter())
if not force: def get_all_readable(self) -> List[NuconParameter]:
parameter.check_in_range(value, raise_on_oob=True) return {name: param for name, param in self._parameters.items()}
if parameter.enum_type and isinstance(value, parameter.enum_type): def get_all_writable(self) -> List[NuconParameter]:
value = value.value return {name: param for name, param in self._parameters.items() if param.is_writable}
if NuconConfig.dummy_mode: def set_dummy_mode(self, dummy_mode: bool) -> None:
print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter.name} to {value}") self.dummy_mode = dummy_mode
return
response = requests.post(NuconConfig.base_url, params={"variable": parameter.name, "value": str(value)}) def __getattr__(self, name):
if name in self._parameters:
return self._parameters[name]
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
if response.status_code != 200: def __getitem__(self, key):
raise Exception(f"Failed to set parameter {parameter.name}. Status code: {response.status_code}") return self.__getattr__(key)
# Example usage def __dir__(self):
if __name__ == "__main__": return list(super().__dir__()) + list(self._parameters.keys())
# Enable dummy mode for testing
Nucon.set_dummy_mode(True)
# Get a single parameter def __len__(self):
core_temp = Nucon.CORE_TEMP.value return len(self._parameters)
print(f"Core Temperature: {core_temp}")
# Get a parameter with an enum
pump_status = Nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value
print(f"Pump 0 Status: {pump_status}")
# Set a parameter with an enum
try:
Nucon.GENERATOR_0_BREAKER.value = BreakerStatus.OPEN
print(f"Successfully set GENERATOR_0_BREAKER to {Nucon.GENERATOR_0_BREAKER.value}")
except ValueError as e:
print(f"Error: {e}")
# Get all parameters
all_params = Nucon.get_all()
print("All parameters:")
for param, value in all_params.items():
print(f"{param.name}: {value}")

174
nucon/model.py Normal file
View File

@ -0,0 +1,174 @@
import numpy as np
import time
import torch
import torch.nn as nn
import torch.optim as optim
import random
from enum import Enum
from nucon import Nucon
import pickle
import os
from typing import Union, Tuple, List
Actors = {
'random': lambda nucon: lambda obs: {param.id: random.uniform(param.min_val, param.max_val) if param.min_val is not None and param.max_val is not None else 0 for param in nucon.get_all_writable().values()},
'null': lambda nucon: lambda obs: {},
}
class ReactorDynamicsNet(nn.Module):
def __init__(self, input_dim, output_dim):
super(ReactorDynamicsNet, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_dim + 1, 128), # +1 for time_delta
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, output_dim)
)
def forward(self, state, time_delta):
x = torch.cat([state, time_delta], dim=-1)
return self.network(x)
class ReactorDynamicsModel(nn.Module):
def __init__(self, input_params: List[str], output_params: List[str]):
super(ReactorDynamicsModel, self).__init__()
self.input_params = input_params
self.output_params = output_params
input_dim = len(input_params)
output_dim = len(output_params)
self.net = ReactorDynamicsNet(input_dim, output_dim)
def _state_dict_to_tensor(self, state_dict):
return torch.tensor([state_dict[p] for p in self.input_params], dtype=torch.float32)
def _tensor_to_state_dict(self, tensor):
return {p: tensor[i].item() for i, p in enumerate(self.output_params)}
def forward(self, state_dict, time_delta):
state_tensor = self._state_dict_to_tensor(state_dict).unsqueeze(0)
time_delta_tensor = torch.tensor([time_delta], dtype=torch.float32).unsqueeze(0)
predicted_tensor = self.net(state_tensor, time_delta_tensor)
return self._tensor_to_state_dict(predicted_tensor.squeeze(0))
class NuconModelLearner:
def __init__(self, nucon=None, actor='null', dataset_path='nucon_dataset.pkl', time_delta: Union[float, Tuple[float, float]] = 0.1):
self.nucon = Nucon() if nucon is None else nucon
self.actor = Actors[actor](self.nucon) if actor in Actors else actor
self.dataset = self.load_dataset(dataset_path) or []
self.dataset_path = dataset_path
self.readable_params = list(self.nucon.get_all_readable().keys())
self.non_writable_params = [param.id for param in self.nucon.get_all_readable().values() if not param.is_writable]
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
self.optimizer = optim.Adam(self.model.parameters())
if isinstance(time_delta, (int, float)):
self.time_delta = lambda: time_delta
elif isinstance(time_delta, tuple) and len(time_delta) == 2:
self.time_delta = lambda: random.uniform(*time_delta)
else:
raise ValueError("time_delta must be a float or a tuple of two floats")
def _get_state(self):
state = {}
for param_id, param in self.nucon.get_all_readable().items():
value = self.nucon.get(param)
if isinstance(value, Enum):
value = value.value
state[param_id] = value
return state
def collect_data(self, num_steps):
state = self._get_state()
for _ in range(num_steps):
action = self.actor(state)
start_time = time.time()
for param_id, value in action.items():
self.nucon.set(param_id, value)
time_delta = self.time_delta()
time.sleep(time_delta)
next_state = self._get_state()
self.dataset.append((state, action, next_state, time_delta))
state = next_state
self.save_dataset()
def refine_dataset(self, error_threshold):
refined_data = []
for state, action, next_state, time_delta in self.dataset:
predicted_next_state = self.model(state, time_delta)
error = sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params)
if error > error_threshold:
refined_data.append((state, action, next_state, time_delta))
self.dataset = refined_data
self.save_dataset()
def train_model(self, batch_size=32, num_epochs=10, test_split=0.2):
random.shuffle(self.dataset)
split_idx = int(len(self.dataset) * (1 - test_split))
train_data = self.dataset[:split_idx]
test_data = self.dataset[split_idx:]
for epoch in range(num_epochs):
train_loss = self._train_epoch(train_data, batch_size)
test_loss = self._test_epoch(test_data)
print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
def _train_epoch(self, data, batch_size):
total_loss = 0
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
states, _, next_states, time_deltas = zip(*batch)
loss = 0
for state, next_state, time_delta in zip(states, next_states, time_deltas):
predicted_next_state = self.model(state, time_delta)
loss += sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params)
loss /= len(batch)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
total_loss += loss.item()
return total_loss / (len(data) // batch_size)
def _test_epoch(self, data):
total_loss = 0
with torch.no_grad():
for state, _, next_state, time_delta in data:
predicted_next_state = self.model(state, time_delta)
loss = sum((predicted_next_state[p] - next_state[p])**2 for p in self.non_writable_params)
total_loss += loss
return total_loss / len(data)
def save_model(self, path):
torch.save(self.model.state_dict(), path)
def load_model(self, path):
self.model.load_state_dict(torch.load(path))
def save_dataset(self, path=None):
path = path or self.dataset_path
with open(path, 'wb') as f:
pickle.dump(self.dataset, f)
def load_dataset(self, path=None):
path = path or self.dataset_path
if os.path.exists(path):
with open(path, 'rb') as f:
return pickle.load(f)
return None
def merge_datasets(self, other_dataset_path):
other_dataset = self.load_dataset(other_dataset_path)
if other_dataset:
self.dataset.extend(other_dataset)
self.save_dataset()

View File

@ -3,7 +3,7 @@ from gymnasium import spaces
import numpy as np import numpy as np
import time import time
from typing import Dict, Any from typing import Dict, Any
from .core import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus from nucon import Nucon, BreakerStatus, PumpStatus, PumpDryStatus, PumpOverloadStatus
Objectives = { Objectives = {
"null": lambda obs: 0, "null": lambda obs: 0,
@ -13,12 +13,16 @@ Objectives = {
Parameterized_Objectives = { Parameterized_Objectives = {
"target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2), "target_temperature": lambda goal_temp: lambda obs: -((obs["CORE_TEMP"] - goal_temp) ** 2),
"target_gap": lambda goal_gap: lambda obs: -((obs["CORE_TEMP"] - obs["CORE_TEMP_MIN"] - goal_gap) ** 2),
"temp_below": lambda max_temp: lambda obs: -(np.clip(obs["CORE_TEMP"] - max_temp, 0, np.inf) ** 2),
"temp_above": lambda min_temp: lambda obs: -(np.clip(min_temp - obs["CORE_TEMP"], 0, np.inf) ** 2),
"constant": lambda constant: lambda obs: constant,
} }
class NuconEnv(gym.Env): class NuconEnv(gym.Env):
metadata = {'render_modes': ['human']} metadata = {'render_modes': ['human']}
def __init__(self, render_mode=None, seconds_per_step=5, objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0): def __init__(self, nucon=None, simulator=None, render_mode=None, seconds_per_step=5, objectives=['null'], terminators=['null'], objective_weights=None, terminate_above=0):
super().__init__() super().__init__()
self.render_mode = render_mode self.render_mode = render_mode
@ -26,22 +30,30 @@ class NuconEnv(gym.Env):
if objective_weights is None: if objective_weights is None:
objective_weights = [1.0 for objective in objectives] objective_weights = [1.0 for objective in objectives]
self.objective_weights = objective_weights self.objective_weights = objective_weights
self.terminate_at = terminate_at self.terminate_above = terminate_above
self.simulator = simulator
if nucon is None:
if simulator:
nucon = Nucon(port=simulator.port)
else:
nucon = Nucon()
self.nucon = nucon
# Define observation space # Define observation space
obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)} obs_spaces = {'EPISODE_TIME': spaces.Box(low=0, high=np.inf, shape=(1,), dtype=np.float32)}
for param in Nucon.get_all_readable(): for param_id, param in self.nucon.get_all_readable().items():
if param.param_type == float: if param.param_type == float:
obs_spaces[param.id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32) obs_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == int: elif param.param_type == int:
if param.min_val is not None and param.max_val is not None: if param.min_val is not None and param.max_val is not None:
obs_spaces[param.id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32) obs_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32)
else: else:
obs_spaces[param.id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32) obs_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == bool: elif param.param_type == bool:
obs_spaces[param.id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32) obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
elif issubclass(param.param_type, Enum): elif issubclass(param.param_type, Enum):
obs_spaces[param.id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32) obs_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
else: else:
raise ValueError(f"Unsupported observation parameter type: {param.param_type}") raise ValueError(f"Unsupported observation parameter type: {param.param_type}")
@ -49,23 +61,26 @@ class NuconEnv(gym.Env):
# Define action space # Define action space
action_spaces = {} action_spaces = {}
for param in Nucon.get_all_writable(): for param_id, param in self.nucon.get_all_writable().items():
if param.param_type == float: if param.param_type == float:
action_spaces[param.id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32) action_spaces[param_id] = spaces.Box(low=param.min_val or -np.inf, high=param.max_val or np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == int: elif param.param_type == int:
if param.min_val is not None and param.max_val is not None: if param.min_val is not None and param.max_val is not None:
action_spaces[param.id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32) action_spaces[param_id] = spaces.Box(low=param.min_val, high=param.max_val, shape=(1,), dtype=np.float32)
else: else:
action_spaces[param.id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32) action_spaces[param_id] = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
elif param.param_type == bool: elif param.param_type == bool:
action_spaces[param.id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32) action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
elif issubclass(param.param_type, Enum): elif issubclass(param.param_type, Enum):
action_spaces[param.id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32) action_spaces[param_id] = spaces.Box(low=0, high=1, shape=(len(param.param_type),), dtype=np.float32)
else: else:
raise ValueError(f"Unsupported action parameter type: {param.param_type}") raise ValueError(f"Unsupported action parameter type: {param.param_type}")
self.action_space = spaces.Dict(action_spaces) self.action_space = spaces.Dict(action_spaces)
self.objectives = []
self.terminators = []
for objective in objectives: for objective in objectives:
if objective in Objectives: if objective in Objectives:
self.objectives.append(Objectives[objective]) self.objectives.append(Objectives[objective])
@ -84,11 +99,11 @@ class NuconEnv(gym.Env):
def _get_obs(self): def _get_obs(self):
obs = {} obs = {}
for param in Nucon.get_all_readable(): for param_id, param in self.nucon.get_all_readable().items():
value = Nucon.get(param) value = self.nucon.get(param_id)
if isinstance(value, Enum): if isinstance(value, Enum):
value = value.value value = value.value
obs[param.id] = value obs[param_id] = value
obs["EPISODE_TIME"] = self._total_steps * self.seconds_per_step obs["EPISODE_TIME"] = self._total_steps * self.seconds_per_step
return obs return obs
@ -112,12 +127,12 @@ class NuconEnv(gym.Env):
def step(self, action): def step(self, action):
# Apply the action to the Nucon system # Apply the action to the Nucon system
for param_id, value in action.items(): for param_id, value in action.items():
param = next(p for p in Nucon if p.id == param_id) param = next(p for p in self.nucon if p.id == param_id)
if issubclass(param.param_type, Enum): if issubclass(param.param_type, Enum):
value = param.param_type(value) value = param.param_type(value)
if param.min_val is not None and param.max_val is not None: if param.min_val is not None and param.max_val is not None:
value = np.clip(value, param.min_val, param.max_val) value = np.clip(value, param.min_val, param.max_val)
Nucon.set(param, value) self.nucon.set(param, value)
observation = self._get_obs() observation = self._get_obs()
terminated = np.sum([terminator(observation) for terminator in self.terminators]) > self.terminate_above terminated = np.sum([terminator(observation) for terminator in self.terminators]) > self.terminate_above
@ -126,7 +141,10 @@ class NuconEnv(gym.Env):
reward = sum(obj for obj in info['objectives_weighted'].values()) reward = sum(obj for obj in info['objectives_weighted'].values())
self._total_steps += 1 self._total_steps += 1
time.sleep(self.seconds_per_step) if self.simulator:
self.simulator.update(self.seconds_per_step)
else:
time.sleep(self.seconds_per_step)
return observation, reward, terminated, truncated, info return observation, reward, terminated, truncated, info
def render(self): def render(self):
@ -148,6 +166,7 @@ class NuconEnv(gym.Env):
def _unflatten_observation(self, flat_observation): def _unflatten_observation(self, flat_observation):
return {k: v.reshape(1, -1) for k, v in self.observation_space.items()} return {k: v.reshape(1, -1) for k, v in self.observation_space.items()}
def register_nucon_envs(): def register_nucon_envs():
gym.register( gym.register(
id='Nucon-max_power-v0', id='Nucon-max_power-v0',
@ -155,9 +174,14 @@ def register_nucon_envs():
kwargs={'seconds_per_step': 5, 'objectives': ['max_power']} kwargs={'seconds_per_step': 5, 'objectives': ['max_power']}
) )
gym.register( gym.register(
id='Nucon-target_temperature_600-v0', id='Nucon-target_temperature_350-v0',
entry_point='nucon.rl:NuconEnv', entry_point='nucon.rl:NuconEnv',
kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['target_temperature'](goal_temp=600)]} kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['target_temperature'](goal_temp=350)]}
)
gym.register(
id='Nucon-safe_max_power-v0',
entry_point='nucon.rl:NuconEnv',
kwargs={'seconds_per_step': 5, 'objectives': [Parameterized_Objectives['temp_above'](min_temp=310), Parameterized_Objectives['temp_below'](max_temp=365), 'max_power'], 'objective_weights': [1, 10, 1/100_000]}
) )
register_nucon_envs() register_nucon_envs()

315
nucon/sim.py Normal file
View File

@ -0,0 +1,315 @@
import random
from typing import Dict, Union, Any, Tuple, List
from enum import Enum
from flask import Flask, request, jsonify
from nucon import Nucon, ParameterEnum, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
import threading
import torch
from nucon.model import ReactorDynamicsModel
class OperatingState(Enum):
# Tuple indicates a range of values, while list indicates a set of possible values
OFFLINE = {
'CORE_TEMP': (18.0, 22.0),
'CORE_TEMP_OPERATIVE': [306.0],
'CORE_TEMP_MAX': [1000.0],
'CORE_TEMP_MIN': [0.0],
'CORE_TEMP_RESIDUAL': [False],
'CORE_PRESSURE': (0.9, 1.1),
'CORE_PRESSURE_MAX': [1.5],
'CORE_PRESSURE_OPERATIVE': [1.0],
'CORE_INTEGRITY': [100.0],
'CORE_WEAR': [0.0],
'CORE_STATE': ['OFFLINE'],
'CORE_STATE_CRITICALITY': [0.0],
'CORE_CRITICAL_MASS_REACHED': [False],
'CORE_CRITICAL_MASS_REACHED_COUNTER': [0],
'CORE_IMMINENT_FUSION': [False],
'CORE_READY_FOR_START': [False],
'CORE_STEAM_PRESENT': [False],
'CORE_HIGH_STEAM_PRESENT': [False],
'TIME': ['00:00:00'],
'TIME_STAMP': ['1970-01-01 00:00:00'],
'COOLANT_CORE_STATE': ['INACTIVE'],
'COOLANT_CORE_PRESSURE': (0.9, 1.1),
'COOLANT_CORE_MAX_PRESSURE': [1.5],
'COOLANT_CORE_VESSEL_TEMPERATURE': (18.0, 22.0),
'COOLANT_CORE_QUANTITY_IN_VESSEL': [0.0],
'COOLANT_CORE_PRIMARY_LOOP_LEVEL': [0.0],
'COOLANT_CORE_FLOW_SPEED': [0.0],
'COOLANT_CORE_FLOW_ORDERED_SPEED': [0.0],
'COOLANT_CORE_FLOW_REACHED_SPEED': [False],
'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': [3],
'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': [2],
'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': [PumpStatus.INACTIVE],
'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': [PumpStatus.INACTIVE],
'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': [PumpStatus.INACTIVE],
'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': [0.0],
'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': [0.0],
'RODS_STATUS': ['INACTIVE'],
'RODS_MOVEMENT_SPEED': [0.0],
'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': [False],
'RODS_DEFORMED': [False],
'RODS_TEMPERATURE': (18.0, 22.0),
'RODS_MAX_TEMPERATURE': [1000.0],
'RODS_POS_ORDERED': [0.0],
'RODS_POS_ACTUAL': [0.0],
'RODS_POS_REACHED': [True],
'RODS_QUANTITY': [100],
'RODS_ALIGNED': [True],
'GENERATOR_0_KW': [0.0],
'GENERATOR_1_KW': [0.0],
'GENERATOR_2_KW': [0.0],
'GENERATOR_0_V': [0.0],
'GENERATOR_1_V': [0.0],
'GENERATOR_2_V': [0.0],
'GENERATOR_0_A': [0.0],
'GENERATOR_1_A': [0.0],
'GENERATOR_2_A': [0.0],
'GENERATOR_0_HERTZ': [0.0],
'GENERATOR_1_HERTZ': [0.0],
'GENERATOR_2_HERTZ': [0.0],
'GENERATOR_0_BREAKER': [BreakerStatus.OPEN],
'GENERATOR_1_BREAKER': [BreakerStatus.OPEN],
'GENERATOR_2_BREAKER': [BreakerStatus.OPEN],
'STEAM_TURBINE_0_RPM': [0.0],
'STEAM_TURBINE_1_RPM': [0.0],
'STEAM_TURBINE_2_RPM': [0.0],
'STEAM_TURBINE_0_TEMPERATURE': (18.0, 22.0),
'STEAM_TURBINE_1_TEMPERATURE': (18.0, 22.0),
'STEAM_TURBINE_2_TEMPERATURE': (18.0, 22.0),
'STEAM_TURBINE_0_PRESSURE': (0.9, 1.1),
'STEAM_TURBINE_1_PRESSURE': (0.9, 1.1),
'STEAM_TURBINE_2_PRESSURE': (0.9, 1.1),
}
NOMINAL = {
'CORE_TEMP': (290.0, 370.0),
'CORE_TEMP_OPERATIVE': [306.0],
'CORE_TEMP_MAX': [1000.0],
'CORE_TEMP_MIN': [0.0],
'CORE_TEMP_RESIDUAL': [False],
'CORE_PRESSURE': (14.5, 15.5),
'CORE_PRESSURE_MAX': [16.0],
'CORE_PRESSURE_OPERATIVE': [15.0],
'CORE_INTEGRITY': (99.0, 100.0),
'CORE_WEAR': (0.0, 1.0),
'CORE_STATE': ['ACTIVE'],
'CORE_STATE_CRITICALITY': (0.9, 1.1),
'CORE_CRITICAL_MASS_REACHED': [True],
'CORE_CRITICAL_MASS_REACHED_COUNTER': [1],
'CORE_IMMINENT_FUSION': [False],
'CORE_READY_FOR_START': [True],
'CORE_STEAM_PRESENT': [True],
'CORE_HIGH_STEAM_PRESENT': [False],
'TIME': ['00:00:00'],
'TIME_STAMP': ['1970-01-01 00:00:00'],
'COOLANT_CORE_STATE': ['ACTIVE'],
'COOLANT_CORE_PRESSURE': (13.5, 14.5),
'COOLANT_CORE_MAX_PRESSURE': [15.0],
'COOLANT_CORE_VESSEL_TEMPERATURE': (270.0, 290.0),
'COOLANT_CORE_QUANTITY_IN_VESSEL': (95.0, 100.0),
'COOLANT_CORE_PRIMARY_LOOP_LEVEL': (95.0, 100.0),
'COOLANT_CORE_FLOW_SPEED': (9.5, 10.5),
'COOLANT_CORE_FLOW_ORDERED_SPEED': [10.0],
'COOLANT_CORE_FLOW_REACHED_SPEED': [True],
'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': [3],
'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': [3],
'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': [PumpStatus.ACTIVE_SPEED_REACHED],
'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': [PumpStatus.ACTIVE_SPEED_REACHED],
'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': [PumpStatus.ACTIVE_SPEED_REACHED],
'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': [PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID],
'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': [PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD],
'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': (95.0, 100.0),
'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': (95.0, 100.0),
'RODS_STATUS': ['ACTIVE'],
'RODS_MOVEMENT_SPEED': (0.9, 1.1),
'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': [False],
'RODS_DEFORMED': [False],
'RODS_TEMPERATURE': (290.0, 310.0),
'RODS_MAX_TEMPERATURE': [1000.0],
'RODS_POS_ORDERED': [50.0],
'RODS_POS_ACTUAL': (49.5, 50.5),
'RODS_POS_REACHED': [True],
'RODS_QUANTITY': [100],
'RODS_ALIGNED': [True],
'GENERATOR_0_KW': (950.0, 1050.0),
'GENERATOR_1_KW': (950.0, 1050.0),
'GENERATOR_2_KW': (950.0, 1050.0),
'GENERATOR_0_V': (380.0, 420.0),
'GENERATOR_1_V': (380.0, 420.0),
'GENERATOR_2_V': (380.0, 420.0),
'GENERATOR_0_A': (2375.0, 2625.0),
'GENERATOR_1_A': (2375.0, 2625.0),
'GENERATOR_2_A': (2375.0, 2625.0),
'GENERATOR_0_HERTZ': (49.5, 50.5),
'GENERATOR_1_HERTZ': (49.5, 50.5),
'GENERATOR_2_HERTZ': (49.5, 50.5),
'GENERATOR_0_BREAKER': [BreakerStatus.CLOSED],
'GENERATOR_1_BREAKER': [BreakerStatus.CLOSED],
'GENERATOR_2_BREAKER': [BreakerStatus.CLOSED],
'STEAM_TURBINE_0_RPM': (2950.0, 3050.0),
'STEAM_TURBINE_1_RPM': (2950.0, 3050.0),
'STEAM_TURBINE_2_RPM': (2950.0, 3050.0),
'STEAM_TURBINE_0_TEMPERATURE': (270.0, 290.0),
'STEAM_TURBINE_1_TEMPERATURE': (270.0, 290.0),
'STEAM_TURBINE_2_TEMPERATURE': (270.0, 290.0),
'STEAM_TURBINE_0_PRESSURE': (13.5, 14.5),
'STEAM_TURBINE_1_PRESSURE': (13.5, 14.5),
'STEAM_TURBINE_2_PRESSURE': (13.5, 14.5),
}
class NuconSimulator:
class Parameters:
def __init__(self, nucon: Nucon):
for param_name in nucon.get_all_readable():
setattr(self, param_name, None)
def __init__(self, host: str = 'localhost', port: int = 8786):
self._nucon = Nucon()
self.parameters = self.Parameters(self._nucon)
self.time = 0.0
self.allow_all_writes = False
self.set_state(OperatingState.OFFLINE)
self.model = None
self.readable_params = list(self._nucon.get_all_readable().keys())
self.non_writable_params = [name for name, param in self._nucon.get_all_readable().items() if not param.is_writable]
self._run(host, port)
def get(self, parameter: Union[str, Any]) -> Any:
if isinstance(parameter, str):
return getattr(self.parameters, parameter)
return getattr(self.parameters, parameter.id)
def set(self, parameter: Union[str, Any], value: Any, force: bool = False) -> None:
if isinstance(parameter, str):
param_obj = self._nucon[parameter]
else:
param_obj = parameter
if not param_obj.is_writable and not force and not self.allow_all_writes:
raise ValueError(f"Parameter {param_obj.id} is not writable")
# Convert value to the correct type
try:
if param_obj.enum_type:
if isinstance(value, str):
value = param_obj.enum_type[value.upper()]
elif isinstance(value, int):
value = param_obj.enum_type(value)
elif not isinstance(value, param_obj.enum_type):
raise ValueError(f"Invalid enum value for {param_obj.id}")
else:
value = param_obj.param_type(value)
except (ValueError, KeyError):
raise ValueError(f"Invalid type for parameter {param_obj.id}. Expected {param_obj.param_type}")
# Check range if not forced
if not force and param_obj.min_val is not None and param_obj.max_val is not None:
if not param_obj.min_val <= value <= param_obj.max_val:
raise ValueError(f"Value {value} is out of range for parameter {param_obj.id}. "
f"Valid range: [{param_obj.min_val}, {param_obj.max_val}]")
setattr(self.parameters, param_obj.id, value)
def set_allow_all_writes(self, allow: bool) -> None:
self.allow_all_writes = allow
def update(self, time_step: float) -> None:
self._update_reactor_state(time_step)
self.time += time_step
def load_model(self, model_path: str) -> None:
try:
self.model = ReactorDynamicsModel(self.readable_params, self.non_writable_params)
self.model.load_state_dict(torch.load(model_path))
self.model.eval() # Set the model to evaluation mode
print(f"Model loaded successfully from {model_path}")
except Exception as e:
print(f"Error loading model: {str(e)}")
self.model = None
def _update_reactor_state(self, time_step: float) -> None:
if not self.model:
raise ValueError("Model not set. Please load a model using load_model() method.")
state = {}
for param in self.readable_params:
value = self.get(param)
if isinstance(value, Enum):
value = value.value
state[param] = value
# Use the model to predict the next state
with torch.no_grad():
next_state = self.model(state, time_step)
# Update the simulator's state
for param, value in next_state.items():
self.set(param, value)
def set_state(self, state: OperatingState) -> None:
self._sample_parameters_from_state(state)
def _sample_parameters_from_state(self, state: OperatingState) -> None:
for param_name, value_spec in state.value.items():
param = self._nucon.get_all_readable()[param_name]
if isinstance(value_spec, tuple):
value = random.uniform(*value_spec)
elif isinstance(value_spec, list):
value = random.choice(value_spec)
else:
raise ValueError(f"Invalid value specification for parameter {param_name}")
self.set(param, value, force=True)
def _run(self, port: int = 8786, host: str = 'localhost', debug: bool = False):
app = Flask(__name__)
@app.route('/', methods=['GET'])
def get_parameter():
variable = request.args.get('variable')
if not variable:
return jsonify({"error": "No variable specified"}), 400
try:
value = self.get(variable)
return str(value), 200
except KeyError:
return jsonify({"error": f"Unknown variable: {variable}"}), 404
@app.route('/', methods=['POST'])
def set_parameter():
variable = request.args.get('variable')
value = request.args.get('value')
if not variable or value is None:
return jsonify({"error": "Both variable and value must be specified"}), 400
try:
self.set(variable, value)
return "OK", 200
except ValueError as e:
return jsonify({"error": str(e)}), 400
except KeyError:
return jsonify({"error": f"Unknown variable: {variable}"}), 404
def run_simulator(host='localhost', port=8786, debug=False):
app.run(host=host, port=port, debug=debug)
threading.Thread(target=run_simulator, args=(host, port, debug), daemon=True).start()

View File

@ -1,41 +1,50 @@
import pytest import pytest
import warnings import warnings
from nucon import Nucon, NuconConfig, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus from nucon import Nucon, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
WARN_FLOAT_COULD_BE_INT = False WARN_FLOAT_COULD_BE_INT = False
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def nucon_setup(): def nucon_setup():
Nucon.set_dummy_mode(False) # Assume the game is running Nucon.set_dummy_mode(False)
Nucon.set_base_url("http://localhost:8785/") Nucon.set_base_url("http://localhost:8785/")
yield
Nucon.set_dummy_mode(True)
def test_read_all_parameters(nucon_setup): def test_read_all_parameters(nucon_setup):
all_params = Nucon.get_all() all_params = Nucon.get_all()
assert len(all_params) == len(Nucon) assert len(all_params) == len(Nucon)
for param, value in all_params.items(): for param, value in all_params.items():
assert isinstance(value, param.param_type), f"Parameter {param.name} has incorrect type. Expected {param.param_type}, got {type(value)}" assert isinstance(value, param.param_type), f"Parameter {param.id} has incorrect type. Expected {param.param_type}, got {type(value)}"
if param.param_type == float and value.is_integer() and WARN_FLOAT_COULD_BE_INT: if param.param_type == float and value.is_integer() and WARN_FLOAT_COULD_BE_INT:
warnings.warn(f"Parameter {param.name} is a float but has an integer value: {value}") warnings.warn(f"Parameter {param.id} is a float but has an integer value: {value}")
if param.param_type == str:
try:
float(value)
raise ValueError(f"Parameter {param.id} is a string that looks like a number: {value}")
except ValueError:
pass
try:
bool(value.lower())
raise ValueError(f"Parameter {param.id} is a string that looks like a boolean: {value}")
except ValueError:
pass
def test_write_writable_parameters(nucon_setup): def test_write_writable_parameters(nucon_setup):
writable_params = Nucon.get_all_writable() writable_params = Nucon.get_all_writable()
for param in writable_params: for param in writable_params:
current_value = param.value current_value = param.value
param.value = current_value param.value = current_value
assert param.value == current_value, f"Failed to write to parameter {param.name}" assert param.value == current_value, f"Failed to write to parameter {param.id}"
def test_non_writable_parameters(nucon_setup): def test_non_writable_parameters(nucon_setup):
non_writable_params = [param for param in Nucon if not param.is_writable] non_writable_params = [param for param in Nucon if not param.is_writable]
for param in non_writable_params: for param in non_writable_params:
# Test that normal set raises an error # Test that normal set raises an error
with pytest.raises(ValueError, match=f"Parameter {param.name} is not writable"): with pytest.raises(ValueError, match=f"Parameter {param.id} is not writable"):
param.value = param.value # Attempt to write the current value param.value = param.value # Attempt to write the current value
# Test that force_set is refused by the webserver # Test that force_set is refused by the webserver
current_value = param.value current_value = param.value
with pytest.raises(Exception, match=f"Failed to set parameter {param.name}"): with pytest.raises(Exception, match=f"Failed to set parameter {param.id}"):
Nucon.set(param, current_value, force=True) Nucon.set(param, current_value, force=True)
def test_enum_parameters(nucon_setup): def test_enum_parameters(nucon_setup):

114
test/test_sim.py Normal file
View File

@ -0,0 +1,114 @@
import pytest
import time
from nucon import Nucon, PumpStatus, PumpDryStatus, PumpOverloadStatus, BreakerStatus
from nucon.sim import NuconSimulator, OperatingState
@pytest.fixture(scope="module")
def simulator_setup():
simulator = NuconSimulator(port=8786)
simulator.run()
time.sleep(1) # Give the simulator time to start
nucon = Nucon(port=8786)
return simulator, nucon
def test_simulator_initialization(simulator_setup):
simulator, nucon = simulator_setup
assert simulator is not None
assert nucon is not None
def test_read_all_parameters(simulator_setup):
_, nucon = simulator_setup
all_params = nucon.get_all()
assert len(all_params) == len(nucon)
for param_id, value in all_params.items():
param = nucon[param_id]
assert isinstance(value, param.param_type), f"Parameter {param.id} has incorrect type. Expected {param.param_type}, got {type(value)}"
def test_write_writable_parameters(simulator_setup):
_, nucon = simulator_setup
writable_params = nucon.get_all_writable()
for param in writable_params.values():
current_value = param.value
if param.param_type == float:
new_value = current_value + 1.0
elif param.param_type == int:
new_value = current_value + 1
elif param.param_type == bool:
new_value = not current_value
elif issubclass(param.param_type, Enum):
new_value = list(param.param_type)[0]
else:
continue # Skip if we can't determine a new value
param.value = new_value
assert param.value == new_value, f"Failed to write to parameter {param.id}"
def test_non_writable_parameters(simulator_setup):
_, nucon = simulator_setup
non_writable_params = [param for param in nucon.get_all_readable().values() if not param.is_writable]
for param in non_writable_params:
with pytest.raises(ValueError, match=f"Parameter {param.id} is not writable"):
param.value = param.value # Attempt to write the current value
def test_enum_parameters(simulator_setup):
_, nucon = simulator_setup
pump_status = nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value
assert isinstance(pump_status, PumpStatus)
dry_status = nucon.COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS.value
assert isinstance(dry_status, PumpDryStatus)
overload_status = nucon.COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS.value
assert isinstance(overload_status, PumpOverloadStatus)
breaker_status = nucon.GENERATOR_0_BREAKER.value
assert isinstance(breaker_status, BreakerStatus)
def test_custom_truthy_values(simulator_setup):
_, nucon = simulator_setup
assert bool(nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value) == (nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value in [PumpStatus.ACTIVE_NO_SPEED_REACHED, PumpStatus.ACTIVE_SPEED_REACHED])
assert bool(nucon.COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS.value) == (nucon.COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS.value == PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID)
assert bool(nucon.COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS.value) == (nucon.COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS.value == PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD)
assert bool(nucon.GENERATOR_0_BREAKER.value) == (nucon.GENERATOR_0_BREAKER.value == BreakerStatus.OPEN)
def test_get_multiple_parameters(simulator_setup):
_, nucon = simulator_setup
params_to_get = [nucon.CORE_TEMP, nucon.CORE_PRESSURE, nucon.RODS_POS_ACTUAL]
multiple_params = nucon.get_multiple(params_to_get)
assert len(multiple_params) == len(params_to_get)
for param_id, value in multiple_params.items():
assert isinstance(value, nucon[param_id].param_type)
def test_simulator_update(simulator_setup):
simulator, nucon = simulator_setup
initial_temp = nucon.CORE_TEMP.value
simulator.update(10) # Update for 10 seconds
new_temp = nucon.CORE_TEMP.value
assert new_temp != initial_temp, "Core temperature should change after update"
def test_set_operating_state(simulator_setup):
simulator, nucon = simulator_setup
simulator._set_state(OperatingState.NOMINAL)
time.sleep(1) # Give the simulator time to update
assert nucon.CORE_STATE.value == 'NOMINAL'
assert 290 <= nucon.CORE_TEMP.value <= 370
def test_allow_all_writes(simulator_setup):
simulator, nucon = simulator_setup
simulator.set_allow_all_writes(True)
non_writable_param = next(param for param in nucon.get_all_readable().values() if not param.is_writable)
current_value = non_writable_param.value
new_value = current_value + 1 if isinstance(current_value, (int, float)) else not current_value
non_writable_param.value = new_value
assert non_writable_param.value == new_value, f"Failed to write to non-writable parameter {non_writable_param.id} when allow_all_writes is True"
def test_simulator_consistency(simulator_setup):
simulator, nucon = simulator_setup
for _ in range(10):
simulator.update(1)
all_params = nucon.get_all()
for param_id, value in all_params.items():
assert value == getattr(simulator.parameters, param_id), f"Inconsistency found for parameter {param_id}"
if __name__ == "__main__":
pytest.main()