""" Core systemd unit file parser, validator, and generator. This module provides the fundamental functionality for working with systemd unit files, including parsing, validation, and generation. """ import configparser import re from dataclasses import dataclass, field from enum import Enum from typing import List, Optional class UnitType(Enum): SERVICE = "service" TIMER = "timer" SOCKET = "socket" MOUNT = "mount" TARGET = "target" PATH = "path" SLICE = "slice" @dataclass class ValidationError: """Represents a validation error in a unit file.""" section: str key: Optional[str] message: str severity: str = "error" # error, warning, info line_number: Optional[int] = None @dataclass class UnitFileInfo: """Metadata about a unit file.""" name: str unit_type: UnitType description: Optional[str] = None documentation: List[str] = field(default_factory=list) requires: List[str] = field(default_factory=list) wants: List[str] = field(default_factory=list) conflicts: List[str] = field(default_factory=list) class SystemdUnitFile: """ Parser and validator for systemd unit files. This class can parse existing unit files, validate their syntax and semantics, and generate new unit files from structured data. """ # Common systemd unit file sections COMMON_SECTIONS = { "Unit", "Install", "Service", "Timer", "Socket", "Mount", "Target", "Path", "Slice", } # Valid keys for each section SECTION_KEYS = { "Unit": { "Description", "Documentation", "Requires", "Wants", "After", "Before", "Conflicts", "ConditionPathExists", "ConditionFileNotEmpty", "AssertPathExists", "OnFailure", "StopWhenUnneeded", "RefuseManualStart", "RefuseManualStop", "AllowIsolate", "DefaultDependencies", "JobTimeoutSec", "StartLimitInterval", "StartLimitBurst", "RebootArgument", "ConditionArchitecture", "ConditionVirtualization", "ConditionHost", "ConditionKernelCommandLine", "ConditionSecurity", "ConditionCapability", "ConditionACPower", "ConditionNeedsUpdate", "ConditionFirstBoot", "ConditionPathIsDirectory", "ConditionPathIsSymbolicLink", "ConditionPathIsMountPoint", "ConditionPathIsReadWrite", "ConditionDirectoryNotEmpty", "ConditionFileIsExecutable", "ConditionUser", "ConditionGroup", }, "Install": {"Alias", "WantedBy", "RequiredBy", "Also", "DefaultInstance"}, "Service": { "Type", "ExecStart", "ExecStartPre", "ExecStartPost", "ExecReload", "ExecStop", "ExecStopPost", "RestartSec", "TimeoutStartSec", "TimeoutStopSec", "TimeoutSec", "RuntimeMaxSec", "WatchdogSec", "Restart", "SuccessExitStatus", "RestartPreventExitStatus", "RestartForceExitStatus", "PermissionsStartOnly", "RootDirectoryStartOnly", "RemainAfterExit", "GuessMainPID", "PIDFile", "BusName", "BusPolicy", "User", "Group", "SupplementaryGroups", "WorkingDirectory", "RootDirectory", "Nice", "OOMScoreAdjust", "IOSchedulingClass", "IOSchedulingPriority", "CPUSchedulingPolicy", "CPUSchedulingPriority", "CPUSchedulingResetOnFork", "CPUAffinity", "UMask", "Environment", "EnvironmentFile", "PassEnvironment", "UnsetEnvironment", "StandardInput", "StandardOutput", "StandardError", "TTYPath", "TTYReset", "TTYVHangup", "TTYVTDisallocate", "SyslogIdentifier", "SyslogFacility", "SyslogLevel", "SyslogLevelPrefix", "TimerSlackNSec", "LimitCPU", "LimitFSIZE", "LimitDATA", "LimitSTACK", "LimitCORE", "LimitRSS", "LimitNOFILE", "LimitAS", "LimitNPROC", "LimitMEMLOCK", "LimitLOCKS", "LimitSIGPENDING", "LimitMSGQUEUE", "LimitNICE", "LimitRTPRIO", "LimitRTTIME", "PAMName", "CapabilityBoundingSet", "AmbientCapabilities", "SecureBits", "ReadWritePaths", "ReadOnlyPaths", "InaccessiblePaths", "PrivateTmp", "PrivateDevices", "PrivateNetwork", "ProtectSystem", "ProtectHome", "MountFlags", "UtmpIdentifier", "UtmpMode", "SELinuxContext", "AppArmorProfile", "SmackProcessLabel", "IgnoreSIGPIPE", "NoNewPrivileges", "SystemCallFilter", "SystemCallErrorNumber", "SystemCallArchitectures", "RestrictAddressFamilies", "Personality", "RuntimeDirectory", "RuntimeDirectoryMode", }, "Timer": { "OnActiveSec", "OnBootSec", "OnStartupSec", "OnUnitActiveSec", "OnUnitInactiveSec", "OnCalendar", "AccuracySec", "RandomizedDelaySec", "Unit", "Persistent", "WakeSystem", "RemainAfterElapse", }, "Socket": { "ListenStream", "ListenDatagram", "ListenSequentialPacket", "ListenFIFO", "ListenSpecial", "ListenNetlink", "ListenMessageQueue", "ListenUSBFunction", "SocketProtocol", "BindIPv6Only", "Backlog", "BindToDevice", "SocketUser", "SocketGroup", "SocketMode", "DirectoryMode", "Accept", "Writable", "MaxConnections", "MaxConnectionsPerSource", "KeepAlive", "KeepAliveTimeSec", "KeepAliveIntervalSec", "KeepAliveProbes", "NoDelay", "Priority", "DeferAcceptSec", "ReceiveBuffer", "SendBuffer", "IPTOS", "IPTTL", "Mark", "ReusePort", "SmackLabel", "SmackLabelIPIn", "SmackLabelIPOut", "SELinuxContextFromNet", "PipeSize", "MessageQueueMaxMessages", "MessageQueueMessageSize", "FreeBind", "Transparent", "Broadcast", "PassCredentials", "PassSecurity", "TCPCongestion", "ExecStartPre", "ExecStartPost", "ExecStopPre", "ExecStopPost", "TimeoutSec", "Service", "RemoveOnStop", "Symlinks", "FileDescriptorName", "TriggerLimitIntervalSec", "TriggerLimitBurst", }, "Mount": { "What", "Where", "Type", "Options", "SloppyOptions", "LazyUnmount", "ForceUnmount", "DirectoryMode", "TimeoutSec", }, "Target": {}, "Path": { "PathExists", "PathExistsGlob", "PathChanged", "PathModified", "DirectoryNotEmpty", "Unit", "MakeDirectory", "DirectoryMode", }, } # Service types and their requirements SERVICE_TYPES = { "simple": {"required": ["ExecStart"], "conflicts": ["BusName", "Type=forking"]}, "exec": {"required": ["ExecStart"], "conflicts": ["BusName"]}, "forking": {"recommended": ["PIDFile"], "conflicts": ["BusName"]}, "oneshot": {"conflicts": ["Restart=always", "Restart=on-success"]}, "dbus": {"required": ["BusName"], "conflicts": []}, "notify": {"required": ["ExecStart"], "conflicts": ["BusName"]}, "idle": {"required": ["ExecStart"], "conflicts": ["BusName"]}, } def __init__(self, content: Optional[str] = None, file_path: Optional[str] = None): """Initialize with either content string or file path.""" self.content = content or "" self.file_path = file_path self.config = configparser.ConfigParser( interpolation=None, allow_no_value=True, delimiters=("=",) ) self.config.optionxform = lambda optionstr: str(optionstr) # Preserve case self._parse_errors = [] if content: self._parse_content(content) elif file_path: self._parse_file(file_path) def _parse_content(self, content: str) -> None: """Parse unit file content from string.""" try: self.config.read_string(content) self.content = content except configparser.Error as e: self._parse_errors.append( ValidationError("", None, f"Parse error: {str(e)}", "error") ) def _parse_file(self, file_path: str) -> None: """Parse unit file from file path.""" try: with open(file_path, "r", encoding="utf-8") as f: content = f.read() self._parse_content(content) self.file_path = file_path except IOError as e: self._parse_errors.append( ValidationError("", None, f"File read error: {str(e)}", "error") ) def get_unit_type(self) -> Optional[UnitType]: """Determine the unit type based on sections present.""" if self.config.has_section("Service"): return UnitType.SERVICE elif self.config.has_section("Timer"): return UnitType.TIMER elif self.config.has_section("Socket"): return UnitType.SOCKET elif self.config.has_section("Mount"): return UnitType.MOUNT elif self.config.has_section("Target"): return UnitType.TARGET elif self.config.has_section("Path"): return UnitType.PATH return None def get_info(self) -> UnitFileInfo: """Extract basic information about the unit file.""" unit_type = self.get_unit_type() name = "" if self.file_path: name = self.file_path.split("/")[-1] description = None if self.config.has_section("Unit") and self.config.has_option( "Unit", "Description" ): description = self.config.get("Unit", "Description") documentation = [] if self.config.has_section("Unit") and self.config.has_option( "Unit", "Documentation" ): doc_value = self.config.get("Unit", "Documentation") documentation = [doc.strip() for doc in doc_value.split()] # Parse dependencies requires = self._get_dependency_list("Requires") wants = self._get_dependency_list("Wants") conflicts = self._get_dependency_list("Conflicts") if unit_type is None: raise ValueError("Could not determine unit type") return UnitFileInfo( name=name, unit_type=unit_type, description=description, documentation=documentation, requires=requires, wants=wants, conflicts=conflicts, ) def _get_dependency_list(self, key: str) -> List[str]: """Extract a space-separated dependency list.""" if not self.config.has_section("Unit") or not self.config.has_option( "Unit", key ): return [] value = self.config.get("Unit", key) return [dep.strip() for dep in value.split() if dep.strip()] def validate(self) -> List[ValidationError]: """Validate the unit file and return list of errors/warnings.""" errors = self._parse_errors.copy() # Check for basic structure if not self.config.sections(): errors.append( ValidationError( "", None, "Unit file is empty or has no sections", "error" ) ) return errors # Validate sections for section in self.config.sections(): errors.extend(self._validate_section(section)) # Type-specific validation unit_type = self.get_unit_type() if unit_type: errors.extend(self._validate_unit_type(unit_type)) return errors def _validate_section(self, section: str) -> List[ValidationError]: """Validate a specific section.""" errors = [] # Check if section is known if section not in self.COMMON_SECTIONS: errors.append( ValidationError( section, None, f"Unknown section '{section}'", "warning" ) ) return errors # Validate keys in section valid_keys = self.SECTION_KEYS.get(section, set()) for key in self.config.options(section): if valid_keys and key not in valid_keys: errors.append( ValidationError( section, key, f"Unknown key '{key}' in section '{section}'", "warning", ) ) # Validate key values value = self.config.get(section, key) errors.extend(self._validate_key_value(section, key, value)) return errors def _validate_key_value( self, section: str, key: str, value: str ) -> List[ValidationError]: """Validate specific key-value pairs.""" errors = [] # Service-specific validations if section == "Service": if key == "Type" and value not in self.SERVICE_TYPES: errors.append( ValidationError( section, key, f"Invalid service type '{value}'", "error" ) ) elif key == "Restart" and value not in [ "no", "always", "on-success", "on-failure", "on-abnormal", "on-abort", "on-watchdog", ]: errors.append( ValidationError( section, key, f"Invalid restart value '{value}'", "error" ) ) elif key.startswith("Exec") and not value: errors.append( ValidationError( section, key, f"Empty execution command for '{key}'", "error" ) ) elif key in ["TimeoutStartSec", "TimeoutStopSec", "RestartSec"] and value: if not self._is_valid_time_span(value): errors.append( ValidationError( section, key, f"Invalid time span '{value}'", "error" ) ) # Timer-specific validations elif section == "Timer": if key.startswith("On") and key.endswith("Sec") and value: if not self._is_valid_time_span(value): errors.append( ValidationError( section, key, f"Invalid time span '{value}'", "error" ) ) elif key == "OnCalendar" and value: # Basic calendar validation - could be more comprehensive if not re.match(r"^[\w\s:*/-]+$", value): errors.append( ValidationError( section, key, f"Invalid calendar specification '{value}'", "warning", ) ) # Socket-specific validations elif section == "Socket": if key.startswith("Listen") and not value: errors.append( ValidationError( section, key, f"Empty listen specification for '{key}'", "error" ) ) # Mount-specific validations elif section == "Mount": if key == "What" and not value: errors.append( ValidationError( section, key, "Mount source (What) cannot be empty", "error" ) ) elif key == "Where" and not value: errors.append( ValidationError( section, key, "Mount point (Where) cannot be empty", "error" ) ) elif key == "Where" and not value.startswith("/"): errors.append( ValidationError( section, key, "Mount point must be an absolute path", "error" ) ) return errors def _validate_unit_type(self, unit_type: UnitType) -> List[ValidationError]: """Perform type-specific validation.""" errors = [] if unit_type == UnitType.SERVICE: errors.extend(self._validate_service()) elif unit_type == UnitType.TIMER: errors.extend(self._validate_timer()) elif unit_type == UnitType.SOCKET: errors.extend(self._validate_socket()) elif unit_type == UnitType.MOUNT: errors.extend(self._validate_mount()) return errors def _validate_service(self) -> List[ValidationError]: """Validate service-specific requirements.""" errors = [] if not self.config.has_section("Service"): return errors service_type = self.config.get("Service", "Type", fallback="simple") type_config = self.SERVICE_TYPES.get(service_type, {}) # Check required keys for required_key in type_config.get("required", []): if not self.config.has_option("Service", required_key): errors.append( ValidationError( "Service", required_key, ( f"Required key '{required_key}' missing for " f"service type '{service_type}'" ), "error", ) ) # Check for conflicting configurations for conflict in type_config.get("conflicts", []): if "=" in conflict: key, value = conflict.split("=", 1) if self.config.has_option("Service", key): actual_value = self.config.get("Service", key) if actual_value == value: errors.append( ValidationError( "Service", key, ( f"'{key}={value}' conflicts with " f"service type '{service_type}'" ), "error", ) ) else: if self.config.has_option("Service", conflict): errors.append( ValidationError( "Service", conflict, ( f"'{conflict}' conflicts with " f"service type '{service_type}'" ), "error", ) ) return errors def _validate_timer(self) -> List[ValidationError]: """Validate timer-specific requirements.""" errors = [] if not self.config.has_section("Timer"): return errors # At least one timer specification is required timer_keys = [ "OnActiveSec", "OnBootSec", "OnStartupSec", "OnUnitActiveSec", "OnUnitInactiveSec", "OnCalendar", ] has_timer = any(self.config.has_option("Timer", key) for key in timer_keys) if not has_timer: errors.append( ValidationError( "Timer", None, "Timer must have at least one timing specification", "error", ) ) return errors def _validate_socket(self) -> List[ValidationError]: """Validate socket-specific requirements.""" errors = [] if not self.config.has_section("Socket"): return errors # At least one listen specification is required listen_keys = [ "ListenStream", "ListenDatagram", "ListenSequentialPacket", "ListenFIFO", "ListenSpecial", "ListenNetlink", "ListenMessageQueue", ] has_listen = any(self.config.has_option("Socket", key) for key in listen_keys) if not has_listen: errors.append( ValidationError( "Socket", None, "Socket must have at least one Listen specification", "error", ) ) return errors def _validate_mount(self) -> List[ValidationError]: """Validate mount-specific requirements.""" errors = [] if not self.config.has_section("Mount"): return errors # What and Where are required if not self.config.has_option("Mount", "What"): errors.append( ValidationError( "Mount", "What", "Mount units require 'What' (source) specification", "error", ) ) if not self.config.has_option("Mount", "Where"): errors.append( ValidationError( "Mount", "Where", "Mount units require 'Where' (mount point) specification", "error", ) ) return errors def _is_valid_time_span(self, value: str) -> bool: """Validate systemd time span format.""" # Basic validation for time spans like "30s", "5min", "1h", "infinity" if value.lower() in ["infinity", "0"]: return True pattern = r"^\d+(\.\d+)?(us|ms|s|min|h|d|w|month|y)?$" return bool(re.match(pattern, value.lower())) def to_string(self) -> str: """Convert the unit file back to string format.""" if not self.config.sections(): return "" lines = [] for section in self.config.sections(): lines.append(f"[{section}]") for key in self.config.options(section): value = self.config.get(section, key) if value is None: lines.append(key) else: lines.append(f"{key}={value}") lines.append("") # Empty line between sections return "\n".join(lines).rstrip() def set_value(self, section: str, key: str, value: str) -> None: """Set a value in the unit file.""" if not self.config.has_section(section): self.config.add_section(section) self.config.set(section, key, value) def get_value( self, section: str, key: str, fallback: Optional[str] = None ) -> Optional[str]: """Get a value from the unit file.""" if not self.config.has_section(section): return fallback return self.config.get(section, key, fallback=fallback) def remove_key(self, section: str, key: str) -> bool: """Remove a key from the unit file.""" if self.config.has_section(section) and self.config.has_option(section, key): self.config.remove_option(section, key) return True return False def get_sections(self) -> List[str]: """Get all sections in the unit file.""" return self.config.sections() def get_keys(self, section: str) -> List[str]: """Get all keys in a section.""" if not self.config.has_section(section): return [] return self.config.options(section) def create_unit_file(unit_type: UnitType, **kwargs) -> SystemdUnitFile: """ Create a new unit file of the specified type with basic structure. Args: unit_type: The type of unit file to create **kwargs: Additional configuration parameters Returns: SystemdUnitFile instance with basic structure """ unit = SystemdUnitFile() # Add Unit section unit.set_value( "Unit", "Description", kwargs.get("description", f"Generated {unit_type.value} unit"), ) if kwargs.get("documentation"): unit.set_value("Unit", "Documentation", kwargs["documentation"]) if kwargs.get("requires"): unit.set_value("Unit", "Requires", " ".join(kwargs["requires"])) if kwargs.get("wants"): unit.set_value("Unit", "Wants", " ".join(kwargs["wants"])) if kwargs.get("after"): unit.set_value("Unit", "After", " ".join(kwargs["after"])) # Add type-specific sections if unit_type == UnitType.SERVICE: unit.set_value("Service", "Type", kwargs.get("service_type", "simple")) if kwargs.get("exec_start"): unit.set_value("Service", "ExecStart", kwargs["exec_start"]) if kwargs.get("user"): unit.set_value("Service", "User", kwargs["user"]) if kwargs.get("group"): unit.set_value("Service", "Group", kwargs["group"]) if kwargs.get("working_directory"): unit.set_value("Service", "WorkingDirectory", kwargs["working_directory"]) unit.set_value("Service", "Restart", kwargs.get("restart", "on-failure")) elif unit_type == UnitType.TIMER: if kwargs.get("on_calendar"): unit.set_value("Timer", "OnCalendar", kwargs["on_calendar"]) if kwargs.get("on_boot_sec"): unit.set_value("Timer", "OnBootSec", kwargs["on_boot_sec"]) unit.set_value( "Timer", "Persistent", str(kwargs.get("persistent", "true")).lower() ) elif unit_type == UnitType.SOCKET: if kwargs.get("listen_stream"): unit.set_value("Socket", "ListenStream", kwargs["listen_stream"]) if kwargs.get("listen_datagram"): unit.set_value("Socket", "ListenDatagram", kwargs["listen_datagram"]) elif unit_type == UnitType.MOUNT: if kwargs.get("what"): unit.set_value("Mount", "What", kwargs["what"]) if kwargs.get("where"): unit.set_value("Mount", "Where", kwargs["where"]) if kwargs.get("type"): unit.set_value("Mount", "Type", kwargs["type"]) if kwargs.get("options"): unit.set_value("Mount", "Options", kwargs["options"]) # Add Install section if specified if kwargs.get("wanted_by"): wanted_by_value = kwargs["wanted_by"] if isinstance(wanted_by_value, (list, tuple)): unit.set_value("Install", "WantedBy", " ".join(wanted_by_value)) else: unit.set_value("Install", "WantedBy", str(wanted_by_value)) elif unit_type in [UnitType.SERVICE, UnitType.TIMER]: unit.set_value("Install", "WantedBy", "multi-user.target") return unit