NuCon/nucon/core.py

346 lines
14 KiB
Python
Raw Normal View History

2024-10-02 16:25:45 +02:00
from enum import Enum, IntEnum
import requests
2024-10-02 22:36:06 +02:00
from typing import Union, Dict, Type, List, Optional, Any
2024-10-02 16:25:45 +02:00
class NuconConfig:
    """Process-wide settings shared by every parameter access.

    The attributes are read and written directly on the class; no
    instance is ever created in this module.
    """

    # URL that parameter GET/POST requests are sent to.
    base_url: str = "http://localhost:8785/"
    # When True, reads return canned values and writes are only printed.
    dummy_mode: bool = False
2024-10-02 22:36:06 +02:00
class ParameterEnum(Enum):
    """Enum base class whose members can be looked up from raw strings.

    Subclasses declare members with ``bool`` or ``int`` values. When a
    lookup by value fails, :meth:`_missing_` retries after converting a
    string input to the corresponding ``bool`` or ``int``.
    """

    @classmethod
    def _missing_(cls, value: Any) -> Union['ParameterEnum', None]:
        """Resolve a string that did not match any member value directly.

        Args:
            value: The value passed to ``cls(value)`` that was not found.

        Returns:
            The matching member, or ``None`` so that the default ``Enum``
            machinery raises ``ValueError`` for the original ``value``.
        """
        if not isinstance(value, str):
            # Only strings get a second chance; anything else is simply invalid.
            return None

        lowered = value.lower()
        # Compare for equality: ``x in ('true')`` is a *substring* test against
        # the string 'true' and would accept '', 'r', 'ru', ...
        if lowered == 'true':
            candidate: Union[bool, int] = True
        elif lowered == 'false':
            candidate = False
        else:
            # Try to convert to int for int-based enums.
            try:
                candidate = int(value)
            except ValueError:
                return None

        # ``candidate`` is never a str, so this nested lookup cannot recurse
        # back into the conversion logic above.
        try:
            return cls(candidate)
        except ValueError:
            # Let Enum report the original input rather than the converted one.
            return None
class PumpStatus(ParameterEnum):
    """Status codes for a pump; truthy only for the two active states."""

    INACTIVE = 0
    ACTIVE_NO_SPEED_REACHED = 1
    ACTIVE_SPEED_REACHED = 2
    REQUIRES_MAINTENANCE = 3
    NOT_INSTALLED = 4
    INSUFFICIENT_ENERGY = 5

    def __bool__(self):
        """Return True when the pump is in either ACTIVE_* state."""
        if self is PumpStatus.ACTIVE_NO_SPEED_REACHED:
            return True
        return self is PumpStatus.ACTIVE_SPEED_REACHED
2024-10-02 22:36:06 +02:00
class PumpDryStatus(ParameterEnum):
    """Dry-run status codes for a pump; truthy when the pump is not running dry."""

    ACTIVE_WITHOUT_FLUID = 1
    INACTIVE_OR_ACTIVE_WITH_FLUID = 4

    def __bool__(self):
        """Return True only for INACTIVE_OR_ACTIVE_WITH_FLUID."""
        return self is PumpDryStatus.INACTIVE_OR_ACTIVE_WITH_FLUID
2024-10-02 22:36:06 +02:00
class PumpOverloadStatus(ParameterEnum):
    """Overload status codes for a pump; truthy when there is no overload."""

    ACTIVE_AND_OVERLOAD = 1
    INACTIVE_OR_ACTIVE_NO_OVERLOAD = 4

    def __bool__(self):
        """Return True only for INACTIVE_OR_ACTIVE_NO_OVERLOAD."""
        return self is PumpOverloadStatus.INACTIVE_OR_ACTIVE_NO_OVERLOAD
2024-10-02 22:36:06 +02:00
class BreakerStatus(ParameterEnum):
    """Boolean-valued breaker state; truthiness mirrors the member value."""

    OPEN = True
    CLOSED = False

    def __bool__(self):
        """Return True for OPEN and False for CLOSED."""
        return self is BreakerStatus.OPEN
2024-10-02 22:36:06 +02:00
# State-like parameters are currently passed through as plain strings, so
# these names are simple aliases of ``str`` used in the parameter table.
CoreState = str
CoolantCoreState = str
RodsState = str
# Disabled enum variants of the aliases above, kept for reference.
# NOTE(review): the literals below suggest the endpoint reports these states
# as Spanish words - confirm the full set of values before re-enabling.
#class CoreState(ParameterEnum):
#    REACTIVE = 'REACTIVO'
#
#    def __bool__(self):
#        return self.value == 'REACTIVO'
#class CoolantCoreState(ParameterEnum):
#    CIRCULATING = 'CIRCULANDO'
#
#    def __bool__(self):
#        return self.value == 'CIRCULANDO'
2024-10-02 16:25:45 +02:00
class Nucon(Enum):
    """Table of parameters that can be read from (and sometimes written to)
    the HTTP endpoint configured in ``NuconConfig.base_url``.

    Every member is declared as ``(id, param_type, is_writable)``; the tuple
    is unpacked into :meth:`__init__`. ``param_type`` drives how the raw text
    reply is converted in :meth:`get`.

    Note that ``value`` is overridden: on a member it performs a live read
    (or a dummy read in dummy mode) instead of returning the declaration
    tuple, and assigning to it performs a write.
    """

    # Core
    CORE_TEMP = ("CORE_TEMP", float, False)
    CORE_TEMP_OPERATIVE = ("CORE_TEMP_OPERATIVE", float, False)
    CORE_TEMP_MAX = ("CORE_TEMP_MAX", float, False)
    CORE_TEMP_MIN = ("CORE_TEMP_MIN", float, False)
    CORE_TEMP_RESIDUAL = ("CORE_TEMP_RESIDUAL", bool, False)
    CORE_PRESSURE = ("CORE_PRESSURE", float, False)
    CORE_PRESSURE_MAX = ("CORE_PRESSURE_MAX", float, False)
    CORE_PRESSURE_OPERATIVE = ("CORE_PRESSURE_OPERATIVE", float, False)
    CORE_INTEGRITY = ("CORE_INTEGRITY", float, False)
    CORE_WEAR = ("CORE_WEAR", float, False)
    CORE_STATE = ("CORE_STATE", CoreState, False)
    CORE_STATE_CRITICALITY = ("CORE_STATE_CRITICALITY", float, False)
    CORE_CRITICAL_MASS_REACHED = ("CORE_CRITICAL_MASS_REACHED", bool, False)
    CORE_CRITICAL_MASS_REACHED_COUNTER = ("CORE_CRITICAL_MASS_REACHED_COUNTER", int, False)
    CORE_IMMINENT_FUSION = ("CORE_IMMINENT_FUSION", bool, False)
    CORE_READY_FOR_START = ("CORE_READY_FOR_START", bool, False)
    CORE_STEAM_PRESENT = ("CORE_STEAM_PRESENT", bool, False)
    CORE_HIGH_STEAM_PRESENT = ("CORE_HIGH_STEAM_PRESENT", bool, False)

    # Time
    TIME = ("TIME", str, False)
    TIME_STAMP = ("TIME_STAMP", str, False)

    # Coolant Core
    COOLANT_CORE_STATE = ("COOLANT_CORE_STATE", CoolantCoreState, False)
    COOLANT_CORE_PRESSURE = ("COOLANT_CORE_PRESSURE", float, False)
    COOLANT_CORE_MAX_PRESSURE = ("COOLANT_CORE_MAX_PRESSURE", float, False)
    COOLANT_CORE_VESSEL_TEMPERATURE = ("COOLANT_CORE_VESSEL_TEMPERATURE", float, False)
    COOLANT_CORE_QUANTITY_IN_VESSEL = ("COOLANT_CORE_QUANTITY_IN_VESSEL", float, False)
    COOLANT_CORE_PRIMARY_LOOP_LEVEL = ("COOLANT_CORE_PRIMARY_LOOP_LEVEL", float, False)
    COOLANT_CORE_FLOW_SPEED = ("COOLANT_CORE_FLOW_SPEED", float, False)
    COOLANT_CORE_FLOW_ORDERED_SPEED = ("COOLANT_CORE_FLOW_ORDERED_SPEED", float, False)
    COOLANT_CORE_FLOW_REACHED_SPEED = ("COOLANT_CORE_FLOW_REACHED_SPEED", bool, False)
    COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT", int, False)
    COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT", int, False)

    # Circulation Pumps
    COOLANT_CORE_CIRCULATION_PUMP_0_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_STATUS", PumpStatus, False)
    COOLANT_CORE_CIRCULATION_PUMP_1_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_STATUS", PumpStatus, False)
    COOLANT_CORE_CIRCULATION_PUMP_2_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_STATUS", PumpStatus, False)
    COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS", PumpDryStatus, False)
    COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS", PumpDryStatus, False)
    COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS", PumpDryStatus, False)
    COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS", PumpOverloadStatus, False)
    COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS", PumpOverloadStatus, False)
    COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS", PumpOverloadStatus, False)
    COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED", float, False)
    COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED", float, False)
    COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED", float, False)
    COOLANT_CORE_CIRCULATION_PUMP_0_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_SPEED", float, False)
    COOLANT_CORE_CIRCULATION_PUMP_1_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_SPEED", float, False)
    COOLANT_CORE_CIRCULATION_PUMP_2_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_SPEED", float, False)

    # Rods
    RODS_STATUS = ("RODS_STATUS", RodsState, False)
    RODS_MOVEMENT_SPEED = ("RODS_MOVEMENT_SPEED", float, False)
    RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE = ("RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE", bool, False)
    RODS_DEFORMED = ("RODS_DEFORMED", bool, False)
    RODS_TEMPERATURE = ("RODS_TEMPERATURE", float, False)
    RODS_MAX_TEMPERATURE = ("RODS_MAX_TEMPERATURE", float, False)
    RODS_POS_ORDERED = ("RODS_POS_ORDERED", float, True)
    RODS_POS_ACTUAL = ("RODS_POS_ACTUAL", float, False)
    RODS_POS_REACHED = ("RODS_POS_REACHED", bool, False)
    RODS_QUANTITY = ("RODS_QUANTITY", int, False)
    RODS_ALIGNED = ("RODS_ALIGNED", bool, False)

    # Generators
    GENERATOR_0_KW = ("GENERATOR_0_KW", float, False)
    GENERATOR_1_KW = ("GENERATOR_1_KW", float, False)
    GENERATOR_2_KW = ("GENERATOR_2_KW", float, False)
    GENERATOR_0_V = ("GENERATOR_0_V", float, False)
    GENERATOR_1_V = ("GENERATOR_1_V", float, False)
    GENERATOR_2_V = ("GENERATOR_2_V", float, False)
    GENERATOR_0_A = ("GENERATOR_0_A", float, False)
    GENERATOR_1_A = ("GENERATOR_1_A", float, False)
    GENERATOR_2_A = ("GENERATOR_2_A", float, False)
    GENERATOR_0_HERTZ = ("GENERATOR_0_HERTZ", float, False)
    GENERATOR_1_HERTZ = ("GENERATOR_1_HERTZ", float, False)
    GENERATOR_2_HERTZ = ("GENERATOR_2_HERTZ", float, False)
    GENERATOR_0_BREAKER = ("GENERATOR_0_BREAKER", BreakerStatus, False)
    GENERATOR_1_BREAKER = ("GENERATOR_1_BREAKER", BreakerStatus, False)
    GENERATOR_2_BREAKER = ("GENERATOR_2_BREAKER", BreakerStatus, False)

    # Steam Turbines
    STEAM_TURBINE_0_RPM = ("STEAM_TURBINE_0_RPM", float, False)
    STEAM_TURBINE_1_RPM = ("STEAM_TURBINE_1_RPM", float, False)
    STEAM_TURBINE_2_RPM = ("STEAM_TURBINE_2_RPM", float, False)
    STEAM_TURBINE_0_TEMPERATURE = ("STEAM_TURBINE_0_TEMPERATURE", float, False)
    STEAM_TURBINE_1_TEMPERATURE = ("STEAM_TURBINE_1_TEMPERATURE", float, False)
    STEAM_TURBINE_2_TEMPERATURE = ("STEAM_TURBINE_2_TEMPERATURE", float, False)
    STEAM_TURBINE_0_PRESSURE = ("STEAM_TURBINE_0_PRESSURE", float, False)
    STEAM_TURBINE_1_PRESSURE = ("STEAM_TURBINE_1_PRESSURE", float, False)
    STEAM_TURBINE_2_PRESSURE = ("STEAM_TURBINE_2_PRESSURE", float, False)

    def __init__(self, id: str, param_type: Type, is_writable: bool,
                 min_val: Optional[Union[int, float]] = None,
                 max_val: Optional[Union[int, float]] = None):
        """Store the fields of a member's declaration tuple.

        Args:
            id: Identifier of the parameter (matched by :meth:`set` when a
                string is given).
            param_type: Type the raw reply is converted to by :meth:`get`.
            is_writable: Whether :meth:`set` accepts the parameter without
                ``force``.
            min_val: Optional lower bound used by :meth:`check_in_range`.
            max_val: Optional upper bound used by :meth:`check_in_range`.
        """
        self.id = id
        self.param_type = param_type
        self.is_writable = is_writable
        self.min_val = min_val
        self.max_val = max_val

    def __repr__(self) -> str:
        """Return a debug representation.

        Evaluating ``self.value`` here performs a read of the parameter.
        """
        type_repr = repr(self.param_type)
        return (f"<{self.__class__.__name__}.{self.name}: "
                f"{{'value': {repr(self.value)}, 'type': {type_repr}, "
                f"'writable': {self.is_writable}}}>"
                )

    def __str__(self):
        """Return the parameter id."""
        return self.id

    @property
    def enum_type(self) -> Optional[Type[Enum]]:
        """Return ``param_type`` if it is an ``Enum`` subclass, else ``None``."""
        declared = self.param_type
        # Guard with isinstance so a non-class param_type cannot make
        # issubclass() raise TypeError.
        if isinstance(declared, type) and issubclass(declared, Enum):
            return declared
        return None

    @property
    def value(self) -> Union[float, int, bool, str, Enum]:
        """Current value of the parameter; shorthand for :meth:`read`."""
        return self.read()

    def read(self) -> Union[float, int, bool, str, Enum]:
        """Read and return the current value via :meth:`get`."""
        return Nucon.get(self)

    @value.setter
    def value(self, new_value: Union[float, int, bool, str, Enum]) -> None:
        """Write ``new_value`` without ``force``; shorthand for :meth:`write`."""
        self.write(new_value)

    def write(self, new_value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
        """Write ``new_value`` via :meth:`set`.

        Args:
            new_value: Value to send.
            force: Skip the writability and range checks when True.
        """
        Nucon.set(self, new_value, force)

    def check_in_range(self, value: Union[int, float], raise_on_oob=False) -> bool:
        """Check ``value`` against ``min_val`` / ``max_val``.

        A bound that is ``None`` is not checked.

        Args:
            value: Candidate value.
            raise_on_oob: Raise instead of returning False when out of bounds.

        Returns:
            True when the value is within the configured bounds, else False.

        Raises:
            ValueError: If out of bounds and ``raise_on_oob`` is True.
        """
        if self.min_val is not None and value < self.min_val:
            if raise_on_oob:
                raise ValueError(f"Value {value} is below the minimum allowed value {self.min_val} for {self.name}")
            return False
        if self.max_val is not None and value > self.max_val:
            if raise_on_oob:
                raise ValueError(f"Value {value} is above the maximum allowed value {self.max_val} for {self.name}")
            return False
        return True

    @classmethod
    def set_base_url(cls, url: str) -> None:
        """Set the URL used for all subsequent requests."""
        NuconConfig.base_url = url

    @classmethod
    def set_dummy_mode(cls, dummy_mode: bool) -> None:
        """Enable or disable dummy mode (no HTTP requests are made)."""
        NuconConfig.dummy_mode = dummy_mode

    @classmethod
    def get_all_readable(cls) -> List['Nucon']:
        """Return every parameter; all parameters are readable."""
        return list(cls)

    @classmethod
    def get_all_writable(cls) -> List['Nucon']:
        """Return the parameters whose ``is_writable`` flag is set."""
        return [param for param in cls if param.is_writable]

    @classmethod
    def get(cls, parameter: Union['Nucon', str]) -> Union[float, int, bool, str, ParameterEnum]:
        """Read a parameter and convert the reply to its declared type.

        Args:
            parameter: A member, or a member *name* (looked up with ``cls[...]``).

        Returns:
            The converted value; in dummy mode, a canned value instead.

        Raises:
            KeyError: If ``parameter`` is a string that is not a member name.
            ValueError: If the reply cannot be converted to the declared type.
            Exception: If the HTTP request does not return status 200.
        """
        if isinstance(parameter, str):
            parameter = cls[parameter]

        if NuconConfig.dummy_mode:
            return cls._get_dummy_value(parameter)

        value = cls._query(parameter.name)

        if parameter.enum_type and issubclass(parameter.enum_type, ParameterEnum):
            return parameter.enum_type(value)
        if parameter.param_type == float:
            return float(value)
        if parameter.param_type == int:
            return int(value)
        if parameter.param_type == bool:
            lowered = value.lower()
            # Exact comparison: ``x in ('true')`` is a substring test on the
            # string 'true' and would treat '' or 'ru' as True.
            if lowered == 'true':
                return True
            if lowered == 'false':
                return False
            raise ValueError(f"Invalid boolean value: {value}. Expected 'TRUE' or 'FALSE' (case insensitive).")
        return value

    @classmethod
    def _query(cls, parameter_name: str) -> str:
        """GET ``parameter_name`` from the endpoint and return the stripped body.

        Raises:
            Exception: If the response status code is not 200.
        """
        response = requests.get(NuconConfig.base_url, params={"variable": parameter_name})
        if response.status_code != 200:
            raise Exception(f"Failed to query parameter {parameter_name}. Status code: {response.status_code}")
        return response.text.strip()

    @classmethod
    def _get_dummy_value(cls, parameter: 'Nucon') -> Union[float, int, bool, str, Enum]:
        """Return a placeholder value of the parameter's declared type.

        Enum parameters yield their first member. Numeric parameters yield
        the midpoint of their bounds when both are set, otherwise a fixed
        constant. Booleans yield False and everything else ``"dummy"``.
        """
        if parameter.enum_type:
            return next(iter(parameter.enum_type))

        has_bounds = parameter.max_val is not None and parameter.min_val is not None
        if parameter.param_type == float:
            if has_bounds:
                return (parameter.max_val - parameter.min_val) / 2 + parameter.min_val
            return 3.14
        if parameter.param_type == int:
            if has_bounds:
                # Truncate so an int parameter never yields a float midpoint.
                return int((parameter.max_val - parameter.min_val) / 2 + parameter.min_val)
            return 42
        if parameter.param_type == bool:
            return False
        return "dummy"

    @classmethod
    def get_multiple(cls, parameters: List['Nucon']) -> Dict['Nucon', Union[float, int, bool, str, Enum]]:
        """Read each of ``parameters`` and return a ``{parameter: value}`` dict."""
        return {param: cls.get(param) for param in parameters}

    @classmethod
    def get_all(cls) -> Dict['Nucon', Union[float, int, bool, str, Enum]]:
        """Read every parameter and return a ``{parameter: value}`` dict."""
        return cls.get_multiple(list(cls))

    @classmethod
    def set(cls, parameter: Union['Nucon', str], value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
        """Write ``value`` to a parameter.

        Args:
            parameter: A member, or a parameter *id* string.
            value: Value to send; a member of the parameter's enum type is
                replaced by its underlying value before sending.
            force: Skip the writability and range checks when True.

        Raises:
            ValueError: If the id is unknown, the parameter is not writable,
                or the value is out of range (the latter two unless ``force``).
            Exception: If the HTTP request does not return status 200.
        """
        if isinstance(parameter, str):
            # Keep the requested id so the error message can report it; the
            # lookup result is None when nothing matches.
            requested_id = parameter
            parameter = next((param for param in cls if param.id == requested_id), None)
            if parameter is None:
                raise ValueError(f"No parameter found with id '{requested_id}'")

        if not force and not parameter.is_writable:
            raise ValueError(f"Parameter {parameter.name} is not writable")

        if not force:
            parameter.check_in_range(value, raise_on_oob=True)

        if parameter.enum_type and isinstance(value, parameter.enum_type):
            value = value.value

        if NuconConfig.dummy_mode:
            print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter.name} to {value}")
            return

        response = requests.post(NuconConfig.base_url, params={"variable": parameter.name, "value": str(value)})
        if response.status_code != 200:
            raise Exception(f"Failed to set parameter {parameter.name}. Status code: {response.status_code}")
2024-10-02 16:25:45 +02:00
# Example usage
if __name__ == "__main__":
    # Dummy mode keeps the demo self-contained: no HTTP requests are made.
    Nucon.set_dummy_mode(True)

    # Read a plain numeric parameter.
    temperature = Nucon.CORE_TEMP.read()
    print(f"Core Temperature: {temperature}")

    # Read a parameter whose type is an enum.
    first_pump = Nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.read()
    print(f"Pump 0 Status: {first_pump}")

    # Attempt a write using an enum member; failures are reported, not raised.
    try:
        Nucon.GENERATOR_0_BREAKER.write(BreakerStatus.OPEN)
        print(f"Successfully set GENERATOR_0_BREAKER to {Nucon.GENERATOR_0_BREAKER.read()}")
    except ValueError as err:
        print(f"Error: {err}")

    # Dump every parameter.
    snapshot = Nucon.get_all()
    print("All parameters:")
    for entry, reading in snapshot.items():
        print(f"{entry.name}: {reading}")