Bug fixes and additions

This commit is contained in:
Dominik Moritz Roth 2024-10-02 22:36:06 +02:00
parent 7da36e8a14
commit 0d99378f7d

View File

@ -1,12 +1,31 @@
from enum import Enum, IntEnum from enum import Enum, IntEnum
import requests import requests
from typing import Union, Dict, Type, List from typing import Union, Dict, Type, List, Optional, Any
class NuconConfig: class NuconConfig:
base_url = "http://localhost:8080/" base_url = "http://localhost:8785/"
dummy_mode = False dummy_mode = False
class PumpStatus(IntEnum): class ParameterEnum(Enum):
@classmethod
def _missing_(cls, value: Any) -> Union['ParameterEnum', None]:
if isinstance(value, str):
# Handle boolean-like strings
if value.lower() in ('true'):
return cls(True)
elif value.lower() in ('false'):
return cls(False)
# Try to convert to int for int-based enums
try:
return cls(int(value))
except ValueError:
pass
# If we can't handle the value, let the default Enum behavior take over
return None
class PumpStatus(ParameterEnum):
INACTIVE = 0 INACTIVE = 0
ACTIVE_NO_SPEED_REACHED = 1 ACTIVE_NO_SPEED_REACHED = 1
ACTIVE_SPEED_REACHED = 2 ACTIVE_SPEED_REACHED = 2
@ -17,39 +36,57 @@ class PumpStatus(IntEnum):
def __bool__(self): def __bool__(self):
return self.value in (1, 2) return self.value in (1, 2)
class PumpDryStatus(IntEnum): class PumpDryStatus(ParameterEnum):
ACTIVE_WITHOUT_FLUID = 1 ACTIVE_WITHOUT_FLUID = 1
INACTIVE_OR_ACTIVE_WITH_FLUID = 4 INACTIVE_OR_ACTIVE_WITH_FLUID = 4
def __bool__(self): def __bool__(self):
return self.value == 4 return self.value == 4
class PumpOverloadStatus(IntEnum): class PumpOverloadStatus(ParameterEnum):
ACTIVE_AND_OVERLOAD = 1 ACTIVE_AND_OVERLOAD = 1
INACTIVE_OR_ACTIVE_NO_OVERLOAD = 4 INACTIVE_OR_ACTIVE_NO_OVERLOAD = 4
def __bool__(self): def __bool__(self):
return self.value == 4 return self.value == 4
class BreakerStatus(Enum): class BreakerStatus(ParameterEnum):
OPEN = True OPEN = True
CLOSED = False CLOSED = False
def __bool__(self): def __bool__(self):
return self.value return self.value
CoreState = str
CoolantCoreState = str
RodsState = str
#class CoreState(ParameterEnum):
# REACTIVE = 'REACTIVO'
#
# def __bool__(self):
# return self.value == 'REACTIVO'
#class CoolantCoreState(ParameterEnum):
# CIRCULATING = 'CIRCULANDO'
#
# def __bool__(self):
# return self.value == 'CIRCULANDO'
class Nucon(Enum): class Nucon(Enum):
# Core
CORE_TEMP = ("CORE_TEMP", float, False) CORE_TEMP = ("CORE_TEMP", float, False)
CORE_TEMP_OPERATIVE = ("CORE_TEMP_OPERATIVE", float, False) CORE_TEMP_OPERATIVE = ("CORE_TEMP_OPERATIVE", float, False)
CORE_TEMP_MAX = ("CORE_TEMP_MAX", float, False) CORE_TEMP_MAX = ("CORE_TEMP_MAX", float, False)
CORE_TEMP_MIN = ("CORE_TEMP_MIN", float, False) CORE_TEMP_MIN = ("CORE_TEMP_MIN", float, False)
CORE_TEMP_RESIDUAL = ("CORE_TEMP_RESIDUAL", float, False) CORE_TEMP_RESIDUAL = ("CORE_TEMP_RESIDUAL", bool, False)
CORE_PRESSURE = ("CORE_PRESSURE", float, False) CORE_PRESSURE = ("CORE_PRESSURE", float, False)
CORE_PRESSURE_MAX = ("CORE_PRESSURE_MAX", float, False) CORE_PRESSURE_MAX = ("CORE_PRESSURE_MAX", float, False)
CORE_PRESSURE_OPERATIVE = ("CORE_PRESSURE_OPERATIVE", float, False) CORE_PRESSURE_OPERATIVE = ("CORE_PRESSURE_OPERATIVE", float, False)
CORE_INTEGRITY = ("CORE_INTEGRITY", float, False) CORE_INTEGRITY = ("CORE_INTEGRITY", float, False)
CORE_WEAR = ("CORE_WEAR", float, False) CORE_WEAR = ("CORE_WEAR", float, False)
CORE_STATE = ("CORE_STATE", int, False) CORE_STATE = ("CORE_STATE", CoreState, False)
CORE_STATE_CRITICALITY = ("CORE_STATE_CRITICALITY", float, False) CORE_STATE_CRITICALITY = ("CORE_STATE_CRITICALITY", float, False)
CORE_CRITICAL_MASS_REACHED = ("CORE_CRITICAL_MASS_REACHED", bool, False) CORE_CRITICAL_MASS_REACHED = ("CORE_CRITICAL_MASS_REACHED", bool, False)
CORE_CRITICAL_MASS_REACHED_COUNTER = ("CORE_CRITICAL_MASS_REACHED_COUNTER", int, False) CORE_CRITICAL_MASS_REACHED_COUNTER = ("CORE_CRITICAL_MASS_REACHED_COUNTER", int, False)
@ -58,41 +95,42 @@ class Nucon(Enum):
CORE_STEAM_PRESENT = ("CORE_STEAM_PRESENT", bool, False) CORE_STEAM_PRESENT = ("CORE_STEAM_PRESENT", bool, False)
CORE_HIGH_STEAM_PRESENT = ("CORE_HIGH_STEAM_PRESENT", bool, False) CORE_HIGH_STEAM_PRESENT = ("CORE_HIGH_STEAM_PRESENT", bool, False)
TIME = ("TIME", float, False) # Time
TIME = ("TIME", str, False)
TIME_STAMP = ("TIME_STAMP", str, False) TIME_STAMP = ("TIME_STAMP", str, False)
COOLANT_CORE_STATE = ("COOLANT_CORE_STATE", int, False) # Coolant Core
COOLANT_CORE_STATE = ("COOLANT_CORE_STATE", CoolantCoreState, False)
COOLANT_CORE_PRESSURE = ("COOLANT_CORE_PRESSURE", float, False) COOLANT_CORE_PRESSURE = ("COOLANT_CORE_PRESSURE", float, False)
COOLANT_CORE_MAX_PRESSURE = ("COOLANT_CORE_MAX_PRESSURE", float, False) COOLANT_CORE_MAX_PRESSURE = ("COOLANT_CORE_MAX_PRESSURE", float, False)
COOLANT_CORE_VESSEL_TEMPERATURE = ("COOLANT_CORE_VESSEL_TEMPERATURE", float, False) COOLANT_CORE_VESSEL_TEMPERATURE = ("COOLANT_CORE_VESSEL_TEMPERATURE", float, False)
COOLANT_CORE_QUANTITY_IN_VESSEL = ("COOLANT_CORE_QUANTITY_IN_VESSEL", float, False) COOLANT_CORE_QUANTITY_IN_VESSEL = ("COOLANT_CORE_QUANTITY_IN_VESSEL", float, False)
COOLANT_CORE_PRIMARY_LOOP_LEVEL = ("COOLANT_CORE_PRIMARY_LOOP_LEVEL", float, False) COOLANT_CORE_PRIMARY_LOOP_LEVEL = ("COOLANT_CORE_PRIMARY_LOOP_LEVEL", float, False)
COOLANT_CORE_FLOW_SPEED = ("COOLANT_CORE_FLOW_SPEED", float, False) COOLANT_CORE_FLOW_SPEED = ("COOLANT_CORE_FLOW_SPEED", float, False)
COOLANT_CORE_FLOW_ORDERED_SPEED = ("COOLANT_CORE_FLOW_ORDERED_SPEED", float, True) COOLANT_CORE_FLOW_ORDERED_SPEED = ("COOLANT_CORE_FLOW_ORDERED_SPEED", float, False)
COOLANT_CORE_FLOW_REACHED_SPEED = ("COOLANT_CORE_FLOW_REACHED_SPEED", float, False) COOLANT_CORE_FLOW_REACHED_SPEED = ("COOLANT_CORE_FLOW_REACHED_SPEED", bool, False)
COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT", int, False) COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT", int, False)
COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT", int, False) COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT", int, False)
# Circulation Pumps
COOLANT_CORE_CIRCULATION_PUMP_0_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_STATUS", PumpStatus, False) COOLANT_CORE_CIRCULATION_PUMP_0_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_STATUS", PumpStatus, False) COOLANT_CORE_CIRCULATION_PUMP_1_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_STATUS", PumpStatus, False) COOLANT_CORE_CIRCULATION_PUMP_2_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS", PumpDryStatus, False) COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS", PumpDryStatus, False) COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS", PumpDryStatus, False) COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS", PumpOverloadStatus, False) COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS", PumpOverloadStatus, False) COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS", PumpOverloadStatus, False) COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED", float, True) COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED", float, True) COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED", float, True)
COOLANT_CORE_CIRCULATION_PUMP_0_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_SPEED", float, False) COOLANT_CORE_CIRCULATION_PUMP_0_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_1_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_SPEED", float, False) COOLANT_CORE_CIRCULATION_PUMP_1_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_2_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_SPEED", float, False) COOLANT_CORE_CIRCULATION_PUMP_2_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_SPEED", float, False)
RODS_STATUS = ("RODS_STATUS", int, False) # Rods
RODS_STATUS = ("RODS_STATUS", RodsState, False)
RODS_MOVEMENT_SPEED = ("RODS_MOVEMENT_SPEED", float, False) RODS_MOVEMENT_SPEED = ("RODS_MOVEMENT_SPEED", float, False)
RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE = ("RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE", bool, False) RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE = ("RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE", bool, False)
RODS_DEFORMED = ("RODS_DEFORMED", bool, False) RODS_DEFORMED = ("RODS_DEFORMED", bool, False)
@ -104,6 +142,7 @@ class Nucon(Enum):
RODS_QUANTITY = ("RODS_QUANTITY", int, False) RODS_QUANTITY = ("RODS_QUANTITY", int, False)
RODS_ALIGNED = ("RODS_ALIGNED", bool, False) RODS_ALIGNED = ("RODS_ALIGNED", bool, False)
# Generators
GENERATOR_0_KW = ("GENERATOR_0_KW", float, False) GENERATOR_0_KW = ("GENERATOR_0_KW", float, False)
GENERATOR_1_KW = ("GENERATOR_1_KW", float, False) GENERATOR_1_KW = ("GENERATOR_1_KW", float, False)
GENERATOR_2_KW = ("GENERATOR_2_KW", float, False) GENERATOR_2_KW = ("GENERATOR_2_KW", float, False)
@ -116,10 +155,11 @@ class Nucon(Enum):
GENERATOR_0_HERTZ = ("GENERATOR_0_HERTZ", float, False) GENERATOR_0_HERTZ = ("GENERATOR_0_HERTZ", float, False)
GENERATOR_1_HERTZ = ("GENERATOR_1_HERTZ", float, False) GENERATOR_1_HERTZ = ("GENERATOR_1_HERTZ", float, False)
GENERATOR_2_HERTZ = ("GENERATOR_2_HERTZ", float, False) GENERATOR_2_HERTZ = ("GENERATOR_2_HERTZ", float, False)
GENERATOR_0_BREAKER = ("GENERATOR_0_BREAKER", BreakerStatus, True) GENERATOR_0_BREAKER = ("GENERATOR_0_BREAKER", BreakerStatus, False)
GENERATOR_1_BREAKER = ("GENERATOR_1_BREAKER", BreakerStatus, True) GENERATOR_1_BREAKER = ("GENERATOR_1_BREAKER", BreakerStatus, False)
GENERATOR_2_BREAKER = ("GENERATOR_2_BREAKER", BreakerStatus, True) GENERATOR_2_BREAKER = ("GENERATOR_2_BREAKER", BreakerStatus, False)
# Steam Turbines
STEAM_TURBINE_0_RPM = ("STEAM_TURBINE_0_RPM", float, False) STEAM_TURBINE_0_RPM = ("STEAM_TURBINE_0_RPM", float, False)
STEAM_TURBINE_1_RPM = ("STEAM_TURBINE_1_RPM", float, False) STEAM_TURBINE_1_RPM = ("STEAM_TURBINE_1_RPM", float, False)
STEAM_TURBINE_2_RPM = ("STEAM_TURBINE_2_RPM", float, False) STEAM_TURBINE_2_RPM = ("STEAM_TURBINE_2_RPM", float, False)
@ -137,6 +177,13 @@ class Nucon(Enum):
self.min_val = min_val self.min_val = min_val
self.max_val = max_val self.max_val = max_val
def __repr__(self) -> str:
type_repr = repr(self.param_type)
return (f"<{self.__class__.__name__}.{self.name}: "
f"{{'value': {repr(self.value)}, 'type': {type_repr}, "
f"'writable': {self.is_writable}}}>"
)
def __str__(self): def __str__(self):
return self.id return self.id
@ -186,41 +233,58 @@ class Nucon(Enum):
return [param for param in cls if param.is_writable] return [param for param in cls if param.is_writable]
@classmethod @classmethod
def get(cls, parameter: Union['Nucon', str]) -> Union[float, int, bool, str, Enum]: def get(cls, parameter: Union['Nucon', str]) -> Union[float, int, bool, str, ParameterEnum]:
if isinstance(parameter, str): if isinstance(parameter, str):
parameter = cls[parameter] parameter = cls[parameter]
if NuconConfig.dummy_mode: if NuconConfig.dummy_mode:
return cls._get_dummy_value(parameter) return cls._get_dummy_value(parameter)
response = requests.get(NuconConfig.base_url, params={"variable": parameter.name}) value = cls._query(parameter.name)
if response.status_code != 200:
raise Exception(f"Failed to query parameter {parameter.name}. Status code: {response.status_code}")
value = response.text.strip() if parameter.enum_type and issubclass(parameter.enum_type, ParameterEnum):
return parameter.enum_type(value)
if parameter.enum_type: elif parameter.param_type == float:
return parameter.enum_type(int(value)) return float(value)
elif parameter.param_type in (float, int): elif parameter.param_type == int:
return parameter.param_type(value) return int(value)
elif parameter.param_type == bool: elif parameter.param_type == bool:
return value.lower() == "true" if value.lower() in ('true'):
return True
elif value.lower() in ('false'):
return False
else:
raise ValueError(f"Invalid boolean value: {value}. Expected 'TRUE' or 'FALSE' (case insensitive).")
else: else:
return value return value
@classmethod
def _query(cls, parameter_name: str) -> str:
response = requests.get(NuconConfig.base_url, params={"variable": parameter_name})
if response.status_code != 200:
raise Exception(f"Failed to query parameter {parameter_name}. Status code: {response.status_code}")
return response.text.strip()
@classmethod @classmethod
def _get_dummy_value(cls, parameter: 'Nucon') -> Union[float, int, bool, str, Enum]: def _get_dummy_value(cls, parameter: 'Nucon') -> Union[float, int, bool, str, Enum]:
if parameter.enum_type: if parameter.enum_type:
return next(iter(parameter.enum_type)) return next(iter(parameter.enum_type))
elif parameter.param_type == float: elif parameter.param_type == float:
return 0.0 if parameter.max_val is not None and parameter.min_val is not None:
return (parameter.max_val - parameter.min_val) / 2 + parameter.min_val
else:
return 3.14
elif parameter.param_type == int: elif parameter.param_type == int:
return 0 if parameter.max_val is not None and parameter.min_val is not None:
return (parameter.max_val - parameter.min_val) / 2 + parameter.min_val
else:
return 42
elif parameter.param_type == bool: elif parameter.param_type == bool:
return False return False
else: else:
return "" return "dummy"
@classmethod @classmethod
def get_multiple(cls, parameters: List['Nucon']) -> Dict['Nucon', Union[float, int, bool, str, Enum]]: def get_multiple(cls, parameters: List['Nucon']) -> Dict['Nucon', Union[float, int, bool, str, Enum]]: