Skip to content

Instantly share code, notes, and snippets.

@NucciTheBoss
Created January 11, 2023 14:38
Show Gist options
  • Save NucciTheBoss/2304dfe6bb8d10cda3f2d1d294dfa09b to your computer and use it in GitHub Desktop.
Save NucciTheBoss/2304dfe6bb8d10cda3f2d1d294dfa09b to your computer and use it in GitHub Desktop.
An in-place configuration file editor for slurmdbd.conf written in Python
#!/usr/bin/env python3
"""Abstractions that wrap the slurmdb configuration file."""
import logging
import pathlib
import re
from collections import deque
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Deque, List, Tuple, Union
logger = logging.getLogger(__name__)
class SlurmdbdConfError(Exception):
"""Base error for any exceptions raised when operating on slurmdbd.conf."""
...
class SlurmdbdToken(Enum):
"""slurmdb.conf keys. Sourced from `man slurmdbd.conf.5`."""
ArchiveDir = "ArchiveDir"
ArchiveEvents = "ArchiveEvents"
ArchiveJobs = "ArchiveJobs"
ArchiveResvs = "ArchiveResvs"
ArchiveScript = "ArchiveScript"
ArchiveSteps = "ArchiveSteps"
ArchiveSuspend = "ArchiveSuspend"
ArchiveTXN = "ArchiveTXN"
ArchiveUsage = "ArchiveUsage"
AuthInfo = "AuthInfo"
AuthAltTypes = "AuthAltTypes"
AuthAltParameters = "AuthAltParameters"
AuthType = "AuthType"
CommitDelay = "CommitDelay"
CommunicationParameters = "CommunicationParameters"
DbdBackupHost = "DbdBackupHost"
DbdAddr = "DbdAddr"
DbdHost = "DbdHost"
DbdPort = "DbdPort"
DebugFlags = "DebugFlags"
DebugLevel = "DebugLevel"
DebugLevelSyslog = "DebugLevelSyslog"
DefaultQOS = "DefaultQOS"
LogFile = "LogFile"
LogTimeFormat = "LogTimeFormat"
MaxQueryTimeRange = "MaxQueryTimeRange"
MessageTimeout = "MessageTimeout"
Parameters = "Parameters"
PidFile = "PidFile"
PluginDir = "PluginDir"
PrivateData = "PrivateData"
PurgeEventAfter = "PurgeEventAfter"
PurgeJobAfter = "PurgeJobAfter"
PurgeResvAfter = "PurgeResvAfter"
PurgeStepAfter = "PurgeStepAfter"
PurgeSuspendAfter = "PurgeSuspendAfter"
PurgeTXNAfter = "PurgeTXNAfter"
PurgeUsageAfter = "PurgeUsageAfter"
SlurmUser = "SlurmUser"
StorageHost = "StorageHost"
StorageBackupHost = "StorageBackupHost"
StorageLoc = "StorageLoc"
StorageParameters = "StorageParameters"
StoragePass = "StoragePass"
StoragePort = "StoragePort"
StorageType = "StorageType"
StorageUser = "StorageUser"
TCPTimeout = "TCPTimeout"
TrackSlurmctldDown = "TrackSlurmctldDown"
TrackWCKey = "TrackWCKey"
# Match lines that do not start with "#". This will drop all comment lines.
match_no_comment = re.compile(r"^(?!#).*[^\n]$", re.MULTILINE)
# Match if entered auth type is valid.
match_auth_type = re.compile(r"(?<!.)auth/munge(?!.)")
# Match if bool value is either yes or no.
match_bool = re.compile(r"(?<!.)(yes|no)(?!.)")
# Match if entered debug flags are valid.
match_debug_flag = re.compile(
(
r"(?<!.)(DB_ARCHIVE|DB_ASSOC|DB_EVENT|"
r"DB_JOB|DB_QOS|DB_QUERY|DB_RESERVATION|"
r"DB_RESOURCE|DB_STEP|DB_TRES|DB_USAGE|DB_WCKEY|FEDERATION)(?!.)"
)
)
# Match if entered debug level is valid.
match_debug_level = re.compile(
r"(?<!.)(quiet|fatal|error|info|verbose|debug|debug2|debug3|debug4|debug5)(?!.)"
)
# Match if entered log time format is valid.
match_log_time_format = re.compile(
r"(?<!.)(iso8601|iso8601_ms|rfc5424|rfc5424_ms|clock|short)(?!.)"
)
# Match if password does not contain "#".
match_password = re.compile(r"#")
# Match if entered port number is valid.
match_port_num = re.compile(r"(?<!\d)\d{1,5}(?!\d)")
# Match if entered private data are valid.
match_private_data = re.compile(
r"(?<!.)(accounts|events|jobs|reservations|usage|users)(?!.)"
)
# Match if entered storage type is valid.
match_storage_type = re.compile(r"(?<!.)(accounting_storage/mysql)(?!.)")
# Match if entered time meets slurmdbd time format requirements.
match_time_format = re.compile(r"^\d+(hour|day|month)$")
# Match if entered time range is valid.
match_query_time_format = re.compile(
r"(?<!.)(\d+-\d+:\d+:\d+|\d+-\d+|\d+:\d+:\d+|\d+:\d+|INFINITE)(?!.)"
)
class LintRules:
"""Macros for linting slurmdbd.conf and configuration values."""
@staticmethod
def check_auth_type(auth_type: str) -> None:
if not match_auth_type.match(auth_type):
raise SlurmdbdConfError(f"Not a valid auth type: {auth_type}")
@staticmethod
def check_bool(bool_val: Union[str, bool]) -> None:
if type(bool_val) == bool:
return
elif not match_bool.match(bool_val):
raise SlurmdbdConfError(f"Not a valid boolean value: {bool_val}")
@staticmethod
def check_debug_flag(debug_flag: str) -> None:
if not match_debug_flag.match(debug_flag):
raise SlurmdbdConfError(f"Not a valid debug flag: {debug_flag}")
@staticmethod
def check_debug_level(debug_level: str) -> None:
if not match_debug_level.match(debug_level):
raise SlurmdbdConfError(f"Not a valid debug level: {debug_level}")
@staticmethod
def check_log_time_format(time_format: str) -> None:
if not match_log_time_format.match(time_format):
raise SlurmdbdConfError(f"Not a valid log time format: {time_format}")
@staticmethod
def check_password(password: str) -> None:
if len(match_password.findall(password)) > 0:
raise SlurmdbdConfError("Password cannot contain '#'")
@staticmethod
def check_port(port_number: int) -> None:
if not match_port_num.match(str(port_number)):
raise SlurmdbdConfError(f"Not a valid port number: {port_number}")
@staticmethod
def check_private_data(private_data: str) -> None:
if not match_private_data.match(private_data):
raise SlurmdbdConfError(f"Not a valid private data entry: {private_data}")
@staticmethod
def check_storage_type(storage_type: str) -> None:
if not match_storage_type.match(storage_type):
raise SlurmdbdConfError(f"Not a valid storage type: {storage_type}")
@staticmethod
def check_time_format(time_format: str) -> None:
if not match_time_format.match(time_format):
raise SlurmdbdConfError(f"Not a valid time format: {time_format}")
@staticmethod
def check_query_time_format(time_format: str) -> None:
if not match_query_time_format.match(time_format):
raise SlurmdbdConfError(f"Not a valid max query time format: {time_format}")
def parse_token(token: str) -> Dict:
"""Parse slurmdbd configuration tokens into Python-understandable format.
Args:
token (str): Token in "key=value" format.
Returns:
(Dict[str, Any]): The parsed token.
"""
# Classify token and return as {token: value}. i.e. {DbdPort: "12345"}
charset, processed = deque(c for c in token), []
while charset:
if (c := charset.popleft()) == "=":
if hasattr(SlurmdbdToken, (token_name := "".join(processed))):
return {getattr(SlurmdbdToken, token_name): "".join(c for c in charset)}
else:
raise SlurmdbdConfError(
f"Unrecognized slurmdbd configuration option: {token_name}"
)
else:
processed.append(c)
class SlurmdbdConf:
def __init__(
self, conf_file_path: str = "/etc/slurmdb.conf", wait: bool = False
) -> None:
"""Abstraction of slurmdbd.conf as a Python object.
Args:
conf_file_path (str): Path to the slurmdb configuration file
(Default: "/etc/slurmdb.conf").
wait (bool): False - load slurmdb.conf when SlurmDBConf is initialized.
True - do not load slurmdb.conf when SlurmDBConf is initialized. (Default: False)
"""
self._conf_file = pathlib.Path(conf_file_path)
self._metadata = {}
if not self._conf_file.exists():
logger.debug(f"Creating slurmdbd.conf at {conf_file_path}.")
self._conf_file.touch()
if not wait:
self.load()
def load(self) -> None:
"""Load slurmdbd.conf file into memory."""
logger.debug(f"Loading {str(self._conf_file)} into memory.")
self._metadata = self._scan(
deque(
lex for lex in re.findall(match_no_comment, self._conf_file.read_text())
)
)
def dump(self) -> None:
"""Dump parsed slurmdbd.conf into a file."""
if self._metadata == {}:
raise SlurmdbdConfError("Nothing to write.")
logger.debug(f"Dumping new slurmdbd.conf to {str(self._conf_file)}.")
content = [
"#",
f"# {str(self._conf_file)} generated at {str(datetime.now())}",
"#",
]
content.extend(f"{k.value}={v}" for k, v in self._metadata.items())
with self._conf_file.open("wt") as fout:
fout.writelines(f"{i}\n" for i in content)
def clear(self) -> None:
"""Clear the currently loaded slurmdbd.conf file from metadata."""
self._metadata = {}
def _scan(
self, metadata: Deque[str], _result_store: Dict[str, Any] = {}
) -> Dict[str, Any]:
"""Recursive scanner for parsing slurmdbd.conf.
Args:
metadata (Deque[str]): The lines for slurmdbd.conf formatted into a queue.
_result_store (Dict[str, Any]): Where each parsed line is stored.
Returns:
(Dict[str, Any]): Fully parsed slurmdbd.conf file.
"""
if metadata:
lexeme = metadata.popleft()
logger.debug(f"Parsing lexeme: {lexeme}.")
_result_store.update(parse_token(lexeme))
self._scan(metadata, _result_store)
return _result_store
@property
def archive_dir(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.ArchiveDir, None)
@archive_dir.setter
def archive_dir(self, value: str) -> None:
self._metadata[SlurmdbdToken.ArchiveDir] = value
@archive_dir.deleter
def archive_dir(self) -> None:
del self._metadata[SlurmdbdToken.ArchiveDir]
@property
def archive_events(self) -> Union[bool, None]:
return (
True
if self._metadata.get(SlurmdbdToken.ArchiveEvents, None) == "yes"
else False
if self._metadata.get(SlurmdbdToken.ArchiveEvents, None) == "no"
else None
)
@archive_events.setter
def archive_events(self, value: Union[str, bool]) -> None:
LintRules.check_bool(value)
self._metadata[SlurmdbdToken.ArchiveEvents] = (
"yes" if value is True else "no" if value is False else value
)
@archive_events.deleter
def archive_events(self) -> None:
del self._metadata[SlurmdbdToken.ArchiveEvents]
@property
def archive_jobs(self) -> Union[bool, None]:
return (
True
if self._metadata.get(SlurmdbdToken.ArchiveJobs, None) == "yes"
else False
if self._metadata.get(SlurmdbdToken.ArchiveJobs, None) == "no"
else None
)
@archive_jobs.setter
def archive_jobs(self, value: Union[str, bool]) -> None:
LintRules.check_bool(value)
self._metadata[SlurmdbdToken.ArchiveJobs] = (
"yes" if value is True else "no" if value is False else value
)
@archive_jobs.deleter
def archive_jobs(self) -> None:
del self._metadata[SlurmdbdToken.ArchiveJobs]
@property
def archive_resvs(self) -> Union[str, None]:
return (
True
if self._metadata.get(SlurmdbdToken.ArchiveResvs, None) == "yes"
else False
if self._metadata.get(SlurmdbdToken.ArchiveResvs, None) == "no"
else None
)
@archive_resvs.setter
def archive_resvs(self, value: Union[str, bool]) -> None:
LintRules.check_bool(value)
self._metadata[SlurmdbdToken.ArchiveResvs] = (
"yes" if value is True else "no" if value is False else value
)
@archive_resvs.deleter
def archive_resvs(self) -> None:
del self._metadata[SlurmdbdToken.ArchiveResvs]
@property
def archive_script(self) -> Union[str, None]:
return (
None
if (holder := self._metadata.get(SlurmdbdToken.ArchiveScript, None)) is None
else holder.get("value", None)
)
@archive_script.setter
def archive_script(self, value: str) -> None:
self._metadata[SlurmdbdToken.ArchiveScript] = value
@archive_script.deleter
def archive_script(self) -> None:
del self._metadata[SlurmdbdToken.ArchiveScript]
@property
def archive_steps(self) -> Union[bool, None]:
return (
True
if self._metadata.get(SlurmdbdToken.ArchiveSteps, None) == "yes"
else False
if self._metadata.get(SlurmdbdToken.ArchiveSteps, None) == "no"
else None
)
@archive_steps.setter
def archive_steps(self, value: Union[str, bool]) -> None:
LintRules.check_bool(value)
self._metadata[SlurmdbdToken.ArchiveSteps] = (
"yes" if value is True else "no" if value is False else value
)
@archive_steps.deleter
def archive_steps(self) -> None:
del self._metadata[SlurmdbdToken.ArchiveSteps]
@property
def archive_suspend(self) -> Union[bool, None]:
return (
True
if self._metadata.get(SlurmdbdToken.ArchiveSuspend, None) == "yes"
else False
if self._metadata.get(SlurmdbdToken.ArchiveSuspend, None) == "no"
else None
)
@archive_suspend.setter
def archive_suspend(self, value: Union[str, bool]) -> None:
LintRules.check_bool(value)
self._metadata[SlurmdbdToken.ArchiveSuspend] = (
"yes" if value is True else "no" if value is False else value
)
@archive_suspend.deleter
def archive_suspend(self) -> None:
del self._metadata[SlurmdbdToken.ArchiveSuspend]
@property
def archive_txn(self) -> Union[bool, None]:
return (
True
if self._metadata.get(SlurmdbdToken.ArchiveTXN, None) == "yes"
else False
if self._metadata.get(SlurmdbdToken.ArchiveTXN, None) == "no"
else None
)
@archive_txn.setter
def archive_txn(self, value) -> None:
self._metadata[SlurmdbdToken.ArchiveTXN] = (
"yes" if value is True else "no" if value is False else value
)
@archive_txn.deleter
def archive_txn(self) -> None:
del self._metadata[SlurmdbdToken.ArchiveTXN]
@property
def archive_usage(self) -> None:
return (
True
if self._metadata.get(SlurmdbdToken.ArchiveUsage, None) == "yes"
else False
if self._metadata.get(SlurmdbdToken.ArchiveUsage, None) == "no"
else None
)
@archive_usage.setter
def archive_usage(self, value: Union[str, bool]) -> None:
LintRules.check_bool(value)
self._metadata[SlurmdbdToken.ArchiveUsage] = (
"yes" if value is True else "no" if value is False else value
)
@archive_usage.deleter
def archive_usage(self) -> None:
del self._metadata[SlurmdbdToken.ArchiveUsage]
@property
def auth_info(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.AuthInfo, None)
@auth_info.setter
def auth_info(self, value: str) -> None:
self._metadata[SlurmdbdToken.AuthInfo] = value
@auth_info.deleter
def auth_info(self) -> None:
del self._metadata[SlurmdbdToken.AuthInfo]
@property
def auth_alt_types(self) -> Union[List[str], None]:
return (
None
if self._metadata.get(SlurmdbdToken.AuthAltTypes, None) is None
else self._metadata.get(SlurmdbdToken.AuthAltTypes).split(",")
)
@auth_alt_types.setter
def auth_alt_types(self, value: List[str]) -> None:
self._metadata[SlurmdbdToken.AuthAltTypes] = ",".join(value)
@auth_alt_types.deleter
def auth_alt_types(self) -> None:
del self._metadata[SlurmdbdToken.AuthAltTypes]
@property
def auth_alt_parameters(self) -> Union[List[str], None]:
return (
None
if self._metadata.get(SlurmdbdToken.AuthAltParameters, None) is None
else self._metadata.get(SlurmdbdToken.AuthAltParameters).split(",")
)
@auth_alt_parameters.setter
def auth_alt_parameters(self, value: List[Tuple[str, str]]) -> None:
self._metadata[SlurmdbdToken.AuthAltParameters] = ",".join(
f"{i[0]}={i[1]}" for i in value
)
@auth_alt_parameters.deleter
def auth_alt_parameters(self) -> None:
del self._metadata[SlurmdbdToken.AuthAltParameters]
@property
def auth_type(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.AuthType, None)
@auth_type.setter
def auth_type(self, value: str) -> None:
LintRules.check_auth_type(value)
self._metadata[SlurmdbdToken.AuthType] = value
@auth_type.deleter
def auth_type(self) -> None:
del self._metadata[SlurmdbdToken.AuthType]
@property
def commit_delay(self) -> Union[int, None]:
return (
None
if self._metadata.get(SlurmdbdToken.CommitDelay, None) is None
else int(self._metadata.get(SlurmdbdToken.CommitDelay))
)
@commit_delay.setter
def commit_delay(self, value: int) -> None:
self._metadata[SlurmdbdToken.CommitDelay] = value
@commit_delay.deleter
def commit_delay(self) -> None:
del self._metadata[SlurmdbdToken.CommitDelay]
@property
def communication_parameters(self) -> None:
return (
None
if self._metadata.get(SlurmdbdToken.CommunicationParameters, None) is None
else self._metadata.get(SlurmdbdToken.CommunicationParameters).split(",")
)
@communication_parameters.setter
def communication_parameters(self, value: List[str]) -> None:
self._metadata[SlurmdbdToken.CommunicationParameters] = ",".join(value)
@communication_parameters.deleter
def communication_parameters(self) -> None:
del self._metadata[SlurmdbdToken.CommunicationParameters]
@property
def dbd_backup_host(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.DbdBackupHost, None)
@dbd_backup_host.setter
def dbd_backup_host(self, value: str) -> None:
self._metadata[SlurmdbdToken.DbdBackupHost] = value
@dbd_backup_host.deleter
def dbd_backup_host(self) -> None:
del self._metadata[SlurmdbdToken.DbdBackupHost]
@property
def dbd_addr(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.DbdAddr, None)
@dbd_addr.setter
def dbd_addr(self, value: str) -> None:
self._metadata[SlurmdbdToken.DbdAddr] = value
@dbd_addr.deleter
def dbd_addr(self) -> None:
del self._metadata[SlurmdbdToken.DbdAddr]
@property
def dbd_host(self) -> None:
return self._metadata.get(SlurmdbdToken.DbdHost, None)
@dbd_host.setter
def dbd_host(self, value: str) -> None:
self._metadata[SlurmdbdToken.DbdHost] = value
@dbd_host.deleter
def dbd_host(self) -> None:
del self._metadata[SlurmdbdToken.DbdHost]
@property
def dbd_port(self) -> Union[int, None]:
return (
None
if self._metadata.get(SlurmdbdToken.DbdPort, None) is None
else int(self._metadata.get(SlurmdbdToken.DbdPort))
)
@dbd_port.setter
def dbd_port(self, value: int) -> None:
LintRules.check_port(value)
self._metadata[SlurmdbdToken.DbdPort] = value
@dbd_port.deleter
def dbd_port(self) -> None:
del self._metadata[SlurmdbdToken.DbdPort]
@property
def debug_flags(self) -> Union[List[str], None]:
return (
None
if self._metadata.get(SlurmdbdToken.DebugFlags, None) is None
else self._metadata.get(SlurmdbdToken.DebugFlags).split(",")
)
@debug_flags.setter
def debug_flags(self, value: List[str]) -> None:
for flag in value:
LintRules.check_debug_flag(flag)
self._metadata[SlurmdbdToken.DebugFlags] = ",".join(value)
@debug_flags.deleter
def debug_flags(self) -> None:
del self._metadata[SlurmdbdToken.DebugFlags]
@property
def debug_level(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.DebugLevel, None)
@debug_level.setter
def debug_level(self, value: str) -> None:
LintRules.check_debug_level(value)
self._metadata[SlurmdbdToken.DebugLevel] = value
@debug_level.deleter
def debug_level(self) -> None:
del self._metadata[SlurmdbdToken.DebugLevel]
@property
def debug_level_syslog(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.DebugLevelSyslog, None)
@debug_level_syslog.setter
def debug_level_syslog(self, value: str) -> None:
LintRules.check_debug_level(value)
self._metadata[SlurmdbdToken.DebugLevelSyslog] = value
@debug_level_syslog.deleter
def debug_level_syslog(self) -> None:
del self._metadata[SlurmdbdToken.DebugLevelSyslog]
@property
def default_qos(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.DefaultQOS, None)
@default_qos.setter
def default_qos(self, value: str) -> None:
self._metadata[SlurmdbdToken.DefaultQOS] = value
@default_qos.deleter
def default_qos(self) -> None:
del self._metadata[SlurmdbdToken.DefaultQOS]
@property
def log_file(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.LogFile, None)
@log_file.setter
def log_file(self, value: str) -> None:
self._metadata[SlurmdbdToken.LogFile] = value
@log_file.deleter
def log_file(self) -> None:
del self._metadata[SlurmdbdToken.LogFile]
@property
def log_time_format(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.LogTimeFormat, None)
@log_time_format.setter
def log_time_format(self, value: str) -> None:
LintRules.check_time_format(value)
self._metadata[SlurmdbdToken.LogTimeFormat] = value
@log_time_format.deleter
def log_time_format(self) -> None:
del self._metadata[SlurmdbdToken.LogTimeFormat]
@property
def max_query_time_range(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.MaxQueryTimeRange, None)
@max_query_time_range.setter
def max_query_time_range(self, value: str) -> None:
LintRules.check_query_time_format(value)
self._metadata[SlurmdbdToken.MaxQueryTimeRange] = value
@max_query_time_range.deleter
def max_query_time_range(self) -> None:
del self._metadata[SlurmdbdToken.MaxQueryTimeRange]
@property
def message_timeout(self) -> Union[int, None]:
return (
None
if self._metadata.get(SlurmdbdToken.MessageTimeout, None) is None
else int(self._metadata.get(SlurmdbdToken.MessageTimeout))
)
@message_timeout.setter
def message_timeout(self, value: int) -> None:
self._metadata[SlurmdbdToken.MessageTimeout] = value
@message_timeout.deleter
def message_timeout(self) -> None:
del self._metadata[SlurmdbdToken.MessageTimeout]
@property
def parameters(self) -> Union[List[str], None]:
return (
None
if self._metadata.get(SlurmdbdToken.Parameters, None) is None
else self._metadata.get(SlurmdbdToken.Parameters).split(",")
)
@parameters.setter
def parameters(self, value: List[str]) -> None:
self._metadata[SlurmdbdToken.Parameters] = ",".join(value)
@parameters.deleter
def parameters(self) -> None:
del self._metadata[SlurmdbdToken.Parameters]
@property
def pid_file(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.PidFile, None)
@pid_file.setter
def pid_file(self, value: str) -> None:
self._metadata[SlurmdbdToken.PidFile] = value
@pid_file.deleter
def pid_file(self) -> None:
del self._metadata[SlurmdbdToken.PidFile]
@property
def plugin_dir(self) -> Union[List[str], None]:
return (
None
if self._metadata.get(SlurmdbdToken.PluginDir, None) is None
else self._metadata.get(SlurmdbdToken.PluginDir).split(":")
)
@plugin_dir.setter
def plugin_dir(self, value: List[str]) -> None:
self._metadata[SlurmdbdToken.PluginDir] = ":".join(value)
@plugin_dir.deleter
def plugin_dir(self) -> None:
del self._metadata[SlurmdbdToken.PluginDir]
@property
def private_data(self) -> Union[List[str], None]:
return (
None
if self._metadata.get(SlurmdbdToken.PrivateData, None) is None
else self._metadata.get(SlurmdbdToken.PrivateData).split(",")
)
@private_data.setter
def private_data(self, value: List[str]) -> None:
for data in value:
LintRules.check_private_data(data)
self._metadata[SlurmdbdToken.PrivateData] = ",".join(value)
@private_data.deleter
def private_data(self) -> None:
del self._metadata[SlurmdbdToken.PrivateData]
@property
def purge_event_after(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.PurgeEventAfter, None)
@purge_event_after.setter
def purge_event_after(self, value: str) -> None:
LintRules.check_time_format(value)
self._metadata[SlurmdbdToken.PurgeEventAfter] = value
@purge_event_after.deleter
def purge_event_after(self) -> None:
del self._metadata[SlurmdbdToken.PurgeEventAfter]
@property
def purge_job_after(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.PurgeJobAfter, None)
@purge_job_after.setter
def purge_job_after(self, value: str) -> None:
LintRules.check_time_format(value)
self._metadata[SlurmdbdToken.PurgeJobAfter] = value
@purge_job_after.deleter
def purge_job_after(self) -> None:
del self._metadata[SlurmdbdToken.PurgeJobAfter]
@property
def purge_resv_after(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.PurgeResvAfter, None)
@purge_resv_after.setter
def purge_resv_after(self, value: str) -> None:
LintRules.check_time_format(value)
self._metadata[SlurmdbdToken.PurgeResvAfter] = value
@purge_resv_after.deleter
def purge_resv_after(self) -> None:
del self._metadata[SlurmdbdToken.PurgeResvAfter]
@property
def purge_step_after(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.PurgeStepAfter, None)
@purge_step_after.setter
def purge_step_after(self, value: str) -> None:
LintRules.check_time_format(value)
self._metadata[SlurmdbdToken.PurgeStepAfter] = value
@purge_step_after.deleter
def purge_step_after(self) -> None:
del self._metadata[SlurmdbdToken.PurgeStepAfter]
@property
def purge_suspend_after(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.PurgeSuspendAfter, None)
@purge_suspend_after.setter
def purge_suspend_after(self, value: str) -> None:
LintRules.check_time_format(value)
self._metadata[SlurmdbdToken.PurgeSuspendAfter] = value
@purge_suspend_after.deleter
def purge_suspend_after(self) -> None:
del self._metadata[SlurmdbdToken.PurgeSuspendAfter]
@property
def purge_txn_after(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.PurgeTXNAfter, None)
@purge_txn_after.setter
def purge_txn_after(self, value: str) -> None:
LintRules.check_time_format(value)
self._metadata[SlurmdbdToken.PurgeTXNAfter] = value
@purge_txn_after.deleter
def purge_txn_after(self) -> None:
del self._metadata[SlurmdbdToken.PurgeTXNAfter]
@property
def purge_usage_after(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.PurgeUsageAfter, None)
@purge_usage_after.setter
def purge_usage_after(self, value: str) -> None:
LintRules.check_time_format(value)
self._metadata[SlurmdbdToken.PurgeUsageAfter] = value
@purge_usage_after.deleter
def purge_usage_after(self) -> None:
del self._metadata[SlurmdbdToken.PurgeUsageAfter]
@property
def slurm_user(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.SlurmUser, None)
@slurm_user.setter
def slurm_user(self, value: str) -> None:
self._metadata[SlurmdbdToken.SlurmUser] = value
@slurm_user.deleter
def slurm_user(self) -> None:
del self._metadata[SlurmdbdToken.SlurmUser]
@property
def storage_host(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.StorageHost, None)
@storage_host.setter
def storage_host(self, value: str) -> None:
self._metadata[SlurmdbdToken.StorageHost] = value
@storage_host.deleter
def storage_host(self) -> None:
del self._metadata[SlurmdbdToken.StorageHost]
@property
def storage_backup_host(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.StorageBackupHost, None)
@storage_backup_host.setter
def storage_backup_host(self, value: str) -> None:
self._metadata[SlurmdbdToken.StorageBackupHost] = value
@storage_backup_host.deleter
def storage_backup_host(self) -> None:
del self._metadata[SlurmdbdToken.StorageBackupHost]
@property
def storage_loc(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.StorageLoc, None)
@storage_loc.setter
def storage_loc(self, value: str) -> None:
self._metadata[SlurmdbdToken.StorageLoc] = value
@storage_loc.deleter
def storage_loc(self) -> None:
del self._metadata[SlurmdbdToken.StorageLoc]
@property
def storage_parameters(self) -> None:
return self._metadata[SlurmdbdToken.StorageParameters]
@storage_parameters.setter
def storage_parameters(self, value: List[Tuple[str, str]]) -> None:
self._metadata[SlurmdbdToken.StorageParameters] = ",".join(
f"{i[0]}={i[1]}" for i in value
)
@storage_parameters.deleter
def storage_parameters(self) -> None:
del self._metadata[SlurmdbdToken.StorageParameters]
@property
def storage_pass(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.StoragePass, None)
@storage_pass.setter
def storage_pass(self, value: str) -> None:
LintRules.check_password(value)
self._metadata[SlurmdbdToken.StoragePass] = value
@storage_pass.deleter
def storage_pass(self) -> None:
del self._metadata[SlurmdbdToken.StoragePass]
@property
def storage_port(self) -> Union[int, None]:
return (
None
if self._metadata.get(SlurmdbdToken.StoragePort, None) is None
else int(self._metadata.get(SlurmdbdToken.StoragePort))
)
@storage_port.setter
def storage_port(self, value: int) -> None:
LintRules.check_port(value)
self._metadata[SlurmdbdToken.StoragePort] = value
@storage_port.deleter
def storage_port(self) -> None:
del self._metadata[SlurmdbdToken.StoragePort]
@property
def storage_type(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.StorageType, None)
@storage_type.setter
def storage_type(self, value: str) -> None:
LintRules.check_storage_type(value)
self._metadata[SlurmdbdToken.StorageType] = value
@storage_type.deleter
def storage_type(self) -> None:
del self._metadata[SlurmdbdToken.StorageType]
@property
def storage_user(self) -> Union[str, None]:
return self._metadata.get(SlurmdbdToken.StorageUser, None)
@storage_user.setter
def storage_user(self, value: str) -> None:
self._metadata[SlurmdbdToken.StorageUser] = value
@storage_user.deleter
def storage_user(self) -> None:
del self._metadata[SlurmdbdToken.StorageUser]
@property
def tcp_timeout(self) -> Union[int, None]:
return (
None
if self._metadata.get(SlurmdbdToken.TCPTimeout, None) is None
else int(self._metadata.get(SlurmdbdToken.TCPTimeout))
)
@tcp_timeout.setter
def tcp_timeout(self, value: int) -> None:
self._metadata[SlurmdbdToken.TCPTimeout] = value
@tcp_timeout.deleter
def tcp_timeout(self) -> None:
del self._metadata[SlurmdbdToken.TCPTimeout]
@property
def track_slurmctld_down(self) -> None:
return (
True
if self._metadata.get(SlurmdbdToken.TrackSlurmctldDown, None) == "yes"
else False
if self._metadata.get(SlurmdbdToken.TrackSlurmctldDown, None) == "no"
else None
)
@track_slurmctld_down.setter
def track_slurmctld_down(self, value: Union[str, bool]) -> None:
LintRules.check_bool(value)
self._metadata[SlurmdbdToken.TrackSlurmctldDown] = (
"yes" if value is True else "no" if value is False else value
)
@track_slurmctld_down.deleter
def track_slurmctld_down(self) -> None:
del self._metadata[SlurmdbdToken.TrackSlurmctldDown]
@property
def track_wc_key(self) -> None:
return (
True
if self._metadata.get(SlurmdbdToken.TrackWCKey, None) == "yes"
else False
if self._metadata.get(SlurmdbdToken.TrackWCKey, None) == "no"
else None
)
@track_wc_key.setter
def track_wc_key(self, value: Union[str, bool]) -> None:
LintRules.check_bool(value)
self._metadata[SlurmdbdToken.TrackWCKey] = (
"yes" if value is True else "no" if value is False else value
)
@track_wc_key.deleter
def track_wc_key(self) -> None:
del self._metadata[SlurmdbdToken.TrackWCKey]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment