Fixes and Updates
This commit is contained in:
parent
60cd44cc9e
commit
33b5db2f57
443
nucon/core.py
443
nucon/core.py
@ -1,28 +1,20 @@
|
|||||||
from enum import Enum, IntEnum
|
from enum import Enum
|
||||||
|
from typing import Union, Dict, Type, List, Optional, Any, Iterator, Tuple
|
||||||
import requests
|
import requests
|
||||||
from typing import Union, Dict, Type, List, Optional, Any
|
import random
|
||||||
|
|
||||||
class NuconConfig:
|
|
||||||
base_url = "http://localhost:8785/"
|
|
||||||
dummy_mode = False
|
|
||||||
|
|
||||||
class ParameterEnum(Enum):
|
class ParameterEnum(Enum):
|
||||||
@classmethod
|
@classmethod
|
||||||
def _missing_(cls, value: Any) -> Union['ParameterEnum', None]:
|
def _missing_(cls, value: Any) -> Union['ParameterEnum', None]:
|
||||||
if isinstance(value, str):
|
if isinstance(value, str):
|
||||||
# Handle boolean-like strings
|
if value.lower() == 'true':
|
||||||
if value.lower() in ('true'):
|
|
||||||
return cls(True)
|
return cls(True)
|
||||||
elif value.lower() in ('false'):
|
elif value.lower() == 'false':
|
||||||
return cls(False)
|
return cls(False)
|
||||||
|
|
||||||
# Try to convert to int for int-based enums
|
|
||||||
try:
|
try:
|
||||||
return cls(int(value))
|
return cls(int(value))
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# If we can't handle the value, let the default Enum behavior take over
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
class PumpStatus(ParameterEnum):
|
class PumpStatus(ParameterEnum):
|
||||||
@ -61,214 +53,208 @@ CoreState = str
|
|||||||
CoolantCoreState = str
|
CoolantCoreState = str
|
||||||
RodsState = str
|
RodsState = str
|
||||||
|
|
||||||
#class CoreState(ParameterEnum):
|
class NuconParameter:
|
||||||
# REACTIVE = 'REACTIVO'
|
def __init__(self, nucon: 'Nucon', id: str, param_type: Type, is_writable: bool, min_val: Optional[Union[int, float]] = None, max_val: Optional[Union[int, float]] = None):
|
||||||
#
|
self.nucon = nucon
|
||||||
# def __bool__(self):
|
|
||||||
# return self.value == 'REACTIVO'
|
|
||||||
|
|
||||||
#class CoolantCoreState(ParameterEnum):
|
|
||||||
# CIRCULATING = 'CIRCULANDO'
|
|
||||||
#
|
|
||||||
# def __bool__(self):
|
|
||||||
# return self.value == 'CIRCULANDO'
|
|
||||||
|
|
||||||
|
|
||||||
class Nucon(Enum):
|
|
||||||
# Core
|
|
||||||
CORE_TEMP = ("CORE_TEMP", float, False)
|
|
||||||
CORE_TEMP_OPERATIVE = ("CORE_TEMP_OPERATIVE", float, False)
|
|
||||||
CORE_TEMP_MAX = ("CORE_TEMP_MAX", float, False)
|
|
||||||
CORE_TEMP_MIN = ("CORE_TEMP_MIN", float, False)
|
|
||||||
CORE_TEMP_RESIDUAL = ("CORE_TEMP_RESIDUAL", bool, False)
|
|
||||||
CORE_PRESSURE = ("CORE_PRESSURE", float, False)
|
|
||||||
CORE_PRESSURE_MAX = ("CORE_PRESSURE_MAX", float, False)
|
|
||||||
CORE_PRESSURE_OPERATIVE = ("CORE_PRESSURE_OPERATIVE", float, False)
|
|
||||||
CORE_INTEGRITY = ("CORE_INTEGRITY", float, False)
|
|
||||||
CORE_WEAR = ("CORE_WEAR", float, False)
|
|
||||||
CORE_STATE = ("CORE_STATE", CoreState, False)
|
|
||||||
CORE_STATE_CRITICALITY = ("CORE_STATE_CRITICALITY", float, False)
|
|
||||||
CORE_CRITICAL_MASS_REACHED = ("CORE_CRITICAL_MASS_REACHED", bool, False)
|
|
||||||
CORE_CRITICAL_MASS_REACHED_COUNTER = ("CORE_CRITICAL_MASS_REACHED_COUNTER", int, False)
|
|
||||||
CORE_IMMINENT_FUSION = ("CORE_IMMINENT_FUSION", bool, False)
|
|
||||||
CORE_READY_FOR_START = ("CORE_READY_FOR_START", bool, False)
|
|
||||||
CORE_STEAM_PRESENT = ("CORE_STEAM_PRESENT", bool, False)
|
|
||||||
CORE_HIGH_STEAM_PRESENT = ("CORE_HIGH_STEAM_PRESENT", bool, False)
|
|
||||||
|
|
||||||
# Time
|
|
||||||
TIME = ("TIME", str, False)
|
|
||||||
TIME_STAMP = ("TIME_STAMP", str, False)
|
|
||||||
|
|
||||||
# Coolant Core
|
|
||||||
COOLANT_CORE_STATE = ("COOLANT_CORE_STATE", CoolantCoreState, False)
|
|
||||||
COOLANT_CORE_PRESSURE = ("COOLANT_CORE_PRESSURE", float, False)
|
|
||||||
COOLANT_CORE_MAX_PRESSURE = ("COOLANT_CORE_MAX_PRESSURE", float, False)
|
|
||||||
COOLANT_CORE_VESSEL_TEMPERATURE = ("COOLANT_CORE_VESSEL_TEMPERATURE", float, False)
|
|
||||||
COOLANT_CORE_QUANTITY_IN_VESSEL = ("COOLANT_CORE_QUANTITY_IN_VESSEL", float, False)
|
|
||||||
COOLANT_CORE_PRIMARY_LOOP_LEVEL = ("COOLANT_CORE_PRIMARY_LOOP_LEVEL", float, False)
|
|
||||||
COOLANT_CORE_FLOW_SPEED = ("COOLANT_CORE_FLOW_SPEED", float, False)
|
|
||||||
COOLANT_CORE_FLOW_ORDERED_SPEED = ("COOLANT_CORE_FLOW_ORDERED_SPEED", float, False)
|
|
||||||
COOLANT_CORE_FLOW_REACHED_SPEED = ("COOLANT_CORE_FLOW_REACHED_SPEED", bool, False)
|
|
||||||
COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT", int, False)
|
|
||||||
COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT = ("COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT", int, False)
|
|
||||||
|
|
||||||
# Circulation Pumps
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_0_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_STATUS", PumpStatus, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_1_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_STATUS", PumpStatus, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_2_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_STATUS", PumpStatus, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS", PumpDryStatus, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS", PumpDryStatus, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS", PumpDryStatus, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS", PumpOverloadStatus, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS", PumpOverloadStatus, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS = ("COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS", PumpOverloadStatus, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED", float, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED", float, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED", float, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_0_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_0_SPEED", float, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_1_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_1_SPEED", float, False)
|
|
||||||
COOLANT_CORE_CIRCULATION_PUMP_2_SPEED = ("COOLANT_CORE_CIRCULATION_PUMP_2_SPEED", float, False)
|
|
||||||
|
|
||||||
# Rods
|
|
||||||
RODS_STATUS = ("RODS_STATUS", RodsState, False)
|
|
||||||
RODS_MOVEMENT_SPEED = ("RODS_MOVEMENT_SPEED", float, False)
|
|
||||||
RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE = ("RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE", bool, False)
|
|
||||||
RODS_DEFORMED = ("RODS_DEFORMED", bool, False)
|
|
||||||
RODS_TEMPERATURE = ("RODS_TEMPERATURE", float, False)
|
|
||||||
RODS_MAX_TEMPERATURE = ("RODS_MAX_TEMPERATURE", float, False)
|
|
||||||
RODS_POS_ORDERED = ("RODS_POS_ORDERED", float, True)
|
|
||||||
RODS_POS_ACTUAL = ("RODS_POS_ACTUAL", float, False)
|
|
||||||
RODS_POS_REACHED = ("RODS_POS_REACHED", bool, False)
|
|
||||||
RODS_QUANTITY = ("RODS_QUANTITY", int, False)
|
|
||||||
RODS_ALIGNED = ("RODS_ALIGNED", bool, False)
|
|
||||||
|
|
||||||
# Generators
|
|
||||||
GENERATOR_0_KW = ("GENERATOR_0_KW", float, False)
|
|
||||||
GENERATOR_1_KW = ("GENERATOR_1_KW", float, False)
|
|
||||||
GENERATOR_2_KW = ("GENERATOR_2_KW", float, False)
|
|
||||||
GENERATOR_0_V = ("GENERATOR_0_V", float, False)
|
|
||||||
GENERATOR_1_V = ("GENERATOR_1_V", float, False)
|
|
||||||
GENERATOR_2_V = ("GENERATOR_2_V", float, False)
|
|
||||||
GENERATOR_0_A = ("GENERATOR_0_A", float, False)
|
|
||||||
GENERATOR_1_A = ("GENERATOR_1_A", float, False)
|
|
||||||
GENERATOR_2_A = ("GENERATOR_2_A", float, False)
|
|
||||||
GENERATOR_0_HERTZ = ("GENERATOR_0_HERTZ", float, False)
|
|
||||||
GENERATOR_1_HERTZ = ("GENERATOR_1_HERTZ", float, False)
|
|
||||||
GENERATOR_2_HERTZ = ("GENERATOR_2_HERTZ", float, False)
|
|
||||||
GENERATOR_0_BREAKER = ("GENERATOR_0_BREAKER", BreakerStatus, False)
|
|
||||||
GENERATOR_1_BREAKER = ("GENERATOR_1_BREAKER", BreakerStatus, False)
|
|
||||||
GENERATOR_2_BREAKER = ("GENERATOR_2_BREAKER", BreakerStatus, False)
|
|
||||||
|
|
||||||
# Steam Turbines
|
|
||||||
STEAM_TURBINE_0_RPM = ("STEAM_TURBINE_0_RPM", float, False)
|
|
||||||
STEAM_TURBINE_1_RPM = ("STEAM_TURBINE_1_RPM", float, False)
|
|
||||||
STEAM_TURBINE_2_RPM = ("STEAM_TURBINE_2_RPM", float, False)
|
|
||||||
STEAM_TURBINE_0_TEMPERATURE = ("STEAM_TURBINE_0_TEMPERATURE", float, False)
|
|
||||||
STEAM_TURBINE_1_TEMPERATURE = ("STEAM_TURBINE_1_TEMPERATURE", float, False)
|
|
||||||
STEAM_TURBINE_2_TEMPERATURE = ("STEAM_TURBINE_2_TEMPERATURE", float, False)
|
|
||||||
STEAM_TURBINE_0_PRESSURE = ("STEAM_TURBINE_0_PRESSURE", float, False)
|
|
||||||
STEAM_TURBINE_1_PRESSURE = ("STEAM_TURBINE_1_PRESSURE", float, False)
|
|
||||||
STEAM_TURBINE_2_PRESSURE = ("STEAM_TURBINE_2_PRESSURE", float, False)
|
|
||||||
|
|
||||||
def __init__(self, id: str, param_type: Type, is_writable: bool, min_val: Optional[Union[int, float]] = None, max_val: Optional[Union[int, float]] = None):
|
|
||||||
self.id = id
|
self.id = id
|
||||||
self.param_type = param_type
|
self.param_type = param_type
|
||||||
self.is_writable = is_writable
|
self.is_writable = is_writable
|
||||||
self.min_val = min_val
|
self.min_val = min_val
|
||||||
self.max_val = max_val
|
self.max_val = max_val
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
|
||||||
type_repr = repr(self.param_type)
|
|
||||||
return (f"<{self.__class__.__name__}.{self.name}: "
|
|
||||||
f"{{'value': {repr(self.value)}, 'type': {type_repr}, "
|
|
||||||
f"'writable': {self.is_writable}}}>"
|
|
||||||
)
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return self.id
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def enum_type(self) -> Type[Enum]:
|
def enum_type(self) -> Type[Enum]:
|
||||||
return self.param_type if issubclass(self.param_type, Enum) else None
|
return self.param_type if issubclass(self.param_type, Enum) else None
|
||||||
|
|
||||||
@property
|
def check_in_range(self, value: Union[int, float, Enum], raise_on_oob: bool = False) -> bool:
|
||||||
def value(self) -> Union[float, int, bool, str, Enum]:
|
if self.enum_type:
|
||||||
return self.read()
|
if not isinstance(value, self.enum_type):
|
||||||
|
if raise_on_oob:
|
||||||
def read(self) -> Union[float, int, bool, str, Enum]:
|
raise ValueError(f"Value {value} is not a valid {self.enum_type.__name__}")
|
||||||
return Nucon.get(self)
|
return False
|
||||||
|
return True
|
||||||
@value.setter
|
|
||||||
def value(self, new_value: Union[float, int, bool, str, Enum]) -> None:
|
|
||||||
self.write(new_value)
|
|
||||||
|
|
||||||
def write(self, new_value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
|
|
||||||
Nucon.set(self, new_value, force)
|
|
||||||
|
|
||||||
def check_in_range(self, value: Union[int, float], raise_on_oob=False) -> None:
|
|
||||||
if self.min_val is not None and value < self.min_val:
|
if self.min_val is not None and value < self.min_val:
|
||||||
if raise_on_oob:
|
if raise_on_oob:
|
||||||
raise ValueError(f"Value {value} is below the minimum allowed value {self.min_val} for {self.name}")
|
raise ValueError(f"Value {value} is below the minimum allowed value {self.min_val}")
|
||||||
return False
|
return False
|
||||||
if self.max_val is not None and value > self.max_val:
|
if self.max_val is not None and value > self.max_val:
|
||||||
if raise_on_oob:
|
if raise_on_oob:
|
||||||
raise ValueError(f"Value {value} is above the maximum allowed value {self.max_val} for {self.name}")
|
raise ValueError(f"Value {value} is above the maximum allowed value {self.max_val}")
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@classmethod
|
@property
|
||||||
def set_base_url(cls, url: str) -> None:
|
def value(self):
|
||||||
NuconConfig.base_url = url
|
return self.nucon.get(self)
|
||||||
|
|
||||||
@classmethod
|
@value.setter
|
||||||
def set_dummy_mode(cls, dummy_mode: bool) -> None:
|
def value(self, new_value):
|
||||||
NuconConfig.dummy_mode = dummy_mode
|
self.nucon.set(self, new_value)
|
||||||
|
|
||||||
@classmethod
|
def read(self):
|
||||||
def get_all_readable(cls) -> List['Nucon']:
|
return self.value
|
||||||
return list(cls) # All parameters are readable
|
|
||||||
|
|
||||||
@classmethod
|
def write(self, new_value, force=False):
|
||||||
def get_all_writable(cls) -> List['Nucon']:
|
self.nucon.set(self, new_value, force)
|
||||||
return [param for param in cls if param.is_writable]
|
|
||||||
|
|
||||||
@classmethod
|
def __repr__(self):
|
||||||
def get(cls, parameter: Union['Nucon', str]) -> Union[float, int, bool, str, ParameterEnum]:
|
return f"NuconParameter(id='{self.id}', value={self.value}, type={self.param_type.__name__}, writable={self.is_writable})"
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.id
|
||||||
|
|
||||||
|
class Nucon:
|
||||||
|
def __init__(self, host: str = 'localhost', port: int = 8786):
|
||||||
|
self.base_url = f'http://{host}:{port}/'
|
||||||
|
self.dummy_mode = False
|
||||||
|
self._parameters = self._create_parameters()
|
||||||
|
|
||||||
|
def _create_parameters(self) -> Dict[str, NuconParameter]:
|
||||||
|
param_values = {
|
||||||
|
'CORE_TEMP': (float, False, 0, 1000),
|
||||||
|
'CORE_TEMP_OPERATIVE': (float, False),
|
||||||
|
'CORE_TEMP_MAX': (float, False),
|
||||||
|
'CORE_TEMP_MIN': (float, False),
|
||||||
|
'CORE_TEMP_RESIDUAL': (bool, False),
|
||||||
|
'CORE_PRESSURE': (float, False),
|
||||||
|
'CORE_PRESSURE_MAX': (float, False),
|
||||||
|
'CORE_PRESSURE_OPERATIVE': (float, False),
|
||||||
|
'CORE_INTEGRITY': (float, False),
|
||||||
|
'CORE_WEAR': (float, False),
|
||||||
|
'CORE_STATE': (CoreState, False),
|
||||||
|
'CORE_STATE_CRITICALITY': (float, False),
|
||||||
|
'CORE_CRITICAL_MASS_REACHED': (bool, False),
|
||||||
|
'CORE_CRITICAL_MASS_REACHED_COUNTER': (int, False),
|
||||||
|
'CORE_IMMINENT_FUSION': (bool, False),
|
||||||
|
'CORE_READY_FOR_START': (bool, False),
|
||||||
|
'CORE_STEAM_PRESENT': (bool, False),
|
||||||
|
'CORE_HIGH_STEAM_PRESENT': (bool, False),
|
||||||
|
'TIME': (str, False),
|
||||||
|
'TIME_STAMP': (str, False),
|
||||||
|
'COOLANT_CORE_STATE': (CoolantCoreState, False),
|
||||||
|
'COOLANT_CORE_PRESSURE': (float, False),
|
||||||
|
'COOLANT_CORE_MAX_PRESSURE': (float, False),
|
||||||
|
'COOLANT_CORE_VESSEL_TEMPERATURE': (float, False),
|
||||||
|
'COOLANT_CORE_QUANTITY_IN_VESSEL': (float, False),
|
||||||
|
'COOLANT_CORE_PRIMARY_LOOP_LEVEL': (float, False),
|
||||||
|
'COOLANT_CORE_FLOW_SPEED': (float, False),
|
||||||
|
'COOLANT_CORE_FLOW_ORDERED_SPEED': (float, False),
|
||||||
|
'COOLANT_CORE_FLOW_REACHED_SPEED': (bool, False),
|
||||||
|
'COOLANT_CORE_QUANTITY_CIRCULATION_PUMPS_PRESENT': (int, False),
|
||||||
|
'COOLANT_CORE_QUANTITY_FREIGHT_PUMPS_PRESENT': (int, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_0_STATUS': (PumpStatus, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_1_STATUS': (PumpStatus, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_2_STATUS': (PumpStatus, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_0_DRY_STATUS': (PumpDryStatus, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_1_DRY_STATUS': (PumpDryStatus, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_2_DRY_STATUS': (PumpDryStatus, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_0_OVERLOAD_STATUS': (PumpOverloadStatus, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_1_OVERLOAD_STATUS': (PumpOverloadStatus, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_2_OVERLOAD_STATUS': (PumpOverloadStatus, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_0_ORDERED_SPEED': (float, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_1_ORDERED_SPEED': (float, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_2_ORDERED_SPEED': (float, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_0_SPEED': (float, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_1_SPEED': (float, False),
|
||||||
|
'COOLANT_CORE_CIRCULATION_PUMP_2_SPEED': (float, False),
|
||||||
|
'RODS_STATUS': (RodsState, False),
|
||||||
|
'RODS_MOVEMENT_SPEED': (float, False),
|
||||||
|
'RODS_MOVEMENT_SPEED_DECREASED_HIGH_TEMPERATURE': (bool, False),
|
||||||
|
'RODS_DEFORMED': (bool, False),
|
||||||
|
'RODS_TEMPERATURE': (float, False),
|
||||||
|
'RODS_MAX_TEMPERATURE': (float, False),
|
||||||
|
'RODS_POS_ORDERED': (float, True),
|
||||||
|
'RODS_POS_ACTUAL': (float, False),
|
||||||
|
'RODS_POS_REACHED': (bool, False),
|
||||||
|
'RODS_QUANTITY': (int, False),
|
||||||
|
'RODS_ALIGNED': (bool, False),
|
||||||
|
'GENERATOR_0_KW': (float, False),
|
||||||
|
'GENERATOR_1_KW': (float, False),
|
||||||
|
'GENERATOR_2_KW': (float, False),
|
||||||
|
'GENERATOR_0_V': (float, False),
|
||||||
|
'GENERATOR_1_V': (float, False),
|
||||||
|
'GENERATOR_2_V': (float, False),
|
||||||
|
'GENERATOR_0_A': (float, False),
|
||||||
|
'GENERATOR_1_A': (float, False),
|
||||||
|
'GENERATOR_2_A': (float, False),
|
||||||
|
'GENERATOR_0_HERTZ': (float, False),
|
||||||
|
'GENERATOR_1_HERTZ': (float, False),
|
||||||
|
'GENERATOR_2_HERTZ': (float, False),
|
||||||
|
'GENERATOR_0_BREAKER': (BreakerStatus, False),
|
||||||
|
'GENERATOR_1_BREAKER': (BreakerStatus, False),
|
||||||
|
'GENERATOR_2_BREAKER': (BreakerStatus, False),
|
||||||
|
'STEAM_TURBINE_0_RPM': (float, False),
|
||||||
|
'STEAM_TURBINE_1_RPM': (float, False),
|
||||||
|
'STEAM_TURBINE_2_RPM': (float, False),
|
||||||
|
'STEAM_TURBINE_0_TEMPERATURE': (float, False),
|
||||||
|
'STEAM_TURBINE_1_TEMPERATURE': (float, False),
|
||||||
|
'STEAM_TURBINE_2_TEMPERATURE': (float, False),
|
||||||
|
'STEAM_TURBINE_0_PRESSURE': (float, False),
|
||||||
|
'STEAM_TURBINE_1_PRESSURE': (float, False),
|
||||||
|
'STEAM_TURBINE_2_PRESSURE': (float, False),
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
name: NuconParameter(self, name, *values)
|
||||||
|
for name, values in param_values.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
def get(self, parameter: Union[str, NuconParameter]) -> Union[float, int, bool, str, Enum]:
|
||||||
if isinstance(parameter, str):
|
if isinstance(parameter, str):
|
||||||
parameter = cls[parameter]
|
parameter = self._parameters[parameter]
|
||||||
|
|
||||||
|
if self.dummy_mode:
|
||||||
|
return self._get_dummy_value(parameter)
|
||||||
|
|
||||||
if NuconConfig.dummy_mode:
|
value = self._query(parameter)
|
||||||
return cls._get_dummy_value(parameter)
|
|
||||||
|
if parameter.enum_type:
|
||||||
value = cls._query(parameter.name)
|
|
||||||
|
|
||||||
if parameter.enum_type and issubclass(parameter.enum_type, ParameterEnum):
|
|
||||||
return parameter.enum_type(value)
|
return parameter.enum_type(value)
|
||||||
elif parameter.param_type == float:
|
|
||||||
return float(value)
|
|
||||||
elif parameter.param_type == int:
|
|
||||||
return int(value)
|
|
||||||
elif parameter.param_type == bool:
|
elif parameter.param_type == bool:
|
||||||
if value.lower() in ('true'):
|
if isinstance(value, str):
|
||||||
return True
|
if value.lower() not in ('true', 'false'):
|
||||||
elif value.lower() in ('false'):
|
raise ValueError(f"Invalid boolean value: {value}")
|
||||||
return False
|
return value.lower() == 'true'
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Invalid boolean value: {value}. Expected 'TRUE' or 'FALSE' (case insensitive).")
|
raise ValueError(f"Expected string for boolean parameter, got {type(value)}")
|
||||||
else:
|
else:
|
||||||
return value
|
return parameter.param_type(value)
|
||||||
|
|
||||||
@classmethod
|
def set(self, parameter: Union[str, NuconParameter], value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
|
||||||
def _query(cls, parameter_name: str) -> str:
|
if isinstance(parameter, str):
|
||||||
response = requests.get(NuconConfig.base_url, params={"variable": parameter_name})
|
parameter = self._parameters[parameter]
|
||||||
|
|
||||||
|
if not force and not parameter.is_writable:
|
||||||
|
raise ValueError(f"Parameter {parameter} is not writable")
|
||||||
|
|
||||||
|
if not force:
|
||||||
|
parameter.check_in_range(value, raise_on_oob=True)
|
||||||
|
|
||||||
|
if parameter.enum_type and isinstance(value, parameter.enum_type):
|
||||||
|
value = value.value
|
||||||
|
|
||||||
|
if self.dummy_mode:
|
||||||
|
print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter} to {value}")
|
||||||
|
return
|
||||||
|
|
||||||
|
self._set_value(parameter, str(value))
|
||||||
|
|
||||||
|
def _query(self, parameter: NuconParameter) -> str:
|
||||||
|
response = requests.get(self.base_url, params={"variable": parameter.id})
|
||||||
|
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
raise Exception(f"Failed to query parameter {parameter_name}. Status code: {response.status_code}")
|
raise Exception(f"Failed to query parameter {parameter.id}. Status code: {response.status_code}")
|
||||||
|
|
||||||
return response.text.strip()
|
return response.text.strip()
|
||||||
|
|
||||||
@classmethod
|
def _set_value(self, parameter: NuconParameter, value: str) -> None:
|
||||||
def _get_dummy_value(cls, parameter: 'Nucon') -> Union[float, int, bool, str, Enum]:
|
response = requests.post(self.base_url, params={"variable": parameter.id, "value": value})
|
||||||
|
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception(f"Failed to set parameter {parameter.id}. Status code: {response.status_code}")
|
||||||
|
|
||||||
|
def _get_dummy_value(self, parameter: NuconParameter) -> Union[float, int, bool, str, Enum]:
|
||||||
if parameter.enum_type:
|
if parameter.enum_type:
|
||||||
return next(iter(parameter.enum_type))
|
return next(iter(parameter.enum_type))
|
||||||
elif parameter.param_type == float:
|
elif parameter.param_type == float:
|
||||||
@ -278,69 +264,52 @@ class Nucon(Enum):
|
|||||||
return 3.14
|
return 3.14
|
||||||
elif parameter.param_type == int:
|
elif parameter.param_type == int:
|
||||||
if parameter.max_val is not None and parameter.min_val is not None:
|
if parameter.max_val is not None and parameter.min_val is not None:
|
||||||
return (parameter.max_val - parameter.min_val) / 2 + parameter.min_val
|
return (parameter.max_val - parameter.min_val) // 2 + parameter.min_val
|
||||||
else:
|
else:
|
||||||
return 42
|
return 42
|
||||||
elif parameter.param_type == bool:
|
elif parameter.param_type == bool:
|
||||||
return False
|
return random.choice([True, False])
|
||||||
else:
|
else:
|
||||||
return "dummy"
|
return "dummy"
|
||||||
|
|
||||||
@classmethod
|
def get_multiple_iter(self, parameters: List[Union[str, NuconParameter]]) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]:
|
||||||
def get_multiple(cls, parameters: List['Nucon']) -> Dict['Nucon', Union[float, int, bool, str, Enum]]:
|
for param in parameters:
|
||||||
return {param: cls.get(param) for param in parameters}
|
if isinstance(param, str):
|
||||||
|
param_name = param
|
||||||
|
param_obj = self._parameters[param]
|
||||||
|
else:
|
||||||
|
param_name = next(name for name, p in self._parameters.items() if p is param)
|
||||||
|
param_obj = param
|
||||||
|
yield param_name, self.get(param_obj)
|
||||||
|
|
||||||
@classmethod
|
def get_multiple(self, parameters: List[Union[str, NuconParameter]]) -> Dict[str, Union[float, int, bool, str, Enum]]:
|
||||||
def get_all(cls) -> Dict['Nucon', Union[float, int, bool, str, Enum]]:
|
return dict(self.get_multiple_iter(parameters))
|
||||||
return cls.get_multiple(list(cls))
|
|
||||||
|
|
||||||
@classmethod
|
def get_all_iter(self) -> Iterator[Tuple[str, Union[float, int, bool, str, Enum]]]:
|
||||||
def set(cls, parameter: Union['Nucon', str], value: Union[float, int, bool, str, Enum], force: bool = False) -> None:
|
return self.get_multiple_iter(self._parameters.keys())
|
||||||
if isinstance(parameter, str):
|
|
||||||
parameter = next((param for param in cls if param.id == parameter), None)
|
|
||||||
if parameter is None:
|
|
||||||
raise ValueError(f"No parameter found with id '{parameter}'")
|
|
||||||
|
|
||||||
if not force and not parameter.is_writable:
|
def get_all(self) -> Dict[str, Union[float, int, bool, str, Enum]]:
|
||||||
raise ValueError(f"Parameter {parameter.name} is not writable")
|
return dict(self.get_all_iter())
|
||||||
|
|
||||||
if not force:
|
def get_all_readable(self) -> List[NuconParameter]:
|
||||||
parameter.check_in_range(value, raise_on_oob=True)
|
return {name: param for name, param in self._parameters.items()}
|
||||||
|
|
||||||
if parameter.enum_type and isinstance(value, parameter.enum_type):
|
def get_all_writable(self) -> List[NuconParameter]:
|
||||||
value = value.value
|
return {name: param for name, param in self._parameters.items() if param.is_writable}
|
||||||
|
|
||||||
if NuconConfig.dummy_mode:
|
def set_dummy_mode(self, dummy_mode: bool) -> None:
|
||||||
print(f"Dummy mode: {'Force ' if force else ''}Setting {parameter.name} to {value}")
|
self.dummy_mode = dummy_mode
|
||||||
return
|
|
||||||
|
|
||||||
response = requests.post(NuconConfig.base_url, params={"variable": parameter.name, "value": str(value)})
|
def __getattr__(self, name):
|
||||||
|
if name in self._parameters:
|
||||||
if response.status_code != 200:
|
return self._parameters[name]
|
||||||
raise Exception(f"Failed to set parameter {parameter.name}. Status code: {response.status_code}")
|
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
|
||||||
|
|
||||||
# Example usage
|
def __getitem__(self, key):
|
||||||
if __name__ == "__main__":
|
return self.__getattr__(key)
|
||||||
# Enable dummy mode for testing
|
|
||||||
Nucon.set_dummy_mode(True)
|
|
||||||
|
|
||||||
# Get a single parameter
|
def __dir__(self):
|
||||||
core_temp = Nucon.CORE_TEMP.value
|
return list(super().__dir__()) + list(self._parameters.keys())
|
||||||
print(f"Core Temperature: {core_temp}")
|
|
||||||
|
|
||||||
# Get a parameter with an enum
|
def __len__(self):
|
||||||
pump_status = Nucon.COOLANT_CORE_CIRCULATION_PUMP_0_STATUS.value
|
return len(self._parameters)
|
||||||
print(f"Pump 0 Status: {pump_status}")
|
|
||||||
|
|
||||||
# Set a parameter with an enum
|
|
||||||
try:
|
|
||||||
Nucon.GENERATOR_0_BREAKER.value = BreakerStatus.OPEN
|
|
||||||
print(f"Successfully set GENERATOR_0_BREAKER to {Nucon.GENERATOR_0_BREAKER.value}")
|
|
||||||
except ValueError as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
|
|
||||||
# Get all parameters
|
|
||||||
all_params = Nucon.get_all()
|
|
||||||
print("All parameters:")
|
|
||||||
for param, value in all_params.items():
|
|
||||||
print(f"{param.name}: {value}")
|
|
Loading…
Reference in New Issue
Block a user