Files
unitforge/backend/app/core/unit_file.py
William Valentin 860f60591c Fix contrast issues with text-muted and bg-dark classes
- Fixed Bootstrap bg-dark class to use better contrasting color
- Added comprehensive text-muted contrast fixes for various contexts
- Improved dark theme colors for better accessibility
- Fixed CSS inheritance issues for code elements in dark contexts
- All color choices meet WCAG AA contrast requirements
2025-09-14 14:58:35 -07:00

880 lines
28 KiB
Python

"""
Core systemd unit file parser, validator, and generator.
This module provides the fundamental functionality for working with systemd unit files,
including parsing, validation, and generation.
"""
import configparser
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional
class UnitType(Enum):
    """Kinds of systemd unit this module knows about.

    Each member's value is the lower-case type name (for example ``"service"``).
    """

    SERVICE = "service"
    TIMER = "timer"
    SOCKET = "socket"
    MOUNT = "mount"
    TARGET = "target"
    PATH = "path"
    SLICE = "slice"
@dataclass
class ValidationError:
    """A single finding produced while parsing or validating a unit file.

    Despite the name this is a plain data record, not an exception class.
    """

    # Section the finding belongs to; "" is used for file-level problems.
    section: str
    # Offending key, or None when the finding concerns the section as a whole.
    key: Optional[str]
    # Human-readable description of the problem.
    message: str
    # One of: error, warning, info
    severity: str = "error"
    # Line in the unit file, when known.
    line_number: Optional[int] = None
@dataclass
class UnitFileInfo:
    """Summary metadata extracted from a unit file.

    Every list field defaults to its own fresh empty list.
    """

    # File name of the unit (may be "" when the unit has no file path).
    name: str
    # Detected kind of unit.
    unit_type: UnitType
    # Value of Description= in [Unit], if any.
    description: Optional[str] = None
    # Entries of Documentation= in [Unit].
    documentation: List[str] = field(default_factory=list)
    # Unit names listed under Requires=, Wants= and Conflicts= in [Unit].
    requires: List[str] = field(default_factory=list)
    wants: List[str] = field(default_factory=list)
    conflicts: List[str] = field(default_factory=list)
class SystemdUnitFile:
    """
    Parser and validator for systemd unit files.

    This class can parse existing unit files, validate their syntax and semantics,
    and generate new unit files from structured data.

    Parsing is delegated to ``configparser.ConfigParser``. Problems met while
    reading or parsing are never raised from the constructor; they are collected
    and returned by :meth:`validate` together with the semantic findings.
    """

    # Common systemd unit file sections
    COMMON_SECTIONS = {
        "Unit",
        "Install",
        "Service",
        "Timer",
        "Socket",
        "Mount",
        "Target",
        "Path",
        "Slice",
    }

    # Valid keys for each section. A section that is missing here, or whose set
    # is empty, gets no unknown-key warnings (see _validate_section).
    SECTION_KEYS = {
        "Unit": {
            "Description", "Documentation",
            "Requires", "Requisite", "Wants", "BindsTo", "PartOf",
            "After", "Before", "Conflicts",
            "ConditionPathExists", "ConditionFileNotEmpty", "AssertPathExists",
            "OnFailure", "StopWhenUnneeded",
            "RefuseManualStart", "RefuseManualStop",
            "AllowIsolate", "DefaultDependencies", "JobTimeoutSec",
            "StartLimitInterval", "StartLimitIntervalSec", "StartLimitBurst",
            "RebootArgument",
            "ConditionArchitecture", "ConditionVirtualization", "ConditionHost",
            "ConditionKernelCommandLine", "ConditionSecurity",
            "ConditionCapability", "ConditionACPower", "ConditionNeedsUpdate",
            "ConditionFirstBoot", "ConditionPathIsDirectory",
            "ConditionPathIsSymbolicLink", "ConditionPathIsMountPoint",
            "ConditionPathIsReadWrite", "ConditionDirectoryNotEmpty",
            "ConditionFileIsExecutable", "ConditionUser", "ConditionGroup",
        },
        "Install": {"Alias", "WantedBy", "RequiredBy", "Also", "DefaultInstance"},
        "Service": {
            "Type",
            "ExecStart", "ExecStartPre", "ExecStartPost", "ExecCondition",
            "ExecReload", "ExecStop", "ExecStopPost",
            "RestartSec", "TimeoutStartSec", "TimeoutStopSec", "TimeoutSec",
            "RuntimeMaxSec", "WatchdogSec",
            "Restart", "SuccessExitStatus",
            "RestartPreventExitStatus", "RestartForceExitStatus",
            "PermissionsStartOnly", "RootDirectoryStartOnly",
            "RemainAfterExit", "GuessMainPID", "PIDFile",
            "BusName", "BusPolicy", "NotifyAccess",
            "KillMode", "KillSignal",
            "User", "Group", "SupplementaryGroups",
            "WorkingDirectory", "RootDirectory",
            "Nice", "OOMScoreAdjust",
            "IOSchedulingClass", "IOSchedulingPriority",
            "CPUSchedulingPolicy", "CPUSchedulingPriority",
            "CPUSchedulingResetOnFork", "CPUAffinity",
            "UMask",
            "Environment", "EnvironmentFile", "PassEnvironment", "UnsetEnvironment",
            "StandardInput", "StandardOutput", "StandardError",
            "TTYPath", "TTYReset", "TTYVHangup", "TTYVTDisallocate",
            "SyslogIdentifier", "SyslogFacility", "SyslogLevel", "SyslogLevelPrefix",
            "TimerSlackNSec",
            "LimitCPU", "LimitFSIZE", "LimitDATA", "LimitSTACK", "LimitCORE",
            "LimitRSS", "LimitNOFILE", "LimitAS", "LimitNPROC", "LimitMEMLOCK",
            "LimitLOCKS", "LimitSIGPENDING", "LimitMSGQUEUE", "LimitNICE",
            "LimitRTPRIO", "LimitRTTIME",
            "PAMName",
            "CapabilityBoundingSet", "AmbientCapabilities", "SecureBits",
            "ReadWritePaths", "ReadOnlyPaths", "InaccessiblePaths",
            "PrivateTmp", "PrivateDevices", "PrivateNetwork",
            "ProtectSystem", "ProtectHome", "MountFlags",
            "UtmpIdentifier", "UtmpMode",
            "SELinuxContext", "AppArmorProfile", "SmackProcessLabel",
            "IgnoreSIGPIPE", "NoNewPrivileges",
            "SystemCallFilter", "SystemCallErrorNumber", "SystemCallArchitectures",
            "RestrictAddressFamilies", "Personality",
            "RuntimeDirectory", "RuntimeDirectoryMode",
        },
        "Timer": {
            "OnActiveSec", "OnBootSec", "OnStartupSec",
            "OnUnitActiveSec", "OnUnitInactiveSec", "OnCalendar",
            "AccuracySec", "RandomizedDelaySec",
            "Unit", "Persistent", "WakeSystem", "RemainAfterElapse",
        },
        "Socket": {
            "ListenStream", "ListenDatagram", "ListenSequentialPacket",
            "ListenFIFO", "ListenSpecial", "ListenNetlink",
            "ListenMessageQueue", "ListenUSBFunction",
            "SocketProtocol", "BindIPv6Only", "Backlog", "BindToDevice",
            "SocketUser", "SocketGroup", "SocketMode", "DirectoryMode",
            "Accept", "Writable", "MaxConnections", "MaxConnectionsPerSource",
            "KeepAlive", "KeepAliveTimeSec", "KeepAliveIntervalSec",
            "KeepAliveProbes", "NoDelay", "Priority", "DeferAcceptSec",
            "ReceiveBuffer", "SendBuffer", "IPTOS", "IPTTL", "Mark", "ReusePort",
            "SmackLabel", "SmackLabelIPIn", "SmackLabelIPOut",
            "SELinuxContextFromNet", "PipeSize",
            "MessageQueueMaxMessages", "MessageQueueMessageSize",
            "FreeBind", "Transparent", "Broadcast",
            "PassCredentials", "PassSecurity", "TCPCongestion",
            "ExecStartPre", "ExecStartPost", "ExecStopPre", "ExecStopPost",
            "TimeoutSec", "Service", "RemoveOnStop", "Symlinks",
            "FileDescriptorName", "TriggerLimitIntervalSec", "TriggerLimitBurst",
        },
        "Mount": {
            "What", "Where", "Type", "Options",
            "SloppyOptions", "LazyUnmount", "ForceUnmount",
            "DirectoryMode", "TimeoutSec",
        },
        "Target": set(),
        "Path": {
            "PathExists", "PathExistsGlob", "PathChanged", "PathModified",
            "DirectoryNotEmpty", "Unit", "MakeDirectory", "DirectoryMode",
        },
    }

    # Service types and their requirements.
    # NOTE: only "required" and "conflicts" are evaluated by _validate_service;
    # "recommended" entries are currently not checked anywhere in this class.
    SERVICE_TYPES = {
        "simple": {"required": ["ExecStart"], "conflicts": ["BusName", "Type=forking"]},
        "exec": {"required": ["ExecStart"], "conflicts": ["BusName"]},
        "forking": {"recommended": ["PIDFile"], "conflicts": ["BusName"]},
        "oneshot": {"conflicts": ["Restart=always", "Restart=on-success"]},
        "dbus": {"required": ["BusName"], "conflicts": []},
        "notify": {"required": ["ExecStart"], "conflicts": ["BusName"]},
        "idle": {"required": ["ExecStart"], "conflicts": ["BusName"]},
    }

    # Accepted values for Restart= in [Service].
    _RESTART_VALUES = (
        "no",
        "always",
        "on-success",
        "on-failure",
        "on-abnormal",
        "on-abort",
        "on-watchdog",
    )

    # [Service] keys whose value is checked as a time span.
    _SERVICE_TIME_KEYS = ("TimeoutStartSec", "TimeoutStopSec", "RestartSec")

    # A timer needs at least one of these keys.
    _TIMER_TRIGGER_KEYS = (
        "OnActiveSec",
        "OnBootSec",
        "OnStartupSec",
        "OnUnitActiveSec",
        "OnUnitInactiveSec",
        "OnCalendar",
    )

    # A socket needs at least one of these keys.
    _SOCKET_LISTEN_KEYS = (
        "ListenStream",
        "ListenDatagram",
        "ListenSequentialPacket",
        "ListenFIFO",
        "ListenSpecial",
        "ListenNetlink",
        "ListenMessageQueue",
        "ListenUSBFunction",
    )

    # One "<number><optional unit>" component of a (lower-cased) time span.
    # Longer unit spellings come first so that e.g. "min" is not read as "m".
    _TIME_SPAN_TOKEN = re.compile(
        r"\s*\d+(?:\.\d+)?\s*"
        r"(?:usec|us|\u00b5s|\u03bcs|msec|ms|minutes|minute|min|months|month|m"
        r"|seconds|second|sec|s|hours|hour|hr|h|days|day|d|weeks|week|w"
        r"|years|year|y)?"
    )

    # Characters allowed in an OnCalendar= expression, including "," lists,
    # ".." ranges and "~" (last-day-of-month) forms.
    _CALENDAR_PATTERN = re.compile(r"^[\w\s:*/.,~-]+$")

    def __init__(self, content: Optional[str] = None, file_path: Optional[str] = None):
        """Initialize with either content string or file path.

        Args:
            content: Unit file text. When non-empty it is parsed and
                ``file_path`` is only remembered, not read.
            file_path: Path of a unit file; read only when ``content`` is empty
                or ``None``.
        """
        self.content = content or ""
        self.file_path = file_path
        # NOTE(review): the parser runs in ConfigParser's default strict mode, so a
        # key repeated inside one section (which systemd allows, e.g. several
        # ExecStartPre= lines) is reported as a parse error. Confirm whether that
        # is intended before relying on round-tripping such files.
        self.config = configparser.ConfigParser(
            interpolation=None, allow_no_value=True, delimiters=("=",)
        )
        self.config.optionxform = lambda optionstr: str(optionstr)  # Preserve case
        self._parse_errors: List[ValidationError] = []
        if content:
            self._parse_content(content)
        elif file_path:
            self._parse_file(file_path)

    def _parse_content(self, content: str) -> None:
        """Parse unit file content from string.

        A ``configparser.Error`` is recorded in ``self._parse_errors`` instead
        of being raised; ``self.content`` is updated only on success.
        """
        try:
            self.config.read_string(content)
            self.content = content
        except configparser.Error as e:
            self._parse_errors.append(
                ValidationError("", None, f"Parse error: {str(e)}", "error")
            )

    def _parse_file(self, file_path: str) -> None:
        """Parse unit file from file path.

        Read failures, including files that are not valid UTF-8, are recorded
        in ``self._parse_errors`` instead of being raised.
        """
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()
        except (OSError, UnicodeDecodeError) as e:
            self._parse_errors.append(
                ValidationError("", None, f"File read error: {str(e)}", "error")
            )
            return
        self._parse_content(content)
        self.file_path = file_path

    def get_unit_type(self) -> Optional[UnitType]:
        """Determine the unit type based on sections present.

        Returns:
            The type of the first matching section, checked in the order
            Service, Timer, Socket, Mount, Target, Path, Slice; ``None`` when
            none of them is present.
        """
        section_types = (
            ("Service", UnitType.SERVICE),
            ("Timer", UnitType.TIMER),
            ("Socket", UnitType.SOCKET),
            ("Mount", UnitType.MOUNT),
            ("Target", UnitType.TARGET),
            ("Path", UnitType.PATH),
            ("Slice", UnitType.SLICE),
        )
        for section, unit_type in section_types:
            if self.config.has_section(section):
                return unit_type
        return None

    def get_info(self) -> UnitFileInfo:
        """Extract basic information about the unit file.

        Returns:
            A ``UnitFileInfo`` built from the ``[Unit]`` section and the file path.

        Raises:
            ValueError: If the unit type cannot be determined.
        """
        unit_type = self.get_unit_type()
        if unit_type is None:
            raise ValueError("Could not determine unit type")

        name = self.file_path.split("/")[-1] if self.file_path else ""
        description = self.get_value("Unit", "Description")
        # A key written without "=" parses to None, hence the "or ''" guard.
        documentation = (self.get_value("Unit", "Documentation") or "").split()

        return UnitFileInfo(
            name=name,
            unit_type=unit_type,
            description=description,
            documentation=documentation,
            requires=self._get_dependency_list("Requires"),
            wants=self._get_dependency_list("Wants"),
            conflicts=self._get_dependency_list("Conflicts"),
        )

    def _get_dependency_list(self, key: str) -> List[str]:
        """Extract a space-separated dependency list.

        Returns an empty list when ``[Unit]`` or ``key`` is missing, or when
        the key has no value.
        """
        # A key written without "=" parses to None, hence the "or ''" guard.
        return (self.get_value("Unit", key) or "").split()

    def validate(self) -> List[ValidationError]:
        """Validate the unit file and return list of errors/warnings.

        The result starts with any parse/read errors, followed by per-section
        findings and then type-specific findings. A file without sections
        yields the parse errors plus a single "empty" error.
        """
        errors = self._parse_errors.copy()

        # Check for basic structure
        if not self.config.sections():
            errors.append(
                ValidationError(
                    "", None, "Unit file is empty or has no sections", "error"
                )
            )
            return errors

        # Validate sections
        for section in self.config.sections():
            errors.extend(self._validate_section(section))

        # Type-specific validation
        unit_type = self.get_unit_type()
        if unit_type:
            errors.extend(self._validate_unit_type(unit_type))
        return errors

    def _validate_section(self, section: str) -> List[ValidationError]:
        """Validate a specific section.

        An unknown section yields one warning and its keys are not inspected.
        Otherwise every key is checked against ``SECTION_KEYS`` (when the
        section has a non-empty key set) and its value is validated.
        """
        if section not in self.COMMON_SECTIONS:
            return [
                ValidationError(
                    section, None, f"Unknown section '{section}'", "warning"
                )
            ]

        errors = []
        valid_keys = self.SECTION_KEYS.get(section, set())
        for key in self.config.options(section):
            if valid_keys and key not in valid_keys:
                errors.append(
                    ValidationError(
                        section,
                        key,
                        f"Unknown key '{key}' in section '{section}'",
                        "warning",
                    )
                )
            # Validate key values
            value = self.config.get(section, key)
            errors.extend(self._validate_key_value(section, key, value))
        return errors

    def _validate_key_value(
        self, section: str, key: str, value: str
    ) -> List[ValidationError]:
        """Validate specific key-value pairs.

        Only the Service, Timer, Socket and Mount sections have value checks;
        any other section returns an empty list. ``value`` may be ``None`` for
        a key written without ``=``.
        """
        errors = []

        def report(message: str, severity: str = "error") -> None:
            errors.append(ValidationError(section, key, message, severity))

        # Service-specific validations
        if section == "Service":
            if key == "Type" and value not in self.SERVICE_TYPES:
                report(f"Invalid service type '{value}'")
            elif key == "Restart" and value not in self._RESTART_VALUES:
                report(f"Invalid restart value '{value}'")
            elif key.startswith("Exec") and not value:
                report(f"Empty execution command for '{key}'")
            elif key in self._SERVICE_TIME_KEYS and value:
                if not self._is_valid_time_span(value):
                    report(f"Invalid time span '{value}'")
        # Timer-specific validations
        elif section == "Timer":
            if key.startswith("On") and key.endswith("Sec") and value:
                if not self._is_valid_time_span(value):
                    report(f"Invalid time span '{value}'")
            elif key == "OnCalendar" and value:
                # Basic calendar validation - could be more comprehensive
                if not self._CALENDAR_PATTERN.match(value):
                    report(f"Invalid calendar specification '{value}'", "warning")
        # Socket-specific validations
        elif section == "Socket":
            if key.startswith("Listen") and not value:
                report(f"Empty listen specification for '{key}'")
        # Mount-specific validations
        elif section == "Mount":
            if key == "What" and not value:
                report("Mount source (What) cannot be empty")
            elif key == "Where" and not value:
                report("Mount point (Where) cannot be empty")
            elif key == "Where" and not value.startswith("/"):
                report("Mount point must be an absolute path")
        return errors

    def _validate_unit_type(self, unit_type: UnitType) -> List[ValidationError]:
        """Perform type-specific validation.

        Service, timer, socket and mount units have dedicated checks; every
        other type returns an empty list.
        """
        validators = {
            UnitType.SERVICE: self._validate_service,
            UnitType.TIMER: self._validate_timer,
            UnitType.SOCKET: self._validate_socket,
            UnitType.MOUNT: self._validate_mount,
        }
        validator = validators.get(unit_type)
        return validator() if validator is not None else []

    def _validate_service(self) -> List[ValidationError]:
        """Validate service-specific requirements.

        Uses the ``SERVICE_TYPES`` entry for the configured ``Type=`` (default
        ``simple``) to report missing required keys and conflicting settings.
        """
        errors = []
        if not self.config.has_section("Service"):
            return errors

        service_type = self.config.get("Service", "Type", fallback="simple")
        # An unknown type has no rules here; it is reported by _validate_key_value.
        type_config = self.SERVICE_TYPES.get(service_type, {})

        # Check required keys
        for required_key in type_config.get("required", []):
            if not self.config.has_option("Service", required_key):
                errors.append(
                    ValidationError(
                        "Service",
                        required_key,
                        (
                            f"Required key '{required_key}' missing for "
                            f"service type '{service_type}'"
                        ),
                        "error",
                    )
                )

        # Check for conflicting configurations. An entry is either "Key" (the
        # key must be absent) or "Key=value" (that exact value is forbidden).
        for conflict in type_config.get("conflicts", []):
            if "=" in conflict:
                key, value = conflict.split("=", 1)
                if (
                    self.config.has_option("Service", key)
                    and self.config.get("Service", key) == value
                ):
                    errors.append(
                        ValidationError(
                            "Service",
                            key,
                            (
                                f"'{key}={value}' conflicts with "
                                f"service type '{service_type}'"
                            ),
                            "error",
                        )
                    )
            elif self.config.has_option("Service", conflict):
                errors.append(
                    ValidationError(
                        "Service",
                        conflict,
                        (
                            f"'{conflict}' conflicts with "
                            f"service type '{service_type}'"
                        ),
                        "error",
                    )
                )
        return errors

    def _validate_timer(self) -> List[ValidationError]:
        """Validate timer-specific requirements.

        Reports an error unless at least one timing key is present.
        """
        errors = []
        if not self.config.has_section("Timer"):
            return errors
        # At least one timer specification is required
        has_timer = any(
            self.config.has_option("Timer", key) for key in self._TIMER_TRIGGER_KEYS
        )
        if not has_timer:
            errors.append(
                ValidationError(
                    "Timer",
                    None,
                    "Timer must have at least one timing specification",
                    "error",
                )
            )
        return errors

    def _validate_socket(self) -> List[ValidationError]:
        """Validate socket-specific requirements.

        Reports an error unless at least one ``Listen*`` key is present.
        """
        errors = []
        if not self.config.has_section("Socket"):
            return errors
        # At least one listen specification is required
        has_listen = any(
            self.config.has_option("Socket", key) for key in self._SOCKET_LISTEN_KEYS
        )
        if not has_listen:
            errors.append(
                ValidationError(
                    "Socket",
                    None,
                    "Socket must have at least one Listen specification",
                    "error",
                )
            )
        return errors

    def _validate_mount(self) -> List[ValidationError]:
        """Validate mount-specific requirements.

        Reports an error for each of ``What`` and ``Where`` that is missing.
        """
        errors = []
        if not self.config.has_section("Mount"):
            return errors
        # What and Where are required
        if not self.config.has_option("Mount", "What"):
            errors.append(
                ValidationError(
                    "Mount",
                    "What",
                    "Mount units require 'What' (source) specification",
                    "error",
                )
            )
        if not self.config.has_option("Mount", "Where"):
            errors.append(
                ValidationError(
                    "Mount",
                    "Where",
                    "Mount units require 'Where' (mount point) specification",
                    "error",
                )
            )
        return errors

    def _is_valid_time_span(self, value: str) -> bool:
        """Validate systemd time span format.

        Accepts ``infinity``, a bare number, and one or more
        ``<number><unit>`` components with optional whitespace between them,
        e.g. ``30s``, ``5min``, ``1.5h``, ``1min 30s`` or ``2h30min``.
        Matching is case-insensitive.
        """
        text = value.strip().lower()
        if text in ("infinity", "0"):
            return True
        if not text:
            return False
        # Consume one component at a time. Each match takes at least one digit,
        # so the loop always advances and cannot backtrack across components.
        position = 0
        while position < len(text):
            token = self._TIME_SPAN_TOKEN.match(text, position)
            if token is None:
                return False
            position = token.end()
        return True

    def to_string(self) -> str:
        """Convert the unit file back to string format.

        Sections and keys are written in parser order, separated by a blank
        line. A key without a value is written bare. Comments from the
        original text are not reproduced.
        """
        if not self.config.sections():
            return ""
        lines = []
        for section in self.config.sections():
            lines.append(f"[{section}]")
            for key in self.config.options(section):
                value = self.config.get(section, key)
                lines.append(key if value is None else f"{key}={value}")
            lines.append("")  # Empty line between sections
        return "\n".join(lines).rstrip()

    def set_value(self, section: str, key: str, value: str) -> None:
        """Set a value in the unit file, creating the section if needed."""
        if not self.config.has_section(section):
            self.config.add_section(section)
        self.config.set(section, key, value)

    def get_value(
        self, section: str, key: str, fallback: Optional[str] = None
    ) -> Optional[str]:
        """Get a value from the unit file.

        Returns ``fallback`` when the section or the key does not exist.
        """
        if not self.config.has_section(section):
            return fallback
        return self.config.get(section, key, fallback=fallback)

    def remove_key(self, section: str, key: str) -> bool:
        """Remove a key from the unit file.

        Returns:
            ``True`` if the key existed and was removed, ``False`` otherwise.
        """
        if self.config.has_section(section) and self.config.has_option(section, key):
            self.config.remove_option(section, key)
            return True
        return False

    def get_sections(self) -> List[str]:
        """Get all sections in the unit file."""
        return self.config.sections()

    def get_keys(self, section: str) -> List[str]:
        """Get all keys in a section (empty list for a missing section)."""
        if not self.config.has_section(section):
            return []
        return self.config.options(section)
def create_unit_file(unit_type: UnitType, **kwargs) -> SystemdUnitFile:
    """
    Create a new unit file of the specified type with basic structure.

    Args:
        unit_type: The type of unit file to create
        **kwargs: Additional configuration parameters. Falsy values are
            treated as "not given". Recognised names:

            * all types: ``description``, ``documentation``, ``requires``,
              ``wants``, ``after``, ``wanted_by``
            * service: ``service_type``, ``exec_start``, ``user``, ``group``,
              ``working_directory``, ``restart``
            * timer: ``on_calendar``, ``on_boot_sec``, ``persistent``
            * socket: ``listen_stream``, ``listen_datagram``
            * mount: ``what``, ``where``, ``type``, ``options``

            List-like options (``documentation``, ``requires``, ``wants``,
            ``after``, ``wanted_by``) accept either a single string or an
            iterable of strings. Other values are converted with ``str()``.

    Returns:
        SystemdUnitFile instance with basic structure
    """
    unit = SystemdUnitFile()

    def as_list_text(value) -> str:
        # A plain string is used as-is; joining it would split it into characters.
        if isinstance(value, str):
            return value
        try:
            return " ".join(str(item) for item in value)
        except TypeError:
            # Not iterable: fall back to its string form.
            return str(value)

    def set_if_given(section: str, option: str, name: str, is_list: bool = False) -> None:
        # Copy kwargs[name] into the unit only when a truthy value was supplied.
        value = kwargs.get(name)
        if value:
            text = as_list_text(value) if is_list else str(value)
            unit.set_value(section, option, text)

    # Add Unit section
    unit.set_value(
        "Unit",
        "Description",
        str(kwargs.get("description", f"Generated {unit_type.value} unit")),
    )
    set_if_given("Unit", "Documentation", "documentation", is_list=True)
    set_if_given("Unit", "Requires", "requires", is_list=True)
    set_if_given("Unit", "Wants", "wants", is_list=True)
    set_if_given("Unit", "After", "after", is_list=True)

    # Add type-specific sections
    if unit_type == UnitType.SERVICE:
        unit.set_value("Service", "Type", str(kwargs.get("service_type", "simple")))
        set_if_given("Service", "ExecStart", "exec_start")
        set_if_given("Service", "User", "user")
        set_if_given("Service", "Group", "group")
        set_if_given("Service", "WorkingDirectory", "working_directory")
        unit.set_value("Service", "Restart", str(kwargs.get("restart", "on-failure")))
    elif unit_type == UnitType.TIMER:
        set_if_given("Timer", "OnCalendar", "on_calendar")
        set_if_given("Timer", "OnBootSec", "on_boot_sec")
        # Lower-casing turns a Python bool into "true" / "false".
        unit.set_value(
            "Timer", "Persistent", str(kwargs.get("persistent", "true")).lower()
        )
    elif unit_type == UnitType.SOCKET:
        set_if_given("Socket", "ListenStream", "listen_stream")
        set_if_given("Socket", "ListenDatagram", "listen_datagram")
    elif unit_type == UnitType.MOUNT:
        set_if_given("Mount", "What", "what")
        set_if_given("Mount", "Where", "where")
        set_if_given("Mount", "Type", "type")
        set_if_given("Mount", "Options", "options")

    # Add Install section if specified; services and timers get a default.
    if kwargs.get("wanted_by"):
        set_if_given("Install", "WantedBy", "wanted_by", is_list=True)
    elif unit_type in [UnitType.SERVICE, UnitType.TIMER]:
        unit.set_value("Install", "WantedBy", "multi-user.target")
    return unit