NuCon/nucon/core.py

255 lines
11 KiB
Python
Raw Normal View History

2024-10-02 16:25:45 +02:00
from enum import Enum, IntEnum
import requests
from typing import Union, Dict, Type, List
class NuconConfig:
base_url = "http://localhost:8080/"
dummy_mode = False
class PumpStatus(IntEnum):
INACTIVE = 0
ACTIVE_NO_SPEED_REACHED = 1
ACTIVE_SPEED_REACHED = 2
REQUIRES_MAINTENANCE = 3
NOT_INSTALLED = 4
INSUFFICIENT_ENERGY = 5
def __bool__(self):
return self.value in (1, 2)
class PumpDryStatus(IntEnum):
ACTIVE_WITHOUT_FLUID = 1
INACTIVE_OR_ACTIVE_WITH_FLUID = 4
def __bool__(self):
return self.value == 4
class PumpOverloadStatus(IntEnum):
ACTIVE_AND_OVERLOAD = 1
INACTIVE_OR_ACTIVE_NO_OVERLOAD = 4
def __bool__(self):
return self.value == 4
class BreakerStatus(Enum):
OPEN = True
CLOSED = False
def __bool__(self):
return self.value
class Nucon(Enum):
CORE_TEMP = ("CORE_TEMP", float, False)
CORE_TEMP_OPERATIVE = ("CORE_TEMP_OPERATIVE", float, False)
CORE_TEMP_MAX = ("CORE_TEMP_MAX", float, False)
CORE_TEMP_MIN = ("CORE_TEMP_MIN", float, False)
CORE_TEMP_RESIDUAL = ("CORE_TEMP_RESIDUAL", float, False)
CORE_PRESSURE = ("CORE_PRESSURE", float, False)
CORE_PRESSURE_MAX = ("CORE_PRESSURE_MAX", float, False)
CORE_PRESSURE_OPERATIVE = ("CORE_PRESSURE_OPERATIVE", float, False)
CORE_INTEGRITY = ("CORE_INTEGRITY", float, False)
CORE_WEAR = ("CORE_WEAR", float, False)
CORE_STATE = ("CORE_STATE", int, False)
CORE_STATE_CRITICALITY = ("CORE_STATE_CRITICALITY", float, False)
CORE_CRITICAL_MASS_REACHED = ("CORE_CRITICAL_MASS_REACHED", bool, False)
CORE_CRITICAL_MASS_REACHED_COUNTER = ("CORE_CRITICAL_MASS_REACHED_COUNTER", int, False)
CORE_IMMINENT_FUSION = ("CORE_IMMINENT_FUSION", bool, False)
CORE_READY_FOR_START = ("CORE_READY_FOR_START", bool, False)
CORE_STEAM_PRESENT = ("CORE_STEAM_PRESENT", bool, False)
CORE_HIGH_STEAM_PRESENT = ("CORE_HIGH_STEAM_PRESENT", bool, False)
TIME = ("TIME", float, False)
TIME_STAMP = ("TIME_STAMP", str, False)
COOLANT_CORE_STATE = ("COOLANT_CORE_STATE", int, False)
COOLANT_CORE_PRESSURE = ("COOLANT_CORE_PRESSURE", float, False)
COOLANT_CORE_MAX_PRESSURE = ("COOLANT_CORE_MAX_PRESSURE", float, False)
COOLANT_CORE_VESSEL_TEMPERATURE = ("COOLANT_CORE_VESSEL_TEMPERATURE", float, False)
COOLANT_CORE_QUANTITY_IN_VESSEL = ("COOLANT_CORE_QUANTITY_IN_VESSEL", float, False)
COOLANT_CORE_PRIMARY_LOOP_LEVEL = ("COOLANT_CORE_PRIMARY_LOOP_LEVEL", float, False)
COOLANT_CORE_FLOW_SPEED = ("COOLANT_CORE_FLOW_SPEED", float, False)
COOLANT_CORE_FLOW_ORDERED_SPEED = ("COOLANT_CORE_FLOW_ORDERED_SPEED", float, True)
COOLANT_CORE_FLOW_REACHED_SPEED = ("COOLANT_CORE_FLOW_REACHED_SPEED", float, False)
COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT", int, False)
COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT", int, False)
COOLANT_CORE_CIRCULATION_PUMP_0_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_STATUS", PumpStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS", PumpDryStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS", PumpOverloadStatus, False)
COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED", float, True)
COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED", float, True)
COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED", float, True)
COOLANT_CORE_CIRCULATION_PUMP_0_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_1_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_SPEED", float, False)
COOLANT_CORE_CIRCULATION_PUMP_2_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_SPEED", float, False)
RODS_STATUS = ("RODS_STATUS", int, False)
RODS_MOVEMENT_SPEED = ("RODS_MOVEMENT_SPEED", float, False)
RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE = ("RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE", bool, False)
RODS_DEFORMED = ("RODS_DEFORMED", bool, False)
RODS_TEMPERATURE = ("RODS_TEMPERATURE", float, False)
RODS_MAX_TEMPERATURE = ("RODS_MAX_TEMPERATURE", float, False)
RODS_POS_ORDERED = ("RODS_POS_ORDERED", float, True)
RODS_POS_ACTUAL = ("RODS_POS_ACTUAL", float, False)
RODS_POS_REACHED = ("RODS_POS_REACHED", bool, False)
RODS_QUANTITY = ("RODS_QUANTITY", int, False)
RODS_ALIGNED = ("RODS_ALIGNED", bool, False)
GENERATOR_0_KW = ("GENERATOR_0_KW", float, False)
GENERATOR_1_KW = ("GENERATOR_1_KW", float, False)
GENERATOR_2_KW = ("GENERATOR_2_KW", float, False)
GENERATOR_0_V = ("GENERATOR_0_V", float, False)
GENERATOR_1_V = ("GENERATOR_1_V", float, False)
GENERATOR_2_V = ("GENERATOR_2_V", float, False)
GENERATOR_0_A = ("GENERATOR_0_A", float, False)
GENERATOR_1_A = ("GENERATOR_1_A", float, False)
GENERATOR_2_A = ("GENERATOR_2_A", float, False)
GENERATOR_0_HERTZ = ("GENERATOR_0_HERTZ", float, False)
GENERATOR_1_HERTZ = ("GENERATOR_1_HERTZ", float, False)
GENERATOR_2_HERTZ = ("GENERATOR_2_HERTZ", float, False)
GENERATOR_0_BREAKER = ("GENERATOR_0_BREAKER", BreakerStatus, True)
GENERATOR_1_BREAKER = ("GENERATOR_1_BREAKER", BreakerStatus, True)
GENERATOR_2_BREAKER = ("GENERATOR_2_BREAKER", BreakerStatus, True)
STEAM_TURBINE_0_RPM = ("STEAM_TURBINE_0_RPM", float, False)
STEAM_TURBINE_1_RPM = ("STEAM_TURBINE_1_RPM", float, False)
STEAM_TURBINE_2_RPM = ("STEAM_TURBINE_2_RPM", float, False)
STEAM_TURBINE_0_TEMPERATURE = ("STEAM_TURBINE_0_TEMPERATURE", float, False)
STEAM_TURBINE_1_TEMPERATURE = ("STEAM_TURBINE_1_TEMPERATURE", float, False)
STEAM_TURBINE_2_TEMPERATURE = ("STEAM_TURBINE_2_TEMPERATURE", float, False)
STEAM_TURBINE_0_PRESSURE = ("STEAM_TURBINE_0_PRESSURE", float, False)
STEAM_TURBINE_1_PRESSURE = ("STEAM_TURBINE_1_PRESSURE", float, False)
STEAM_TURBINE_2_PRESSURE = ("STEAM_TURBINE_2_PRESSURE", float, False)
def __init__(self, id: str, param_type: Type, is_writable: bool):
self.id = id
self.param_type = param_type
self.is_writable = is_writable
@property
def enum_type(self) -> Type[Enum]:
return self.param_type if issubclass(self.param_type, Enum) else None
@property
def value(self) -> Union[float, int, bool, str, Enum]:
2024-10-02 16:51:12 +02:00
return self.read()
2024-10-02 16:25:45 +02:00
def read(self) -> Union[float, int, bool, str, Enum]:
2024-10-02 16:51:12 +02:00
return Nucon.get(self)
2024-10-02 16:25:45 +02:00
@value.setter
def value(self, new_value: Union[float, int, bool, str, Enum]) -> None:
2024-10-02 16:51:12 +02:00
self.write(new_value)
2024-10-02 16:25:45 +02:00
2024-10-02 16:51:12 +02:00
def write(self, new_value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
Nucon.set(self, new_value, force)
2024-10-02 16:25:45 +02:00
@classmethod
def set_base_url(cls, url: str) -> None:
NuconConfig.base_url = url
@classmethod
def set_dummy_mode(cls, dummy_mode: bool) -> None:
NuconConfig.dummy_mode = dummy_mode
@classmethod
def get_all_readable(cls) -> List['Nucon']:
return list(cls) # All parameters are readable
@classmethod
def get_all_writable(cls) -> List['Nucon']:
return [param for param in cls if param.is_writable]
@classmethod
def get(cls, parameter: 'Nucon') -> Union[float, int, bool, str, Enum]:
if NuconConfig.dummy_mode:
return cls._get_dummy_value(parameter)
response = requests.get(NuconConfig.base_url, params={"variable": parameter.name})
if response.status_code != 200:
raise Exception(f"Failed to query parameter {parameter.name}. Status code: {response.status_code}")
value = response.text.strip()
if parameter.enum_type:
return parameter.enum_type(int(value))
elif parameter.param_type in (float, int):
return parameter.param_type(value)
elif parameter.param_type == bool:
return value.lower() == "true"
else:
return value
@classmethod
def _get_dummy_value(cls, parameter: 'Nucon') -> Union[float, int, bool, str, Enum]:
if parameter.enum_type:
return next(iter(parameter.enum_type))
elif parameter.param_type == float:
return 0.0
elif parameter.param_type == int:
return 0
elif parameter.param_type == bool:
return False
else:
return ""
@classmethod
def get_multiple(cls, parameters: List['Nucon']) -> Dict['Nucon', Union[float, int, bool, str, Enum]]:
return {param: cls.get(param) for param in parameters}
@classmethod
def get_all(cls) -> Dict['Nucon', Union[float, int, bool, str, Enum]]:
return cls.get_multiple(list(cls))
@classmethod
2024-10-02 16:51:12 +02:00
def set(cls, parameter: 'Nucon', value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
if not force and not parameter.is_writable:
2024-10-02 16:25:45 +02:00
raise ValueError(f"Parameter {parameter.name} is not writable")
if parameter.enum_type and isinstance(value, parameter.enum_type):
value = value.value
if NuconConfig.dummy_mode:
2024-10-02 16:51:12 +02:00
print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter.name} to {value}")
2024-10-02 16:25:45 +02:00
return
2024-10-02 16:51:12 +02:00
response = requests.post(NuconConfig.base_url, params={"variable": parameter.name, "value": str(value), "force": str(force).lower()})
2024-10-02 16:25:45 +02:00
if response.status_code != 200:
2024-10-02 16:51:12 +02:00
raise E
2024-10-02 16:25:45 +02:00
# Example usage
if __name__ == "__main__":
# Enable dummy mode for testing
Nucon.set_dummy_mode(True)
# Get a single parameter
core_temp = Nucon.CORE_TEMP.value
print(f"Core Temperature: {core_temp}")
# Get a parameter with an enum
pump_status = Nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value
print(f"Pump 0 Status: {pump_status}")
# Set a parameter with an enum
try:
Nucon.GENERATOR_0_BREAKER.value = BreakerStatus.OPEN
print(f"Successfully set GENERATOR_0_BREAKER to {Nucon.GENERATOR_0_BREAKER.value}")
except ValueError as e:
print(f"Error: {e}")
# Get all parameters
all_params = Nucon.get_all()
print("All parameters:")
for param, value in all_params.items():
print(f"{param.name}: {value}")