316 lines
13 KiB
Python
316 lines
13 KiB
Python
from enum import Enum
|
|
from typing import Union, Dict, Type, List, Optional, Any, Iterator, Tuple
|
|
import requests
|
|
import random
|
|
|
|
class ParameterEnum(Enum):
    """Enum base class that also accepts textual values during lookup.

    ``_missing_`` is the hook ``Enum`` calls when ``cls(value)`` finds no
    member with that exact value. Here it gives string inputs a second chance:

    * ``'true'`` / ``'false'`` (any letter case) are retried as the booleans
      ``True`` / ``False``;
    * any other string is retried as ``int(value)``.

    Non-string values, and strings that cannot be resolved this way, yield
    ``None`` so that ``Enum`` raises its usual ``ValueError``.
    """

    @classmethod
    def _missing_(cls, value: Any) -> Union['ParameterEnum', None]:
        if not isinstance(value, str):
            return None

        lowered = value.lower()
        # The boolean retries sit outside the try block on purpose: a failed
        # lookup there propagates instead of falling through to int().
        if lowered == 'true':
            return cls(True)
        if lowered == 'false':
            return cls(False)

        try:
            return cls(int(value))
        except ValueError:
            # Either the text is not an integer or no member has that value.
            return None
|
|
|
|
class PumpStatus(ParameterEnum):
    """Status code of a pump, looked up by its integer value.

    A member is truthy only for the two ``ACTIVE_*`` states (values 1 and 2);
    every other state, including ``INACTIVE``, is falsy.
    """

    INACTIVE = 0
    ACTIVE_NO_SPEED_REACHED = 1
    ACTIVE_SPEED_REACHED = 2
    REQUIRES_MAINTENANCE = 3
    NOT_INSTALLED = 4
    INSUFFICIENT_ENERGY = 5

    def __bool__(self):
        # Identity checks avoid re-entering __bool__ and name the two
        # "running" states explicitly instead of repeating their raw values.
        if self is PumpStatus.ACTIVE_NO_SPEED_REACHED:
            return True
        return self is PumpStatus.ACTIVE_SPEED_REACHED
|
|
|
|
class PumpDryStatus(ParameterEnum):
    """Dry-running indicator of a pump.

    Only two codes are defined (1 and 4). A member is truthy exactly when it
    is ``ACTIVE_WITHOUT_FLUID``.
    """

    ACTIVE_WITHOUT_FLUID = 1
    INACTIVE_OR_ACTIVE_WITH_FLUID = 4

    def __bool__(self):
        # Truthy means "the pump is running dry".
        return self is PumpDryStatus.ACTIVE_WITHOUT_FLUID
|
|
|
|
class PumpOverloadStatus(ParameterEnum):
    """Overload indicator of a pump.

    Only two codes are defined (1 and 4). A member is truthy exactly when it
    is ``ACTIVE_AND_OVERLOAD``.
    """

    ACTIVE_AND_OVERLOAD = 1
    INACTIVE_OR_ACTIVE_NO_OVERLOAD = 4

    def __bool__(self):
        # Truthy means "the pump is overloaded".
        return self is PumpOverloadStatus.ACTIVE_AND_OVERLOAD
|
|
|
|
class BreakerStatus(ParameterEnum):
    """Breaker position backed by a boolean value.

    ``OPEN`` carries ``True`` and ``CLOSED`` carries ``False``; the truthiness
    of a member is simply that underlying boolean.
    """

    OPEN = True
    CLOSED = False

    def __bool__(self):
        # Equivalent to returning the stored value: True only for OPEN.
        return self is BreakerStatus.OPEN
|
|
|
|
# Named aliases for parameters whose values are kept as plain text. Each one
# is the built-in ``str`` itself, so using an alias as a parameter type
# converts values exactly like ``str`` does; the distinct names only record
# which kind of state a table entry carries.
CoreState: Type[str] = str
CoolantCoreState: Type[str] = str
RodsState: Type[str] = str
|
|
|
|
class NuconParameter:
    """Handle for a single named parameter served by a ``Nucon`` client.

    The object stores only metadata (identifier, Python type, writability and
    optional numeric bounds). Every read or write is delegated to the owning
    client through ``nucon.get`` / ``nucon.set``.

    Args:
        nucon: Client used for all reads and writes.
        id: Parameter identifier; also the result of ``str(parameter)``.
        param_type: Type of the parameter's value. An ``Enum`` subclass marks
            the parameter as enum-valued (see ``enum_type``).
        is_writable: Whether the parameter is declared writable.
        min_val: Optional inclusive lower bound used by ``check_in_range``.
        max_val: Optional inclusive upper bound used by ``check_in_range``.
    """

    def __init__(self, nucon: 'Nucon', id: str, param_type: Type, is_writable: bool, min_val: Optional[Union[int, float]] = None, max_val: Optional[Union[int, float]] = None):
        self.nucon = nucon
        self.id = id
        self.param_type = param_type
        self.is_writable = is_writable
        self.min_val = min_val
        self.max_val = max_val

    @property
    def enum_type(self) -> Optional[Type[Enum]]:
        """Return ``param_type`` when it is an ``Enum`` subclass, else ``None``."""
        kind = self.param_type
        # issubclass() raises TypeError for non-class objects, so make sure we
        # really have a class before asking.
        if isinstance(kind, type) and issubclass(kind, Enum):
            return kind
        return None

    def _range_violation(self, value: Union[int, float, Enum]) -> Optional[str]:
        """Return the message describing why ``value`` is rejected, or ``None``."""
        enum_cls = self.enum_type
        if enum_cls:
            # Enum parameters accept only actual members; bounds do not apply.
            if isinstance(value, enum_cls):
                return None
            return f"Value {value} is not a valid {enum_cls.__name__}"

        if self.min_val is not None and value < self.min_val:
            return f"Value {value} is below the minimum allowed value {self.min_val}"
        if self.max_val is not None and value > self.max_val:
            return f"Value {value} is above the maximum allowed value {self.max_val}"
        return None

    def check_in_range(self, value: Union[int, float, Enum], raise_on_oob: bool = False) -> bool:
        """Check ``value`` against this parameter's constraints.

        For enum parameters the value must be a member of ``enum_type``. For
        all other parameters it must lie within ``[min_val, max_val]``, where
        a bound of ``None`` means "unbounded on that side".

        Args:
            value: Candidate value.
            raise_on_oob: Raise instead of returning ``False`` on rejection.

        Returns:
            ``True`` when the value is acceptable, ``False`` otherwise.

        Raises:
            ValueError: If the value is rejected and ``raise_on_oob`` is true.
        """
        problem = self._range_violation(value)
        if problem is None:
            return True
        if raise_on_oob:
            raise ValueError(problem)
        return False

    @property
    def value(self):
        """Current value, fetched through ``nucon.get`` on every access."""
        return self.nucon.get(self)

    @value.setter
    def value(self, new_value):
        self.nucon.set(self, new_value)

    def read(self):
        """Return the current value (same as reading ``value``)."""
        return self.value

    def write(self, new_value, force=False):
        """Write ``new_value`` through ``nucon.set``, forwarding ``force``."""
        self.nucon.set(self, new_value, force)

    def __repr__(self):
        # Reading the value goes through the client and may fail. A repr is
        # mostly used while debugging or logging, so it must not raise in that
        # case; show a placeholder naming the error instead.
        try:
            shown = f"{self.value}"
        except Exception as exc:
            shown = f"<unavailable: {type(exc).__name__}>"
        return f"NuconParameter(id='{self.id}', value={shown}, param_type={self.param_type.__name__}, is_writable={self.is_writable})"

    def __str__(self):
        return self.id
|
|
|
|
class Nucon:
    """HTTP client exposing a fixed table of named parameters.

    Each table entry is wrapped in a ``NuconParameter`` and can be reached as
    an attribute (``client.CORE_TEMP``), by subscript (``client['CORE_TEMP']``)
    or by name through ``get`` / ``set``. Reads are ``GET`` requests and
    writes are ``POST`` requests against ``base_url``, with the parameter id
    sent as the ``variable`` query argument.

    With ``dummy_mode`` enabled no request is made: reads return placeholder
    values and writes are only printed.

    Args:
        host: Host name of the server.
        port: TCP port of the server.
        timeout: Seconds to wait for each HTTP request before ``requests``
            raises a timeout error. ``None`` waits indefinitely.
    """

    def __init__(self, host: str = 'localhost', port: int = 8786, timeout: Optional[float] = 10.0):
        self.base_url = f'http://{host}:{port}/'
        self.timeout = timeout
        self.dummy_mode = False
        self._parameters = self._create_parameters()

    def _create_parameters(self) -> Dict[str, 'NuconParameter']:
        """Build the name -> ``NuconParameter`` registry.

        Each table value is ``(param_type, is_writable[, min_val, max_val])``
        and is passed positionally to ``NuconParameter`` after the client and
        the parameter name. Dict order defines the order used by ``get_all``.
        """
        param_values = {
            'CORE_TEMP': (float, False, 0, 1000),
            'CORE_TEMP_OPERATIVE': (float, False),
            'CORE_TEMP_MAX': (float, False),
            'CORE_TEMP_MIN': (float, False),
            'CORE_TEMP_RESIDUAL': (bool, False),
            'CORE_PRESSURE': (float, False),
            'CORE_PRESSURE_MAX': (float, False),
            'CORE_PRESSURE_OPERATIVE': (float, False),
            'CORE_INTEGRITY': (float, False),
            'CORE_WEAR': (float, False),
            'CORE_STATE': (CoreState, False),
            'CORE_STATE_CRITICALITY': (float, False),
            'CORE_CRITICAL_MASS_REACHED': (bool, False),
            'CORE_CRITICAL_MASS_REACHED_COUNTER': (int, False),
            'CORE_IMMINENT_FUSION': (bool, False),
            'CORE_READY_FOR_START': (bool, False),
            'CORE_STEAM_PRESENT': (bool, False),
            'CORE_HIGH_STEAM_PRESENT': (bool, False),
            'TIME': (str, False),
            'TIME_STAMP': (str, False),
            'COOLANT_CORE_STATE': (CoolantCoreState, False),
            'COOLANT_CORE_PRESSURE': (float, False),
            'COOLANT_CORE_MAX_PRESSURE': (float, False),
            'COOLANT_CORE_VESSEL_TEMPERATURE': (float, False),
            'COOLANT_CORE_QUANTITY_IN_VESSEL': (float, False),
            'COOLANT_CORE_PRIMARY_LOOP_LEVEL': (float, False),
            'COOLANT_CORE_FLOW_SPEED': (float, False),
            'COOLANT_CORE_FLOW_ORDERED_SPEED': (float, False),
            'COOLANT_CORE_FLOW_REACHED_SPEED': (bool, False),
            'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': (int, False),
            'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': (int, False),
            'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': (PumpStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': (PumpStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': (PumpStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': (PumpDryStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': (PumpDryStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': (PumpDryStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': (PumpOverloadStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': (PumpOverloadStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': (PumpOverloadStatus, False),
            'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': (float, False),
            'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': (float, False),
            'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': (float, False),
            'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': (float, False),
            'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': (float, False),
            'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': (float, False),
            'RODS_STATUS': (RodsState, False),
            'RODS_MOVEMENT_SPEED': (float, False),
            'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': (bool, False),
            'RODS_DEFORMED': (bool, False),
            'RODS_TEMPERATURE': (float, False),
            'RODS_MAX_TEMPERATURE': (float, False),
            'RODS_POS_ORDERED': (float, True),
            'RODS_POS_ACTUAL': (float, False),
            'RODS_POS_REACHED': (bool, False),
            'RODS_QUANTITY': (int, False),
            'RODS_ALIGNED': (bool, False),
            'GENERATOR_0_KW': (float, False),
            'GENERATOR_1_KW': (float, False),
            'GENERATOR_2_KW': (float, False),
            'GENERATOR_0_V': (float, False),
            'GENERATOR_1_V': (float, False),
            'GENERATOR_2_V': (float, False),
            'GENERATOR_0_A': (float, False),
            'GENERATOR_1_A': (float, False),
            'GENERATOR_2_A': (float, False),
            'GENERATOR_0_HERTZ': (float, False),
            'GENERATOR_1_HERTZ': (float, False),
            'GENERATOR_2_HERTZ': (float, False),
            'GENERATOR_0_BREAKER': (BreakerStatus, False),
            'GENERATOR_1_BREAKER': (BreakerStatus, False),
            'GENERATOR_2_BREAKER': (BreakerStatus, False),
            'STEAM_TURBINE_0_RPM': (float, False),
            'STEAM_TURBINE_1_RPM': (float, False),
            'STEAM_TURBINE_2_RPM': (float, False),
            'STEAM_TURBINE_0_TEMPERATURE': (float, False),
            'STEAM_TURBINE_1_TEMPERATURE': (float, False),
            'STEAM_TURBINE_2_TEMPERATURE': (float, False),
            'STEAM_TURBINE_0_PRESSURE': (float, False),
            'STEAM_TURBINE_1_PRESSURE': (float, False),
            'STEAM_TURBINE_2_PRESSURE': (float, False),
        }

        return {
            name: NuconParameter(self, name, *values)
            for name, values in param_values.items()
        }

    def get(self, parameter: Union[str, 'NuconParameter']) -> Union[float, int, bool, str, Enum]:
        """Read one parameter and convert it to its declared type.

        Args:
            parameter: Registered parameter name or a parameter object.

        Returns:
            The converted value, or a placeholder when ``dummy_mode`` is on.

        Raises:
            KeyError: If a name is given that is not registered.
            ValueError: If the reply cannot be converted (for example a
                boolean parameter that is neither ``true`` nor ``false``).
            Exception: If the server answers with a non-200 status.
        """
        if isinstance(parameter, str):
            parameter = self._parameters[parameter]

        if self.dummy_mode:
            return self._get_dummy_value(parameter)

        return self._parse_response(parameter, self._query(parameter))

    def _parse_response(self, parameter: 'NuconParameter', value: str) -> Union[float, int, bool, str, Enum]:
        """Convert the raw reply ``value`` to the type declared by ``parameter``."""
        if parameter.enum_type:
            # String handling is left to the enum class's own lookup.
            return parameter.enum_type(value)

        if parameter.param_type == bool:
            # bool('false') would be True, so booleans are parsed by hand.
            if not isinstance(value, str):
                raise ValueError(f"Expected string for boolean parameter, got {type(value)}")
            lowered = value.lower()
            if lowered not in ('true', 'false'):
                raise ValueError(f"Invalid boolean value: {value}")
            return lowered == 'true'

        return parameter.param_type(value)

    def set(self, parameter: Union[str, 'NuconParameter'], value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
        """Write ``value`` to a parameter.

        Args:
            parameter: Registered parameter name or a parameter object.
            value: New value. Enum members are sent as their ``.value``.
            force: Skip both the writability check and the range check.

        Raises:
            KeyError: If a name is given that is not registered.
            ValueError: If the parameter is not writable or the value fails
                ``check_in_range`` (only when ``force`` is false).
            Exception: If the server answers with a non-200 status.
        """
        if isinstance(parameter, str):
            parameter = self._parameters[parameter]

        if not force:
            if not parameter.is_writable:
                raise ValueError(f"Parameter {parameter} is not writable")
            parameter.check_in_range(value, raise_on_oob=True)

        if parameter.enum_type and isinstance(value, parameter.enum_type):
            value = value.value

        if self.dummy_mode:
            print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter} to {value}")
            return

        self._set_value(parameter, str(value))

    def _query(self, parameter: 'NuconParameter') -> str:
        """GET the parameter and return the stripped response text."""
        # Without a timeout requests may block forever on a silent server.
        response = requests.get(self.base_url, params={"variable": parameter.id}, timeout=self.timeout)

        if response.status_code != 200:
            raise Exception(f"Failed to query parameter {parameter.id}. Status code: {response.status_code}")

        return response.text.strip()

    def _set_value(self, parameter: 'NuconParameter', value: str) -> None:
        """POST the already-stringified ``value`` for the parameter."""
        response = requests.post(self.base_url, params={"variable": parameter.id, "value": value}, timeout=self.timeout)

        if response.status_code != 200:
            raise Exception(f"Failed to set parameter {parameter.id}. Status code: {response.status_code}")

    def _get_dummy_value(self, parameter: 'NuconParameter') -> Union[float, int, bool, str, Enum]:
        """Return a placeholder of the parameter's type for ``dummy_mode``.

        Enum parameters yield their first member, bounded numbers the midpoint
        of their range, unbounded floats ``3.14``, unbounded ints ``42``,
        booleans a random choice and anything else the string ``"dummy"``.
        """
        if parameter.enum_type:
            return next(iter(parameter.enum_type))

        bounded = parameter.max_val is not None and parameter.min_val is not None

        if parameter.param_type == float:
            if bounded:
                return (parameter.max_val - parameter.min_val) / 2 + parameter.min_val
            return 3.14

        if parameter.param_type == int:
            if bounded:
                return (parameter.max_val - parameter.min_val) // 2 + parameter.min_val
            return 42

        if parameter.param_type == bool:
            return random.choice([True, False])

        return "dummy"

    def get_multiple_iter(self, parameters: List[Union[str, 'NuconParameter']]) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]:
        """Lazily yield ``(name, value)`` for each requested parameter.

        One read is performed per item, at the moment the item is consumed.
        """
        for param in parameters:
            if isinstance(param, str):
                name, target = param, self._parameters[param]
            else:
                # Registry keys equal each parameter's id, so the id is the
                # name. Searching the registry by identity was O(n) per item
                # and, for an unregistered object, leaked StopIteration out of
                # next(), which a generator turns into RuntimeError.
                name, target = param.id, param
            yield name, self.get(target)

    def get_multiple(self, parameters: List[Union[str, 'NuconParameter']]) -> Dict[str, Union[float, int, bool, str, Enum]]:
        """Read several parameters and return them as a name -> value dict."""
        return dict(self.get_multiple_iter(parameters))

    def get_all_iter(self) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]:
        """Lazily yield ``(name, value)`` for every registered parameter."""
        return self.get_multiple_iter(list(self._parameters))

    def get_all(self) -> Dict[str, Union[float, int, bool, str, Enum]]:
        """Read every registered parameter into a name -> value dict."""
        return dict(self.get_all_iter())

    def get_all_readable(self) -> Dict[str, 'NuconParameter']:
        """Return a new name -> parameter dict of all registered parameters."""
        return dict(self._parameters)

    def get_all_writable(self) -> Dict[str, 'NuconParameter']:
        """Return a name -> parameter dict of the writable parameters only."""
        return {name: param for name, param in self._parameters.items() if param.is_writable}

    def set_dummy_mode(self, dummy_mode: bool) -> None:
        """Enable or disable ``dummy_mode``."""
        self.dummy_mode = dummy_mode

    def __getattr__(self, name):
        # Only called after normal attribute lookup has failed. The registry
        # is read straight from the instance dict: going through
        # ``self._parameters`` would re-enter __getattr__ without end whenever
        # the attribute does not exist yet (copy/pickle, bare __new__).
        parameters = self.__dict__.get('_parameters')
        if parameters is not None and name in parameters:
            return parameters[name]
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")

    def __getitem__(self, key):
        # Subscript access mirrors attribute access, so an unknown key raises
        # AttributeError rather than KeyError.
        return self.__getattr__(key)

    def __dir__(self):
        # Advertise parameter names to dir() alongside regular attributes.
        return list(super().__dir__()) + list(self._parameters.keys())

    def __len__(self):
        return len(self._parameters)
|