NuCon/nucon/core.py
2024-10-03 21:56:16 +02:00

315 lines
13 KiB
Python

from enum import Enum
from typing import Union, Dict, Type, List, Optional, Any, Iterator, Tuple
import requests
import random
class ParameterEnum(Enum):
    """Enum base class that also accepts string spellings of its values.

    When a plain value lookup fails, ``_missing_`` retries strings as
    booleans (``'true'`` / ``'false'``, case-insensitive) and then as
    integers.
    """

    @classmethod
    def _missing_(cls, value: Any) -> Union['ParameterEnum', None]:
        """Resolve a string *value* to a member, or return ``None``.

        Non-string values are never converted. A ``ValueError`` raised while
        resolving ``'true'`` / ``'false'`` propagates to the caller; a failed
        integer conversion or integer lookup yields ``None`` instead.
        """
        if not isinstance(value, str):
            return None

        lowered = value.lower()
        if lowered == 'true':
            return cls(True)
        if lowered == 'false':
            return cls(False)

        try:
            member = cls(int(value))
        except ValueError:
            # Either the text is not an integer or no member has that value.
            member = None
        return member
class PumpStatus(ParameterEnum):
    """Integer status codes for a pump.

    A member is truthy only when it is one of the two ``ACTIVE_*`` states.
    """

    INACTIVE = 0
    ACTIVE_NO_SPEED_REACHED = 1
    ACTIVE_SPEED_REACHED = 2
    REQUIRES_MAINTENANCE = 3
    NOT_INSTALLED = 4
    INSUFFICIENT_ENERGY = 5

    def __bool__(self):
        """Return True for the active states (values 1 and 2)."""
        members = type(self)
        return (
            self is members.ACTIVE_NO_SPEED_REACHED
            or self is members.ACTIVE_SPEED_REACHED
        )
class PumpDryStatus(ParameterEnum):
    """Integer dry-run status codes for a pump.

    A member is truthy only when it is ``INACTIVE_OR_ACTIVE_WITH_FLUID``.
    """

    ACTIVE_WITHOUT_FLUID = 1
    INACTIVE_OR_ACTIVE_WITH_FLUID = 4

    def __bool__(self):
        """Return True for INACTIVE_OR_ACTIVE_WITH_FLUID (value 4)."""
        return self is type(self).INACTIVE_OR_ACTIVE_WITH_FLUID
class PumpOverloadStatus(ParameterEnum):
    """Integer overload status codes for a pump.

    A member is truthy only when it is ``INACTIVE_OR_ACTIVE_NO_OVERLOAD``.
    """

    ACTIVE_AND_OVERLOAD = 1
    INACTIVE_OR_ACTIVE_NO_OVERLOAD = 4

    def __bool__(self):
        """Return True for INACTIVE_OR_ACTIVE_NO_OVERLOAD (value 4)."""
        return self is type(self).INACTIVE_OR_ACTIVE_NO_OVERLOAD
class BreakerStatus(ParameterEnum):
    """Boolean-valued breaker state.

    Truthiness mirrors the member's value: ``OPEN`` is truthy and ``CLOSED``
    is falsy.
    """

    OPEN = True
    CLOSED = False

    def __bool__(self):
        """Return True for OPEN and False for CLOSED."""
        return self is type(self).OPEN
# State-like parameters are handled as plain strings; these aliases only give
# the parameter table more descriptive type names.
CoreState = CoolantCoreState = RodsState = str
class NuconParameter:
    """Handle for one named parameter served by a ``Nucon`` client.

    Holds the parameter's id, Python type, writability and optional numeric
    bounds. Reading and writing are delegated to the owning client's
    ``get`` and ``set`` methods.
    """

    def __init__(
        self,
        nucon: 'Nucon',
        id: str,
        param_type: Type,
        is_writable: bool,
        min_val: Optional[Union[int, float]] = None,
        max_val: Optional[Union[int, float]] = None,
    ):
        """Store the owning client and the parameter's static description.

        Args:
            nucon: Client used for every read and write of this parameter.
            id: Parameter name as sent to the client.
            param_type: Python type values are converted to (may be an Enum).
            is_writable: Whether ``Nucon.set`` accepts this parameter
                without ``force``.
            min_val: Optional inclusive lower bound used by ``check_in_range``.
            max_val: Optional inclusive upper bound used by ``check_in_range``.
        """
        self.nucon = nucon
        self.id = id
        self.param_type = param_type
        self.is_writable = is_writable
        self.min_val = min_val
        self.max_val = max_val

    @property
    def enum_type(self) -> Optional[Type[Enum]]:
        """Return ``param_type`` if it is an ``Enum`` subclass, else ``None``."""
        try:
            is_enum = issubclass(self.param_type, Enum)
        except TypeError:
            # issubclass() rejects objects that are not classes (for example
            # typing generics); such a type is simply not an enum.
            is_enum = False
        return self.param_type if is_enum else None

    def check_in_range(self, value: Union[int, float, Enum], raise_on_oob: bool = False) -> bool:
        """Check *value* against this parameter's enum type or numeric bounds.

        Enum parameters accept only instances of their enum type. Other
        parameters are compared against ``min_val`` / ``max_val`` when those
        are set; both bounds are inclusive.

        Args:
            value: Candidate value.
            raise_on_oob: Raise instead of returning ``False`` on rejection.

        Returns:
            ``True`` if the value is acceptable, ``False`` otherwise.

        Raises:
            ValueError: If the value is rejected and ``raise_on_oob`` is set.
        """
        if self.enum_type:
            if not isinstance(value, self.enum_type):
                if raise_on_oob:
                    raise ValueError(f"Value {value} is not a valid {self.enum_type.__name__}")
                return False
            return True
        if self.min_val is not None and value < self.min_val:
            if raise_on_oob:
                raise ValueError(f"Value {value} is below the minimum allowed value {self.min_val}")
            return False
        if self.max_val is not None and value > self.max_val:
            if raise_on_oob:
                raise ValueError(f"Value {value} is above the maximum allowed value {self.max_val}")
            return False
        return True

    @property
    def value(self):
        """Current value, fetched through the owning client on every access."""
        return self.nucon.get(self)

    @value.setter
    def value(self, new_value):
        self.nucon.set(self, new_value)

    def read(self):
        """Return the current value (same as the ``value`` property)."""
        return self.value

    def write(self, new_value, force=False):
        """Write *new_value* through the owning client.

        ``force`` is passed on to ``Nucon.set`` unchanged.
        """
        self.nucon.set(self, new_value, force)

    def __repr__(self):
        # The value is fetched through the client and that read can fail.
        # repr() is used by debuggers, logging and container printing, so it
        # must not raise; show a placeholder instead of the value.
        try:
            current = self.value
        except Exception as exc:
            current = f"<unavailable: {type(exc).__name__}>"
        type_name = getattr(self.param_type, '__name__', repr(self.param_type))
        return f"NuconParameter(id='{self.id}', value={current}, type={type_name}, writable={self.is_writable})"

    def __str__(self):
        return self.id
class Nucon:
    """Client that reads and writes named parameters over HTTP.

    Every known parameter is registered as a ``NuconParameter`` and can be
    reached as an attribute (``nucon.CORE_TEMP``) or by key
    (``nucon['CORE_TEMP']``). While ``dummy_mode`` is enabled no HTTP
    requests are made: reads return placeholder values and writes are only
    printed.
    """

    def __init__(self, host: str = 'localhost', port: int = 8786):
        """Build the base URL from *host* / *port* and register all parameters."""
        self.base_url = f'http://{host}:{port}/'
        self.dummy_mode = False
        self._parameters = self._create_parameters()

    def _create_parameters(self) -> Dict[str, 'NuconParameter']:
        """Return the parameter registry, keyed by parameter id.

        Each table entry is ``(type, is_writable[, min_val, max_val])`` and is
        unpacked into the ``NuconParameter`` constructor.
        """
        param_values = {
            'CORE_TEMP': (float, False, 0, 1000),
            'CORE_TEMP_OPERATIVE': (float, False),
            'CORE_TEMP_MAX': (float, False),
            'CORE_TEMP_MIN': (float, False),
            'CORE_TEMP_RESIDUAL': (bool, False),
            'CORE_PRESSURE': (float, False),
            'CORE_PRESSURE_MAX': (float, False),
            'CORE_PRESSURE_OPERATIVE': (float, False),
            'CORE_INTEGRITY': (float, False),
            'CORE_WEAR': (float, False),
            'CORE_STATE': (CoreState, False),
            'CORE_STATE_CRITICALITY': (float, False),
            'CORE_CRITICAL_MASS_REACHED': (bool, False),
            'CORE_CRITICAL_MASS_REACHED_COUNTER': (int, False),
            'CORE_IMMINENT_FUSION': (bool, False),
            'CORE_READY_FOR_START': (bool, False),
            'CORE_STEAM_PRESENT': (bool, False),
            'CORE_HIGH_STEAM_PRESENT': (bool, False),
            'TIME': (str, False),
            'TIME_STAMP': (str, False),
            'COOLANT_CORE_STATE': (CoolantCoreState, False),
            'COOLANT_CORE_PRESSURE': (float, False),
            'COOLANT_CORE_MAX_PRESSURE': (float, False),
            'COOLANT_CORE_VESSEL_TEMPERATURE': (float, False),
            'COOLANT_CORE_QUANTITY_IN_VESSEL': (float, False),
            'COOLANT_CORE_PRIMARY_LOOP_LEVEL': (float, False),
            'COOLANT_CORE_FLOW_SPEED': (float, False),
            'COOLANT_CORE_FLOW_ORDERED_SPEED': (float, False),
            'COOLANT_CORE_FLOW_REACHED_SPEED': (bool, False),
            'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': (int, False),
            'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': (int, False),
            'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': (PumpStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': (PumpStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': (PumpStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': (PumpDryStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': (PumpDryStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': (PumpDryStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': (PumpOverloadStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': (PumpOverloadStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': (PumpOverloadStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': (float, False),
            'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': (float, False),
            'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': (float, False),
            'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': (float, False),
            'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': (float, False),
            'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': (float, False),
            'RODS_STATUS': (RodsState, False),
            'RODS_MOVEMENT_SPEED': (float, False),
            'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': (bool, False),
            'RODS_DEFORMED': (bool, False),
            'RODS_TEMPERATURE': (float, False),
            'RODS_MAX_TEMPERATURE': (float, False),
            'RODS_POS_ORDERED': (float, True),
            'RODS_POS_ACTUAL': (float, False),
            'RODS_POS_REACHED': (bool, False),
            'RODS_QUANTITY': (int, False),
            'RODS_ALIGNED': (bool, False),
            'GENERATOR_0_KW': (float, False),
            'GENERATOR_1_KW': (float, False),
            'GENERATOR_2_KW': (float, False),
            'GENERATOR_0_V': (float, False),
            'GENERATOR_1_V': (float, False),
            'GENERATOR_2_V': (float, False),
            'GENERATOR_0_A': (float, False),
            'GENERATOR_1_A': (float, False),
            'GENERATOR_2_A': (float, False),
            'GENERATOR_0_HERTZ': (float, False),
            'GENERATOR_1_HERTZ': (float, False),
            'GENERATOR_2_HERTZ': (float, False),
            'GENERATOR_0_BREAKER': (BreakerStatus, False),
            'GENERATOR_1_BREAKER': (BreakerStatus, False),
            'GENERATOR_2_BREAKER': (BreakerStatus, False),
            'STEAM_TURBINE_0_RPM': (float, False),
            'STEAM_TURBINE_1_RPM': (float, False),
            'STEAM_TURBINE_2_RPM': (float, False),
            'STEAM_TURBINE_0_TEMPERATURE': (float, False),
            'STEAM_TURBINE_1_TEMPERATURE': (float, False),
            'STEAM_TURBINE_2_TEMPERATURE': (float, False),
            'STEAM_TURBINE_0_PRESSURE': (float, False),
            'STEAM_TURBINE_1_PRESSURE': (float, False),
            'STEAM_TURBINE_2_PRESSURE': (float, False),
        }
        return {
            name: NuconParameter(self, name, *values)
            for name, values in param_values.items()
        }

    def get(self, parameter: Union[str, 'NuconParameter']) -> Union[float, int, bool, str, Enum]:
        """Read one parameter and convert the reply to its declared type.

        Args:
            parameter: A registered parameter name or a parameter object.

        Returns:
            The converted value, or a placeholder value in dummy mode.

        Raises:
            KeyError: If *parameter* is a name that is not registered.
            ValueError: If a boolean parameter's reply is not ``true`` /
                ``false``, or if enum / numeric conversion fails.
        """
        if isinstance(parameter, str):
            parameter = self._parameters[parameter]
        if self.dummy_mode:
            return self._get_dummy_value(parameter)
        value = self._query(parameter)
        if parameter.enum_type:
            return parameter.enum_type(value)
        elif parameter.param_type == bool:
            if isinstance(value, str):
                if value.lower() not in ('true', 'false'):
                    raise ValueError(f"Invalid boolean value: {value}")
                return value.lower() == 'true'
            else:
                raise ValueError(f"Expected string for boolean parameter, got {type(value)}")
        else:
            return parameter.param_type(value)

    def set(self, parameter: Union[str, 'NuconParameter'], value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
        """Write one parameter.

        Args:
            parameter: A registered parameter name or a parameter object.
            value: New value; enum members are sent as their ``.value``.
            force: Skip both the writability check and the range check.

        Raises:
            KeyError: If *parameter* is a name that is not registered.
            ValueError: If the parameter is not writable or the value is
                rejected by ``check_in_range`` (only when ``force`` is false).
        """
        if isinstance(parameter, str):
            parameter = self._parameters[parameter]
        if not force and not parameter.is_writable:
            raise ValueError(f"Parameter {parameter} is not writable")
        if not force:
            parameter.check_in_range(value, raise_on_oob=True)
        if parameter.enum_type and isinstance(value, parameter.enum_type):
            value = value.value
        if self.dummy_mode:
            print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter} to {value}")
            return
        self._set_value(parameter, str(value))

    def _query(self, parameter: 'NuconParameter') -> str:
        """GET the parameter from ``base_url`` and return the stripped reply text.

        Raises:
            Exception: If the response status code is not 200.
        """
        response = requests.get(self.base_url, params={"variable": parameter.id})
        if response.status_code != 200:
            raise Exception(f"Failed to query parameter {parameter.id}. Status code: {response.status_code}")
        return response.text.strip()

    def _set_value(self, parameter: 'NuconParameter', value: str) -> None:
        """POST the new value to ``base_url``.

        Raises:
            Exception: If the response status code is not 200.
        """
        response = requests.post(self.base_url, params={"variable": parameter.id, "value": value})
        if response.status_code != 200:
            raise Exception(f"Failed to set parameter {parameter.id}. Status code: {response.status_code}")

    def _get_dummy_value(self, parameter: 'NuconParameter') -> Union[float, int, bool, str, Enum]:
        """Return a placeholder value matching the parameter's type.

        Enums yield their first member, numeric types the midpoint of their
        bounds (or 3.14 / 42 when unbounded), booleans a random choice and
        everything else the string ``"dummy"``.
        """
        if parameter.enum_type:
            return next(iter(parameter.enum_type))
        elif parameter.param_type == float:
            if parameter.max_val is not None and parameter.min_val is not None:
                return (parameter.max_val - parameter.min_val) / 2 + parameter.min_val
            else:
                return 3.14
        elif parameter.param_type == int:
            if parameter.max_val is not None and parameter.min_val is not None:
                return (parameter.max_val - parameter.min_val) // 2 + parameter.min_val
            else:
                return 42
        elif parameter.param_type == bool:
            return random.choice([True, False])
        else:
            return "dummy"

    def get_multiple_iter(self, parameters: List[Union[str, 'NuconParameter']]) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]:
        """Lazily yield ``(name, value)`` for each requested parameter.

        Values are read one at a time as the iterator is consumed.
        """
        for param in parameters:
            # Registry keys equal the parameter ids, so a parameter object
            # names itself. A registry scan with next() would leak
            # StopIteration out of this generator (RuntimeError under
            # PEP 479) for an object that is not registered.
            param_name = param if isinstance(param, str) else param.id
            yield param_name, self.get(param)

    def get_multiple(self, parameters: List[Union[str, 'NuconParameter']]) -> Dict[str, Union[float, int, bool, str, Enum]]:
        """Read the requested parameters and return them as ``{name: value}``."""
        return dict(self.get_multiple_iter(parameters))

    def get_all_iter(self) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]:
        """Lazily yield ``(name, value)`` for every registered parameter."""
        return self.get_multiple_iter(list(self._parameters))

    def get_all(self) -> Dict[str, Union[float, int, bool, str, Enum]]:
        """Read every registered parameter and return ``{name: value}``."""
        return dict(self.get_all_iter())

    def get_all_readable(self) -> Dict[str, 'NuconParameter']:
        """Return a new ``{name: parameter}`` dict of all registered parameters."""
        return dict(self._parameters)

    def get_all_writable(self) -> Dict[str, 'NuconParameter']:
        """Return a new ``{name: parameter}`` dict of the writable parameters."""
        return {name: param for name, param in self._parameters.items() if param.is_writable}

    def set_dummy_mode(self, dummy_mode: bool) -> None:
        """Enable or disable dummy mode."""
        self.dummy_mode = dummy_mode

    def __getattr__(self, name):
        """Expose registered parameters as attributes.

        Raises:
            AttributeError: If *name* is not a registered parameter.
        """
        # __getattr__ only runs after normal lookup has failed. Read the
        # registry straight from __dict__: going through ``self._parameters``
        # would re-enter this method forever when the registry does not exist
        # yet (instance created without __init__, e.g. by copy or unpickle).
        parameters = self.__dict__.get('_parameters')
        if parameters is not None and name in parameters:
            return parameters[name]
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")

    def __getitem__(self, key):
        """Return the parameter registered under *key*.

        Unknown keys raise ``AttributeError`` (not ``KeyError``) because the
        lookup is delegated to ``__getattr__``.
        """
        return self.__getattr__(key)

    def __dir__(self):
        """List the regular attributes plus every registered parameter name."""
        return list(super().__dir__()) + list(self._parameters.keys())

    def __len__(self):
        """Return the number of registered parameters."""
        return len(self._parameters)