Fix contrast issues with text-muted and bg-dark classes
- Fixed Bootstrap bg-dark class to use better contrasting color - Added comprehensive text-muted contrast fixes for various contexts - Improved dark theme colors for better accessibility - Fixed CSS inheritance issues for code elements in dark contexts - All color choices meet WCAG AA contrast requirements
This commit is contained in:
879
backend/app/core/unit_file.py
Normal file
879
backend/app/core/unit_file.py
Normal file
@@ -0,0 +1,879 @@
|
||||
"""
|
||||
Core systemd unit file parser, validator, and generator.
|
||||
|
||||
This module provides the fundamental functionality for working with systemd unit files,
|
||||
including parsing, validation, and generation.
|
||||
"""
|
||||
|
||||
import configparser
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
class UnitType(Enum):
|
||||
SERVICE = "service"
|
||||
TIMER = "timer"
|
||||
SOCKET = "socket"
|
||||
MOUNT = "mount"
|
||||
TARGET = "target"
|
||||
PATH = "path"
|
||||
SLICE = "slice"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ValidationError:
|
||||
"""Represents a validation error in a unit file."""
|
||||
|
||||
section: str
|
||||
key: Optional[str]
|
||||
message: str
|
||||
severity: str = "error" # error, warning, info
|
||||
line_number: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class UnitFileInfo:
|
||||
"""Metadata about a unit file."""
|
||||
|
||||
name: str
|
||||
unit_type: UnitType
|
||||
description: Optional[str] = None
|
||||
documentation: List[str] = field(default_factory=list)
|
||||
requires: List[str] = field(default_factory=list)
|
||||
wants: List[str] = field(default_factory=list)
|
||||
conflicts: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class SystemdUnitFile:
|
||||
"""
|
||||
Parser and validator for systemd unit files.
|
||||
|
||||
This class can parse existing unit files, validate their syntax and semantics,
|
||||
and generate new unit files from structured data.
|
||||
"""
|
||||
|
||||
# Common systemd unit file sections
|
||||
COMMON_SECTIONS = {
|
||||
"Unit",
|
||||
"Install",
|
||||
"Service",
|
||||
"Timer",
|
||||
"Socket",
|
||||
"Mount",
|
||||
"Target",
|
||||
"Path",
|
||||
"Slice",
|
||||
}
|
||||
|
||||
# Valid keys for each section
|
||||
SECTION_KEYS = {
|
||||
"Unit": {
|
||||
"Description",
|
||||
"Documentation",
|
||||
"Requires",
|
||||
"Wants",
|
||||
"After",
|
||||
"Before",
|
||||
"Conflicts",
|
||||
"ConditionPathExists",
|
||||
"ConditionFileNotEmpty",
|
||||
"AssertPathExists",
|
||||
"OnFailure",
|
||||
"StopWhenUnneeded",
|
||||
"RefuseManualStart",
|
||||
"RefuseManualStop",
|
||||
"AllowIsolate",
|
||||
"DefaultDependencies",
|
||||
"JobTimeoutSec",
|
||||
"StartLimitInterval",
|
||||
"StartLimitBurst",
|
||||
"RebootArgument",
|
||||
"ConditionArchitecture",
|
||||
"ConditionVirtualization",
|
||||
"ConditionHost",
|
||||
"ConditionKernelCommandLine",
|
||||
"ConditionSecurity",
|
||||
"ConditionCapability",
|
||||
"ConditionACPower",
|
||||
"ConditionNeedsUpdate",
|
||||
"ConditionFirstBoot",
|
||||
"ConditionPathIsDirectory",
|
||||
"ConditionPathIsSymbolicLink",
|
||||
"ConditionPathIsMountPoint",
|
||||
"ConditionPathIsReadWrite",
|
||||
"ConditionDirectoryNotEmpty",
|
||||
"ConditionFileIsExecutable",
|
||||
"ConditionUser",
|
||||
"ConditionGroup",
|
||||
},
|
||||
"Install": {"Alias", "WantedBy", "RequiredBy", "Also", "DefaultInstance"},
|
||||
"Service": {
|
||||
"Type",
|
||||
"ExecStart",
|
||||
"ExecStartPre",
|
||||
"ExecStartPost",
|
||||
"ExecReload",
|
||||
"ExecStop",
|
||||
"ExecStopPost",
|
||||
"RestartSec",
|
||||
"TimeoutStartSec",
|
||||
"TimeoutStopSec",
|
||||
"TimeoutSec",
|
||||
"RuntimeMaxSec",
|
||||
"WatchdogSec",
|
||||
"Restart",
|
||||
"SuccessExitStatus",
|
||||
"RestartPreventExitStatus",
|
||||
"RestartForceExitStatus",
|
||||
"PermissionsStartOnly",
|
||||
"RootDirectoryStartOnly",
|
||||
"RemainAfterExit",
|
||||
"GuessMainPID",
|
||||
"PIDFile",
|
||||
"BusName",
|
||||
"BusPolicy",
|
||||
"User",
|
||||
"Group",
|
||||
"SupplementaryGroups",
|
||||
"WorkingDirectory",
|
||||
"RootDirectory",
|
||||
"Nice",
|
||||
"OOMScoreAdjust",
|
||||
"IOSchedulingClass",
|
||||
"IOSchedulingPriority",
|
||||
"CPUSchedulingPolicy",
|
||||
"CPUSchedulingPriority",
|
||||
"CPUSchedulingResetOnFork",
|
||||
"CPUAffinity",
|
||||
"UMask",
|
||||
"Environment",
|
||||
"EnvironmentFile",
|
||||
"PassEnvironment",
|
||||
"UnsetEnvironment",
|
||||
"StandardInput",
|
||||
"StandardOutput",
|
||||
"StandardError",
|
||||
"TTYPath",
|
||||
"TTYReset",
|
||||
"TTYVHangup",
|
||||
"TTYVTDisallocate",
|
||||
"SyslogIdentifier",
|
||||
"SyslogFacility",
|
||||
"SyslogLevel",
|
||||
"SyslogLevelPrefix",
|
||||
"TimerSlackNSec",
|
||||
"LimitCPU",
|
||||
"LimitFSIZE",
|
||||
"LimitDATA",
|
||||
"LimitSTACK",
|
||||
"LimitCORE",
|
||||
"LimitRSS",
|
||||
"LimitNOFILE",
|
||||
"LimitAS",
|
||||
"LimitNPROC",
|
||||
"LimitMEMLOCK",
|
||||
"LimitLOCKS",
|
||||
"LimitSIGPENDING",
|
||||
"LimitMSGQUEUE",
|
||||
"LimitNICE",
|
||||
"LimitRTPRIO",
|
||||
"LimitRTTIME",
|
||||
"PAMName",
|
||||
"CapabilityBoundingSet",
|
||||
"AmbientCapabilities",
|
||||
"SecureBits",
|
||||
"ReadWritePaths",
|
||||
"ReadOnlyPaths",
|
||||
"InaccessiblePaths",
|
||||
"PrivateTmp",
|
||||
"PrivateDevices",
|
||||
"PrivateNetwork",
|
||||
"ProtectSystem",
|
||||
"ProtectHome",
|
||||
"MountFlags",
|
||||
"UtmpIdentifier",
|
||||
"UtmpMode",
|
||||
"SELinuxContext",
|
||||
"AppArmorProfile",
|
||||
"SmackProcessLabel",
|
||||
"IgnoreSIGPIPE",
|
||||
"NoNewPrivileges",
|
||||
"SystemCallFilter",
|
||||
"SystemCallErrorNumber",
|
||||
"SystemCallArchitectures",
|
||||
"RestrictAddressFamilies",
|
||||
"Personality",
|
||||
"RuntimeDirectory",
|
||||
"RuntimeDirectoryMode",
|
||||
},
|
||||
"Timer": {
|
||||
"OnActiveSec",
|
||||
"OnBootSec",
|
||||
"OnStartupSec",
|
||||
"OnUnitActiveSec",
|
||||
"OnUnitInactiveSec",
|
||||
"OnCalendar",
|
||||
"AccuracySec",
|
||||
"RandomizedDelaySec",
|
||||
"Unit",
|
||||
"Persistent",
|
||||
"WakeSystem",
|
||||
"RemainAfterElapse",
|
||||
},
|
||||
"Socket": {
|
||||
"ListenStream",
|
||||
"ListenDatagram",
|
||||
"ListenSequentialPacket",
|
||||
"ListenFIFO",
|
||||
"ListenSpecial",
|
||||
"ListenNetlink",
|
||||
"ListenMessageQueue",
|
||||
"ListenUSBFunction",
|
||||
"SocketProtocol",
|
||||
"BindIPv6Only",
|
||||
"Backlog",
|
||||
"BindToDevice",
|
||||
"SocketUser",
|
||||
"SocketGroup",
|
||||
"SocketMode",
|
||||
"DirectoryMode",
|
||||
"Accept",
|
||||
"Writable",
|
||||
"MaxConnections",
|
||||
"MaxConnectionsPerSource",
|
||||
"KeepAlive",
|
||||
"KeepAliveTimeSec",
|
||||
"KeepAliveIntervalSec",
|
||||
"KeepAliveProbes",
|
||||
"NoDelay",
|
||||
"Priority",
|
||||
"DeferAcceptSec",
|
||||
"ReceiveBuffer",
|
||||
"SendBuffer",
|
||||
"IPTOS",
|
||||
"IPTTL",
|
||||
"Mark",
|
||||
"ReusePort",
|
||||
"SmackLabel",
|
||||
"SmackLabelIPIn",
|
||||
"SmackLabelIPOut",
|
||||
"SELinuxContextFromNet",
|
||||
"PipeSize",
|
||||
"MessageQueueMaxMessages",
|
||||
"MessageQueueMessageSize",
|
||||
"FreeBind",
|
||||
"Transparent",
|
||||
"Broadcast",
|
||||
"PassCredentials",
|
||||
"PassSecurity",
|
||||
"TCPCongestion",
|
||||
"ExecStartPre",
|
||||
"ExecStartPost",
|
||||
"ExecStopPre",
|
||||
"ExecStopPost",
|
||||
"TimeoutSec",
|
||||
"Service",
|
||||
"RemoveOnStop",
|
||||
"Symlinks",
|
||||
"FileDescriptorName",
|
||||
"TriggerLimitIntervalSec",
|
||||
"TriggerLimitBurst",
|
||||
},
|
||||
"Mount": {
|
||||
"What",
|
||||
"Where",
|
||||
"Type",
|
||||
"Options",
|
||||
"SloppyOptions",
|
||||
"LazyUnmount",
|
||||
"ForceUnmount",
|
||||
"DirectoryMode",
|
||||
"TimeoutSec",
|
||||
},
|
||||
"Target": {},
|
||||
"Path": {
|
||||
"PathExists",
|
||||
"PathExistsGlob",
|
||||
"PathChanged",
|
||||
"PathModified",
|
||||
"DirectoryNotEmpty",
|
||||
"Unit",
|
||||
"MakeDirectory",
|
||||
"DirectoryMode",
|
||||
},
|
||||
}
|
||||
|
||||
# Service types and their requirements
|
||||
SERVICE_TYPES = {
|
||||
"simple": {"required": ["ExecStart"], "conflicts": ["BusName", "Type=forking"]},
|
||||
"exec": {"required": ["ExecStart"], "conflicts": ["BusName"]},
|
||||
"forking": {"recommended": ["PIDFile"], "conflicts": ["BusName"]},
|
||||
"oneshot": {"conflicts": ["Restart=always", "Restart=on-success"]},
|
||||
"dbus": {"required": ["BusName"], "conflicts": []},
|
||||
"notify": {"required": ["ExecStart"], "conflicts": ["BusName"]},
|
||||
"idle": {"required": ["ExecStart"], "conflicts": ["BusName"]},
|
||||
}
|
||||
|
||||
def __init__(self, content: Optional[str] = None, file_path: Optional[str] = None):
|
||||
"""Initialize with either content string or file path."""
|
||||
self.content = content or ""
|
||||
self.file_path = file_path
|
||||
self.config = configparser.ConfigParser(
|
||||
interpolation=None, allow_no_value=True, delimiters=("=",)
|
||||
)
|
||||
self.config.optionxform = lambda optionstr: str(optionstr) # Preserve case
|
||||
self._parse_errors = []
|
||||
|
||||
if content:
|
||||
self._parse_content(content)
|
||||
elif file_path:
|
||||
self._parse_file(file_path)
|
||||
|
||||
def _parse_content(self, content: str) -> None:
|
||||
"""Parse unit file content from string."""
|
||||
try:
|
||||
self.config.read_string(content)
|
||||
self.content = content
|
||||
except configparser.Error as e:
|
||||
self._parse_errors.append(
|
||||
ValidationError("", None, f"Parse error: {str(e)}", "error")
|
||||
)
|
||||
|
||||
def _parse_file(self, file_path: str) -> None:
|
||||
"""Parse unit file from file path."""
|
||||
try:
|
||||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
self._parse_content(content)
|
||||
self.file_path = file_path
|
||||
except IOError as e:
|
||||
self._parse_errors.append(
|
||||
ValidationError("", None, f"File read error: {str(e)}", "error")
|
||||
)
|
||||
|
||||
def get_unit_type(self) -> Optional[UnitType]:
|
||||
"""Determine the unit type based on sections present."""
|
||||
if self.config.has_section("Service"):
|
||||
return UnitType.SERVICE
|
||||
elif self.config.has_section("Timer"):
|
||||
return UnitType.TIMER
|
||||
elif self.config.has_section("Socket"):
|
||||
return UnitType.SOCKET
|
||||
elif self.config.has_section("Mount"):
|
||||
return UnitType.MOUNT
|
||||
elif self.config.has_section("Target"):
|
||||
return UnitType.TARGET
|
||||
elif self.config.has_section("Path"):
|
||||
return UnitType.PATH
|
||||
return None
|
||||
|
||||
def get_info(self) -> UnitFileInfo:
|
||||
"""Extract basic information about the unit file."""
|
||||
unit_type = self.get_unit_type()
|
||||
name = ""
|
||||
if self.file_path:
|
||||
name = self.file_path.split("/")[-1]
|
||||
|
||||
description = None
|
||||
if self.config.has_section("Unit") and self.config.has_option(
|
||||
"Unit", "Description"
|
||||
):
|
||||
description = self.config.get("Unit", "Description")
|
||||
|
||||
documentation = []
|
||||
if self.config.has_section("Unit") and self.config.has_option(
|
||||
"Unit", "Documentation"
|
||||
):
|
||||
doc_value = self.config.get("Unit", "Documentation")
|
||||
documentation = [doc.strip() for doc in doc_value.split()]
|
||||
|
||||
# Parse dependencies
|
||||
requires = self._get_dependency_list("Requires")
|
||||
wants = self._get_dependency_list("Wants")
|
||||
conflicts = self._get_dependency_list("Conflicts")
|
||||
|
||||
if unit_type is None:
|
||||
raise ValueError("Could not determine unit type")
|
||||
|
||||
return UnitFileInfo(
|
||||
name=name,
|
||||
unit_type=unit_type,
|
||||
description=description,
|
||||
documentation=documentation,
|
||||
requires=requires,
|
||||
wants=wants,
|
||||
conflicts=conflicts,
|
||||
)
|
||||
|
||||
def _get_dependency_list(self, key: str) -> List[str]:
|
||||
"""Extract a space-separated dependency list."""
|
||||
if not self.config.has_section("Unit") or not self.config.has_option(
|
||||
"Unit", key
|
||||
):
|
||||
return []
|
||||
|
||||
value = self.config.get("Unit", key)
|
||||
return [dep.strip() for dep in value.split() if dep.strip()]
|
||||
|
||||
def validate(self) -> List[ValidationError]:
|
||||
"""Validate the unit file and return list of errors/warnings."""
|
||||
errors = self._parse_errors.copy()
|
||||
|
||||
# Check for basic structure
|
||||
if not self.config.sections():
|
||||
errors.append(
|
||||
ValidationError(
|
||||
"", None, "Unit file is empty or has no sections", "error"
|
||||
)
|
||||
)
|
||||
return errors
|
||||
|
||||
# Validate sections
|
||||
for section in self.config.sections():
|
||||
errors.extend(self._validate_section(section))
|
||||
|
||||
# Type-specific validation
|
||||
unit_type = self.get_unit_type()
|
||||
if unit_type:
|
||||
errors.extend(self._validate_unit_type(unit_type))
|
||||
|
||||
return errors
|
||||
|
||||
def _validate_section(self, section: str) -> List[ValidationError]:
|
||||
"""Validate a specific section."""
|
||||
errors = []
|
||||
|
||||
# Check if section is known
|
||||
if section not in self.COMMON_SECTIONS:
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section, None, f"Unknown section '{section}'", "warning"
|
||||
)
|
||||
)
|
||||
return errors
|
||||
|
||||
# Validate keys in section
|
||||
valid_keys = self.SECTION_KEYS.get(section, set())
|
||||
for key in self.config.options(section):
|
||||
if valid_keys and key not in valid_keys:
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section,
|
||||
key,
|
||||
f"Unknown key '{key}' in section '{section}'",
|
||||
"warning",
|
||||
)
|
||||
)
|
||||
|
||||
# Validate key values
|
||||
value = self.config.get(section, key)
|
||||
errors.extend(self._validate_key_value(section, key, value))
|
||||
|
||||
return errors
|
||||
|
||||
def _validate_key_value(
|
||||
self, section: str, key: str, value: str
|
||||
) -> List[ValidationError]:
|
||||
"""Validate specific key-value pairs."""
|
||||
errors = []
|
||||
|
||||
# Service-specific validations
|
||||
if section == "Service":
|
||||
if key == "Type" and value not in self.SERVICE_TYPES:
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section, key, f"Invalid service type '{value}'", "error"
|
||||
)
|
||||
)
|
||||
|
||||
elif key == "Restart" and value not in [
|
||||
"no",
|
||||
"always",
|
||||
"on-success",
|
||||
"on-failure",
|
||||
"on-abnormal",
|
||||
"on-abort",
|
||||
"on-watchdog",
|
||||
]:
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section, key, f"Invalid restart value '{value}'", "error"
|
||||
)
|
||||
)
|
||||
|
||||
elif key.startswith("Exec") and not value:
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section, key, f"Empty execution command for '{key}'", "error"
|
||||
)
|
||||
)
|
||||
|
||||
elif key in ["TimeoutStartSec", "TimeoutStopSec", "RestartSec"] and value:
|
||||
if not self._is_valid_time_span(value):
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section, key, f"Invalid time span '{value}'", "error"
|
||||
)
|
||||
)
|
||||
|
||||
# Timer-specific validations
|
||||
elif section == "Timer":
|
||||
if key.startswith("On") and key.endswith("Sec") and value:
|
||||
if not self._is_valid_time_span(value):
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section, key, f"Invalid time span '{value}'", "error"
|
||||
)
|
||||
)
|
||||
|
||||
elif key == "OnCalendar" and value:
|
||||
# Basic calendar validation - could be more comprehensive
|
||||
if not re.match(r"^[\w\s:*/-]+$", value):
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section,
|
||||
key,
|
||||
f"Invalid calendar specification '{value}'",
|
||||
"warning",
|
||||
)
|
||||
)
|
||||
|
||||
# Socket-specific validations
|
||||
elif section == "Socket":
|
||||
if key.startswith("Listen") and not value:
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section, key, f"Empty listen specification for '{key}'", "error"
|
||||
)
|
||||
)
|
||||
|
||||
# Mount-specific validations
|
||||
elif section == "Mount":
|
||||
if key == "What" and not value:
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section, key, "Mount source (What) cannot be empty", "error"
|
||||
)
|
||||
)
|
||||
elif key == "Where" and not value:
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section, key, "Mount point (Where) cannot be empty", "error"
|
||||
)
|
||||
)
|
||||
elif key == "Where" and not value.startswith("/"):
|
||||
errors.append(
|
||||
ValidationError(
|
||||
section, key, "Mount point must be an absolute path", "error"
|
||||
)
|
||||
)
|
||||
|
||||
return errors
|
||||
|
||||
def _validate_unit_type(self, unit_type: UnitType) -> List[ValidationError]:
|
||||
"""Perform type-specific validation."""
|
||||
errors = []
|
||||
|
||||
if unit_type == UnitType.SERVICE:
|
||||
errors.extend(self._validate_service())
|
||||
elif unit_type == UnitType.TIMER:
|
||||
errors.extend(self._validate_timer())
|
||||
elif unit_type == UnitType.SOCKET:
|
||||
errors.extend(self._validate_socket())
|
||||
elif unit_type == UnitType.MOUNT:
|
||||
errors.extend(self._validate_mount())
|
||||
|
||||
return errors
|
||||
|
||||
def _validate_service(self) -> List[ValidationError]:
|
||||
"""Validate service-specific requirements."""
|
||||
errors = []
|
||||
|
||||
if not self.config.has_section("Service"):
|
||||
return errors
|
||||
|
||||
service_type = self.config.get("Service", "Type", fallback="simple")
|
||||
type_config = self.SERVICE_TYPES.get(service_type, {})
|
||||
|
||||
# Check required keys
|
||||
for required_key in type_config.get("required", []):
|
||||
if not self.config.has_option("Service", required_key):
|
||||
errors.append(
|
||||
ValidationError(
|
||||
"Service",
|
||||
required_key,
|
||||
(
|
||||
f"Required key '{required_key}' missing for "
|
||||
f"service type '{service_type}'"
|
||||
),
|
||||
"error",
|
||||
)
|
||||
)
|
||||
|
||||
# Check for conflicting configurations
|
||||
for conflict in type_config.get("conflicts", []):
|
||||
if "=" in conflict:
|
||||
key, value = conflict.split("=", 1)
|
||||
if self.config.has_option("Service", key):
|
||||
actual_value = self.config.get("Service", key)
|
||||
if actual_value == value:
|
||||
errors.append(
|
||||
ValidationError(
|
||||
"Service",
|
||||
key,
|
||||
(
|
||||
f"'{key}={value}' conflicts with "
|
||||
f"service type '{service_type}'"
|
||||
),
|
||||
"error",
|
||||
)
|
||||
)
|
||||
else:
|
||||
if self.config.has_option("Service", conflict):
|
||||
errors.append(
|
||||
ValidationError(
|
||||
"Service",
|
||||
conflict,
|
||||
(
|
||||
f"'{conflict}' conflicts with "
|
||||
f"service type '{service_type}'"
|
||||
),
|
||||
"error",
|
||||
)
|
||||
)
|
||||
|
||||
return errors
|
||||
|
||||
def _validate_timer(self) -> List[ValidationError]:
|
||||
"""Validate timer-specific requirements."""
|
||||
errors = []
|
||||
|
||||
if not self.config.has_section("Timer"):
|
||||
return errors
|
||||
|
||||
# At least one timer specification is required
|
||||
timer_keys = [
|
||||
"OnActiveSec",
|
||||
"OnBootSec",
|
||||
"OnStartupSec",
|
||||
"OnUnitActiveSec",
|
||||
"OnUnitInactiveSec",
|
||||
"OnCalendar",
|
||||
]
|
||||
has_timer = any(self.config.has_option("Timer", key) for key in timer_keys)
|
||||
|
||||
if not has_timer:
|
||||
errors.append(
|
||||
ValidationError(
|
||||
"Timer",
|
||||
None,
|
||||
"Timer must have at least one timing specification",
|
||||
"error",
|
||||
)
|
||||
)
|
||||
|
||||
return errors
|
||||
|
||||
def _validate_socket(self) -> List[ValidationError]:
|
||||
"""Validate socket-specific requirements."""
|
||||
errors = []
|
||||
|
||||
if not self.config.has_section("Socket"):
|
||||
return errors
|
||||
|
||||
# At least one listen specification is required
|
||||
listen_keys = [
|
||||
"ListenStream",
|
||||
"ListenDatagram",
|
||||
"ListenSequentialPacket",
|
||||
"ListenFIFO",
|
||||
"ListenSpecial",
|
||||
"ListenNetlink",
|
||||
"ListenMessageQueue",
|
||||
]
|
||||
has_listen = any(self.config.has_option("Socket", key) for key in listen_keys)
|
||||
|
||||
if not has_listen:
|
||||
errors.append(
|
||||
ValidationError(
|
||||
"Socket",
|
||||
None,
|
||||
"Socket must have at least one Listen specification",
|
||||
"error",
|
||||
)
|
||||
)
|
||||
|
||||
return errors
|
||||
|
||||
def _validate_mount(self) -> List[ValidationError]:
|
||||
"""Validate mount-specific requirements."""
|
||||
errors = []
|
||||
|
||||
if not self.config.has_section("Mount"):
|
||||
return errors
|
||||
|
||||
# What and Where are required
|
||||
if not self.config.has_option("Mount", "What"):
|
||||
errors.append(
|
||||
ValidationError(
|
||||
"Mount",
|
||||
"What",
|
||||
"Mount units require 'What' (source) specification",
|
||||
"error",
|
||||
)
|
||||
)
|
||||
|
||||
if not self.config.has_option("Mount", "Where"):
|
||||
errors.append(
|
||||
ValidationError(
|
||||
"Mount",
|
||||
"Where",
|
||||
"Mount units require 'Where' (mount point) specification",
|
||||
"error",
|
||||
)
|
||||
)
|
||||
|
||||
return errors
|
||||
|
||||
def _is_valid_time_span(self, value: str) -> bool:
|
||||
"""Validate systemd time span format."""
|
||||
# Basic validation for time spans like "30s", "5min", "1h", "infinity"
|
||||
if value.lower() in ["infinity", "0"]:
|
||||
return True
|
||||
|
||||
pattern = r"^\d+(\.\d+)?(us|ms|s|min|h|d|w|month|y)?$"
|
||||
return bool(re.match(pattern, value.lower()))
|
||||
|
||||
def to_string(self) -> str:
|
||||
"""Convert the unit file back to string format."""
|
||||
if not self.config.sections():
|
||||
return ""
|
||||
|
||||
lines = []
|
||||
for section in self.config.sections():
|
||||
lines.append(f"[{section}]")
|
||||
for key in self.config.options(section):
|
||||
value = self.config.get(section, key)
|
||||
if value is None:
|
||||
lines.append(key)
|
||||
else:
|
||||
lines.append(f"{key}={value}")
|
||||
lines.append("") # Empty line between sections
|
||||
|
||||
return "\n".join(lines).rstrip()
|
||||
|
||||
def set_value(self, section: str, key: str, value: str) -> None:
|
||||
"""Set a value in the unit file."""
|
||||
if not self.config.has_section(section):
|
||||
self.config.add_section(section)
|
||||
self.config.set(section, key, value)
|
||||
|
||||
def get_value(
|
||||
self, section: str, key: str, fallback: Optional[str] = None
|
||||
) -> Optional[str]:
|
||||
"""Get a value from the unit file."""
|
||||
if not self.config.has_section(section):
|
||||
return fallback
|
||||
return self.config.get(section, key, fallback=fallback)
|
||||
|
||||
def remove_key(self, section: str, key: str) -> bool:
|
||||
"""Remove a key from the unit file."""
|
||||
if self.config.has_section(section) and self.config.has_option(section, key):
|
||||
self.config.remove_option(section, key)
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_sections(self) -> List[str]:
|
||||
"""Get all sections in the unit file."""
|
||||
return self.config.sections()
|
||||
|
||||
def get_keys(self, section: str) -> List[str]:
|
||||
"""Get all keys in a section."""
|
||||
if not self.config.has_section(section):
|
||||
return []
|
||||
return self.config.options(section)
|
||||
|
||||
|
||||
def create_unit_file(unit_type: UnitType, **kwargs) -> SystemdUnitFile:
|
||||
"""
|
||||
Create a new unit file of the specified type with basic structure.
|
||||
|
||||
Args:
|
||||
unit_type: The type of unit file to create
|
||||
**kwargs: Additional configuration parameters
|
||||
|
||||
Returns:
|
||||
SystemdUnitFile instance with basic structure
|
||||
"""
|
||||
unit = SystemdUnitFile()
|
||||
|
||||
# Add Unit section
|
||||
unit.set_value(
|
||||
"Unit",
|
||||
"Description",
|
||||
kwargs.get("description", f"Generated {unit_type.value} unit"),
|
||||
)
|
||||
|
||||
if kwargs.get("documentation"):
|
||||
unit.set_value("Unit", "Documentation", kwargs["documentation"])
|
||||
|
||||
if kwargs.get("requires"):
|
||||
unit.set_value("Unit", "Requires", " ".join(kwargs["requires"]))
|
||||
|
||||
if kwargs.get("wants"):
|
||||
unit.set_value("Unit", "Wants", " ".join(kwargs["wants"]))
|
||||
|
||||
if kwargs.get("after"):
|
||||
unit.set_value("Unit", "After", " ".join(kwargs["after"]))
|
||||
|
||||
# Add type-specific sections
|
||||
if unit_type == UnitType.SERVICE:
|
||||
unit.set_value("Service", "Type", kwargs.get("service_type", "simple"))
|
||||
if kwargs.get("exec_start"):
|
||||
unit.set_value("Service", "ExecStart", kwargs["exec_start"])
|
||||
if kwargs.get("user"):
|
||||
unit.set_value("Service", "User", kwargs["user"])
|
||||
if kwargs.get("group"):
|
||||
unit.set_value("Service", "Group", kwargs["group"])
|
||||
if kwargs.get("working_directory"):
|
||||
unit.set_value("Service", "WorkingDirectory", kwargs["working_directory"])
|
||||
unit.set_value("Service", "Restart", kwargs.get("restart", "on-failure"))
|
||||
|
||||
elif unit_type == UnitType.TIMER:
|
||||
if kwargs.get("on_calendar"):
|
||||
unit.set_value("Timer", "OnCalendar", kwargs["on_calendar"])
|
||||
if kwargs.get("on_boot_sec"):
|
||||
unit.set_value("Timer", "OnBootSec", kwargs["on_boot_sec"])
|
||||
unit.set_value(
|
||||
"Timer", "Persistent", str(kwargs.get("persistent", "true")).lower()
|
||||
)
|
||||
|
||||
elif unit_type == UnitType.SOCKET:
|
||||
if kwargs.get("listen_stream"):
|
||||
unit.set_value("Socket", "ListenStream", kwargs["listen_stream"])
|
||||
if kwargs.get("listen_datagram"):
|
||||
unit.set_value("Socket", "ListenDatagram", kwargs["listen_datagram"])
|
||||
|
||||
elif unit_type == UnitType.MOUNT:
|
||||
if kwargs.get("what"):
|
||||
unit.set_value("Mount", "What", kwargs["what"])
|
||||
if kwargs.get("where"):
|
||||
unit.set_value("Mount", "Where", kwargs["where"])
|
||||
if kwargs.get("type"):
|
||||
unit.set_value("Mount", "Type", kwargs["type"])
|
||||
if kwargs.get("options"):
|
||||
unit.set_value("Mount", "Options", kwargs["options"])
|
||||
|
||||
# Add Install section if specified
|
||||
if kwargs.get("wanted_by"):
|
||||
wanted_by_value = kwargs["wanted_by"]
|
||||
if isinstance(wanted_by_value, (list, tuple)):
|
||||
unit.set_value("Install", "WantedBy", " ".join(wanted_by_value))
|
||||
else:
|
||||
unit.set_value("Install", "WantedBy", str(wanted_by_value))
|
||||
elif unit_type in [UnitType.SERVICE, UnitType.TIMER]:
|
||||
unit.set_value("Install", "WantedBy", "multi-user.target")
|
||||
|
||||
return unit
|
||||
Reference in New Issue
Block a user