From 0d99378f7de38d431e677b4def04ff42b951df4f Mon Sep 17 00:00:00 2001 From: Dominik Roth Date: Wed, 2 Oct 2024 22:36:06 +0200 Subject: [PATCH] Bug fixes and additions --- nucon/core.py | 138 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 101 insertions(+), 37 deletions(-) diff --git a/nucon/core.py b/nucon/core.py index 426855d..aee5adc 100644 --- a/nucon/core.py +++ b/nucon/core.py @@ -1,12 +1,31 @@ from enum import Enum, IntEnum import requests -from typing import Union, Dict, Type, List +from typing import Union, Dict, Type, List, Optional, Any class NuconConfig: - base_url = "http://localhost:8080/" + base_url = "http://localhost:8785/" dummy_mode = False -class PumpStatus(IntEnum): +class ParameterEnum(Enum): + @classmethod + def _missing_(cls, value: Any) -> Union['ParameterEnum', None]: + if isinstance(value, str): + # Handle boolean-like strings + if value.lower() in ('true'): + return cls(True) + elif value.lower() in ('false'): + return cls(False) + + # Try to convert to int for int-based enums + try: + return cls(int(value)) + except ValueError: + pass + + # If we can't handle the value, let the default Enum behavior take over + return None + +class PumpStatus(ParameterEnum): INACTIVE = 0 ACTIVE_NO_SPEED_REACHED = 1 ACTIVE_SPEED_REACHED = 2 @@ -17,39 +36,57 @@ class PumpStatus(IntEnum): def __bool__(self): return self.value in (1, 2) -class PumpDryStatus(IntEnum): +class PumpDryStatus(ParameterEnum): ACTIVE_WITHOUT_FLUID = 1 INACTIVE_OR_ACTIVE_WITH_FLUID = 4 def __bool__(self): return self.value == 4 -class PumpOverloadStatus(IntEnum): +class PumpOverloadStatus(ParameterEnum): ACTIVE_AND_OVERLOAD = 1 INACTIVE_OR_ACTIVE_NO_OVERLOAD = 4 def __bool__(self): return self.value == 4 -class BreakerStatus(Enum): +class BreakerStatus(ParameterEnum): OPEN = True CLOSED = False def __bool__(self): return self.value +CoreState = str +CoolantCoreState = str +RodsState = str + +#class CoreState(ParameterEnum): +# REACTIVE = 'REACTIVO' +# +# def __bool__(self): +# return self.value == 'REACTIVO' + +#class CoolantCoreState(ParameterEnum): +# CIRCULATING = 'CIRCULANDO' +# +# def __bool__(self): +# return self.value == 'CIRCULANDO' + + class Nucon(Enum): + # Core CORE_TEMP = ("CORE_TEMP", float, False) CORE_TEMP_OPERATIVE = ("CORE_TEMP_OPERATIVE", float, False) CORE_TEMP_MAX = ("CORE_TEMP_MAX", float, False) CORE_TEMP_MIN = ("CORE_TEMP_MIN", float, False) - CORE_TEMP_RESIDUAL = ("CORE_TEMP_RESIDUAL", float, False) + CORE_TEMP_RESIDUAL = ("CORE_TEMP_RESIDUAL", bool, False) CORE_PRESSURE = ("CORE_PRESSURE", float, False) CORE_PRESSURE_MAX = ("CORE_PRESSURE_MAX", float, False) CORE_PRESSURE_OPERATIVE = ("CORE_PRESSURE_OPERATIVE", float, False) CORE_INTEGRITY = ("CORE_INTEGRITY", float, False) CORE_WEAR = ("CORE_WEAR", float, False) - CORE_STATE = ("CORE_STATE", int, False) + CORE_STATE = ("CORE_STATE", CoreState, False) CORE_STATE_CRITICALITY = ("CORE_STATE_CRITICALITY", float, False) CORE_CRITICAL_MASS_REACHED = ("CORE_CRITICAL_MASS_REACHED", bool, False) CORE_CRITICAL_MASS_REACHED_COUNTER = ("CORE_CRITICAL_MASS_REACHED_COUNTER", int, False) @@ -58,41 +95,42 @@ class Nucon(Enum): CORE_STEAM_PRESENT = ("CORE_STEAM_PRESENT", bool, False) CORE_HIGH_STEAM_PRESENT = ("CORE_HIGH_STEAM_PRESENT", bool, False) - TIME = ("TIME", float, False) + # Time + TIME = ("TIME", str, False) TIME_STAMP = ("TIME_STAMP", str, False) - COOLANT_CORE_STATE = ("COOLANT_CORE_STATE", int, False) + # Coolant Core + COOLANT_CORE_STATE = ("COOLANT_CORE_STATE", CoolantCoreState, False) COOLANT_CORE_PRESSURE = ("COOLANT_CORE_PRESSURE", float, False) COOLANT_CORE_MAX_PRESSURE = ("COOLANT_CORE_MAX_PRESSURE", float, False) COOLANT_CORE_VESSEL_TEMPERATURE = ("COOLANT_CORE_VESSEL_TEMPERATURE", float, False) COOLANT_CORE_QUANTITY_IN_VESSEL = ("COOLANT_CORE_QUANTITY_IN_VESSEL", float, False) COOLANT_CORE_PRIMARY_LOOP_LEVEL = ("COOLANT_CORE_PRIMARY_LOOP_LEVEL", float, False) COOLANT_CORE_FLOW_SPEED = ("COOLANT_CORE_FLOW_SPEED", float, False) - COOLANT_CORE_FLOW_ORDERED_SPEED = ("COOLANT_CORE_FLOW_ORDERED_SPEED", float, True) - COOLANT_CORE_FLOW_REACHED_SPEED = ("COOLANT_CORE_FLOW_REACHED_SPEED", float, False) + COOLANT_CORE_FLOW_ORDERED_SPEED = ("COOLANT_CORE_FLOW_ORDERED_SPEED", float, False) + COOLANT_CORE_FLOW_REACHED_SPEED = ("COOLANT_CORE_FLOW_REACHED_SPEED", bool, False) COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT", int, False) COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT", int, False) + # Circulation Pumps COOLANT_CORE_CIRCULATION_PUMP_0_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_STATUS", PumpStatus, False) COOLANT_CORE_CIRCULATION_PUMP_1_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_STATUS", PumpStatus, False) COOLANT_CORE_CIRCULATION_PUMP_2_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_STATUS", PumpStatus, False) - COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS", PumpDryStatus, False) COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS", PumpDryStatus, False) COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS", PumpDryStatus, False) - COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS", PumpOverloadStatus, False) COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS", PumpOverloadStatus, False) COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS", PumpOverloadStatus, False) - - COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED", float, True) - COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED", float, True) - COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED", float, True) + COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED", float, False) + COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED", float, False) + COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED", float, False) COOLANT_CORE_CIRCULATION_PUMP_0_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_SPEED", float, False) COOLANT_CORE_CIRCULATION_PUMP_1_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_SPEED", float, False) COOLANT_CORE_CIRCULATION_PUMP_2_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_SPEED", float, False) - RODS_STATUS = ("RODS_STATUS", int, False) + # Rods + RODS_STATUS = ("RODS_STATUS", RodsState, False) RODS_MOVEMENT_SPEED = ("RODS_MOVEMENT_SPEED", float, False) RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE = ("RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE", bool, False) RODS_DEFORMED = ("RODS_DEFORMED", bool, False) @@ -104,6 +142,7 @@ class Nucon(Enum): RODS_QUANTITY = ("RODS_QUANTITY", int, False) RODS_ALIGNED = ("RODS_ALIGNED", bool, False) + # Generators GENERATOR_0_KW = ("GENERATOR_0_KW", float, False) GENERATOR_1_KW = ("GENERATOR_1_KW", float, False) GENERATOR_2_KW = ("GENERATOR_2_KW", float, False) @@ -116,10 +155,11 @@ class Nucon(Enum): GENERATOR_0_HERTZ = ("GENERATOR_0_HERTZ", float, False) GENERATOR_1_HERTZ = ("GENERATOR_1_HERTZ", float, False) GENERATOR_2_HERTZ = ("GENERATOR_2_HERTZ", float, False) - GENERATOR_0_BREAKER = ("GENERATOR_0_BREAKER", BreakerStatus, True) - GENERATOR_1_BREAKER = ("GENERATOR_1_BREAKER", BreakerStatus, True) - GENERATOR_2_BREAKER = ("GENERATOR_2_BREAKER", BreakerStatus, True) + GENERATOR_0_BREAKER = ("GENERATOR_0_BREAKER", BreakerStatus, False) + GENERATOR_1_BREAKER = ("GENERATOR_1_BREAKER", BreakerStatus, False) + GENERATOR_2_BREAKER = ("GENERATOR_2_BREAKER", BreakerStatus, False) + # Steam Turbines STEAM_TURBINE_0_RPM = ("STEAM_TURBINE_0_RPM", float, False) STEAM_TURBINE_1_RPM = ("STEAM_TURBINE_1_RPM", float, False) STEAM_TURBINE_2_RPM = ("STEAM_TURBINE_2_RPM", float, False) @@ -137,6 +177,13 @@ class Nucon(Enum): self.min_val = min_val self.max_val = max_val + def __repr__(self) -> str: + type_repr = repr(self.param_type) + return (f"<{self.__class__.__name__}.{self.name}: " + f"{{'value': {repr(self.value)}, 'type': {type_repr}, " + f"'writable': {self.is_writable}}}>" + ) + def __str__(self): return self.id @@ -186,41 +233,58 @@ class Nucon(Enum): return [param for param in cls if param.is_writable] @classmethod - def get(cls, parameter: Union['Nucon', str]) -> Union[float, int, bool, str, Enum]: + def get(cls, parameter: Union['Nucon', str]) -> Union[float, int, bool, str, ParameterEnum]: if isinstance(parameter, str): parameter = cls[parameter] if NuconConfig.dummy_mode: return cls._get_dummy_value(parameter) - response = requests.get(NuconConfig.base_url, params={"variable": parameter.name}) - - if response.status_code != 200: - raise Exception(f"Failed to query parameter {parameter.name}. Status code: {response.status_code}") + value = cls._query(parameter.name) - value = response.text.strip() - - if parameter.enum_type: - return parameter.enum_type(int(value)) - elif parameter.param_type in (float, int): - return parameter.param_type(value) + if parameter.enum_type and issubclass(parameter.enum_type, ParameterEnum): + return parameter.enum_type(value) + elif parameter.param_type == float: + return float(value) + elif parameter.param_type == int: + return int(value) elif parameter.param_type == bool: - return value.lower() == "true" + if value.lower() in ('true'): + return True + elif value.lower() in ('false'): + return False + else: + raise ValueError(f"Invalid boolean value: {value}. Expected 'TRUE' or 'FALSE' (case insensitive).") else: return value + @classmethod + def _query(cls, parameter_name: str) -> str: + response = requests.get(NuconConfig.base_url, params={"variable": parameter_name}) + + if response.status_code != 200: + raise Exception(f"Failed to query parameter {parameter_name}. Status code: {response.status_code}") + + return response.text.strip() + @classmethod def _get_dummy_value(cls, parameter: 'Nucon') -> Union[float, int, bool, str, Enum]: if parameter.enum_type: return next(iter(parameter.enum_type)) elif parameter.param_type == float: - return 0.0 + if parameter.max_val is not None and parameter.min_val is not None: + return (parameter.max_val - parameter.min_val) / 2 + parameter.min_val + else: + return 3.14 elif parameter.param_type == int: - return 0 + if parameter.max_val is not None and parameter.min_val is not None: + return (parameter.max_val - parameter.min_val) / 2 + parameter.min_val + else: + return 42 elif parameter.param_type == bool: return False else: - return "" + return "dummy" @classmethod def get_multiple(cls, parameters: List['Nucon']) -> Dict['Nucon', Union[float, int, bool, str, Enum]]: