Implement component registry for PA session awareness
Components: - state/component-registry.json: Registry with all skills, commands, agents, workflows - automation/generate-registry.py: Auto-generate from directory scan - automation/validate-registry.py: Check for drift and TODO placeholders - system-instructions.json: Added component-lifecycle process Registry includes: - 6 skills with routing triggers - 10 commands with aliases - 12 agents with model info - 10 workflows with triggers - 2 delegation helpers 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
260
automation/generate-registry.py
Executable file
260
automation/generate-registry.py
Executable file
@@ -0,0 +1,260 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Generate Component Registry
|
||||
|
||||
Scans component directories and generates/updates the registry.
|
||||
Preserves existing manual hints (triggers, descriptions) on regeneration.
|
||||
|
||||
Usage:
|
||||
python3 generate-registry.py [--dry-run]
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
LOCAL_TZ = ZoneInfo('America/Los_Angeles')
|
||||
CLAUDE_DIR = Path.home() / ".claude"
|
||||
REGISTRY_PATH = CLAUDE_DIR / "state" / "component-registry.json"
|
||||
|
||||
# Scan paths
|
||||
SCAN_PATHS = {
|
||||
"skills": CLAUDE_DIR / "skills",
|
||||
"commands": CLAUDE_DIR / "commands",
|
||||
"agents": CLAUDE_DIR / "agents",
|
||||
"workflows": CLAUDE_DIR / "workflows",
|
||||
}
|
||||
|
||||
|
||||
def parse_frontmatter(file_path: Path) -> dict:
|
||||
"""Extract YAML frontmatter from markdown file."""
|
||||
try:
|
||||
content = file_path.read_text()
|
||||
if content.startswith('---'):
|
||||
end = content.find('---', 3)
|
||||
if end != -1:
|
||||
frontmatter = content[3:end].strip()
|
||||
result = {}
|
||||
for line in frontmatter.split('\n'):
|
||||
if ':' in line:
|
||||
key, value = line.split(':', 1)
|
||||
result[key.strip()] = value.strip().strip('"\'')
|
||||
return result
|
||||
except Exception:
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
def scan_skills() -> dict:
|
||||
"""Scan skills directory for SKILL.md files."""
|
||||
skills = {}
|
||||
skills_dir = SCAN_PATHS["skills"]
|
||||
if not skills_dir.exists():
|
||||
return skills
|
||||
|
||||
for skill_dir in skills_dir.iterdir():
|
||||
if skill_dir.is_dir():
|
||||
skill_file = skill_dir / "SKILL.md"
|
||||
if skill_file.exists():
|
||||
fm = parse_frontmatter(skill_file)
|
||||
skills[skill_dir.name] = {
|
||||
"description": fm.get("description", "TODO"),
|
||||
"triggers": ["TODO"]
|
||||
}
|
||||
return skills
|
||||
|
||||
|
||||
def scan_commands() -> dict:
|
||||
"""Scan commands directory for .md files."""
|
||||
commands = {}
|
||||
commands_dir = SCAN_PATHS["commands"]
|
||||
if not commands_dir.exists():
|
||||
return commands
|
||||
|
||||
for cmd_file in commands_dir.rglob("*.md"):
|
||||
rel_path = cmd_file.relative_to(commands_dir)
|
||||
# Convert path to command name
|
||||
cmd_name = "/" + str(rel_path).replace(".md", "").replace("/", ":")
|
||||
|
||||
fm = parse_frontmatter(cmd_file)
|
||||
aliases = fm.get("aliases", "")
|
||||
if aliases.startswith("["):
|
||||
# Parse array format
|
||||
aliases = [a.strip().strip('"\'') for a in aliases[1:-1].split(",") if a.strip()]
|
||||
aliases = ["/" + a if not a.startswith("/") else a for a in aliases]
|
||||
else:
|
||||
aliases = []
|
||||
|
||||
commands[cmd_name] = {
|
||||
"description": fm.get("description", "TODO"),
|
||||
"aliases": aliases,
|
||||
"invokes": fm.get("invokes", "")
|
||||
}
|
||||
return commands
|
||||
|
||||
|
||||
def scan_agents() -> dict:
|
||||
"""Scan agents directory for .md files."""
|
||||
agents = {}
|
||||
agents_dir = SCAN_PATHS["agents"]
|
||||
if not agents_dir.exists():
|
||||
return agents
|
||||
|
||||
for agent_file in agents_dir.glob("*.md"):
|
||||
agent_name = agent_file.stem
|
||||
fm = parse_frontmatter(agent_file)
|
||||
agents[agent_name] = {
|
||||
"description": fm.get("description", "TODO"),
|
||||
"model": fm.get("model", "sonnet"),
|
||||
"triggers": ["TODO"]
|
||||
}
|
||||
return agents
|
||||
|
||||
|
||||
def scan_workflows() -> dict:
|
||||
"""Scan workflows directory for .yaml/.yml files."""
|
||||
workflows = {}
|
||||
workflows_dir = SCAN_PATHS["workflows"]
|
||||
if not workflows_dir.exists():
|
||||
return workflows
|
||||
|
||||
for wf_file in workflows_dir.rglob("*.yaml"):
|
||||
rel_path = wf_file.relative_to(workflows_dir)
|
||||
wf_name = str(rel_path).replace(".yaml", "")
|
||||
|
||||
# Try to parse name from YAML
|
||||
try:
|
||||
content = wf_file.read_text()
|
||||
for line in content.split('\n'):
|
||||
if line.startswith('name:'):
|
||||
desc = line.split(':', 1)[1].strip().strip('"\'')
|
||||
break
|
||||
if line.startswith('description:'):
|
||||
desc = line.split(':', 1)[1].strip().strip('"\'')
|
||||
break
|
||||
else:
|
||||
desc = "TODO"
|
||||
except Exception:
|
||||
desc = "TODO"
|
||||
|
||||
workflows[wf_name] = {
|
||||
"description": desc,
|
||||
"triggers": ["TODO"]
|
||||
}
|
||||
|
||||
# Also check .yml
|
||||
for wf_file in workflows_dir.rglob("*.yml"):
|
||||
rel_path = wf_file.relative_to(workflows_dir)
|
||||
wf_name = str(rel_path).replace(".yml", "")
|
||||
workflows[wf_name] = {
|
||||
"description": "TODO",
|
||||
"triggers": ["TODO"]
|
||||
}
|
||||
|
||||
return workflows
|
||||
|
||||
|
||||
def merge_with_existing(scanned: dict, existing: dict, component_type: str) -> dict:
|
||||
"""Merge scanned components with existing registry, preserving manual hints."""
|
||||
merged = {}
|
||||
|
||||
# Process scanned components
|
||||
for name, data in scanned.items():
|
||||
if name in existing:
|
||||
# Preserve existing manual hints
|
||||
merged[name] = existing[name].copy()
|
||||
# Update auto-generated fields if they were TODO
|
||||
if merged[name].get("description") == "TODO" and data.get("description") != "TODO":
|
||||
merged[name]["description"] = data["description"]
|
||||
else:
|
||||
# New component
|
||||
merged[name] = data
|
||||
print(f" + NEW: {component_type}/{name}")
|
||||
|
||||
# Check for removed components
|
||||
for name in existing:
|
||||
if name not in scanned:
|
||||
merged[name] = existing[name].copy()
|
||||
merged[name]["status"] = "removed"
|
||||
print(f" - REMOVED: {component_type}/{name}")
|
||||
|
||||
return merged
|
||||
|
||||
|
||||
def generate_registry(dry_run: bool = False) -> dict:
|
||||
"""Generate the component registry."""
|
||||
print("Scanning components...")
|
||||
|
||||
# Load existing registry
|
||||
existing = {"skills": {}, "commands": {}, "agents": {}, "workflows": {}}
|
||||
if REGISTRY_PATH.exists():
|
||||
try:
|
||||
with open(REGISTRY_PATH) as f:
|
||||
existing_data = json.load(f)
|
||||
existing = {
|
||||
"skills": existing_data.get("skills", {}),
|
||||
"commands": existing_data.get("commands", {}),
|
||||
"agents": existing_data.get("agents", {}),
|
||||
"workflows": existing_data.get("workflows", {}),
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not load existing registry: {e}")
|
||||
|
||||
# Scan directories
|
||||
scanned_skills = scan_skills()
|
||||
scanned_commands = scan_commands()
|
||||
scanned_agents = scan_agents()
|
||||
scanned_workflows = scan_workflows()
|
||||
|
||||
print(f"\nFound: {len(scanned_skills)} skills, {len(scanned_commands)} commands, "
|
||||
f"{len(scanned_agents)} agents, {len(scanned_workflows)} workflows")
|
||||
|
||||
# Merge with existing
|
||||
print("\nMerging with existing registry...")
|
||||
merged_skills = merge_with_existing(scanned_skills, existing["skills"], "skills")
|
||||
merged_commands = merge_with_existing(scanned_commands, existing["commands"], "commands")
|
||||
merged_agents = merge_with_existing(scanned_agents, existing["agents"], "agents")
|
||||
merged_workflows = merge_with_existing(scanned_workflows, existing["workflows"], "workflows")
|
||||
|
||||
# Build registry
|
||||
registry = {
|
||||
"version": "1.0",
|
||||
"generated": datetime.now(LOCAL_TZ).isoformat(),
|
||||
"description": "Component registry for PA session awareness. Read at session start for routing.",
|
||||
"skills": merged_skills,
|
||||
"commands": merged_commands,
|
||||
"agents": merged_agents,
|
||||
"workflows": merged_workflows,
|
||||
}
|
||||
|
||||
# Preserve delegation_helpers if exists
|
||||
if REGISTRY_PATH.exists():
|
||||
try:
|
||||
with open(REGISTRY_PATH) as f:
|
||||
existing_data = json.load(f)
|
||||
if "delegation_helpers" in existing_data:
|
||||
registry["delegation_helpers"] = existing_data["delegation_helpers"]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if dry_run:
|
||||
print("\n[DRY RUN] Would write:")
|
||||
print(json.dumps(registry, indent=2))
|
||||
else:
|
||||
with open(REGISTRY_PATH, 'w') as f:
|
||||
json.dump(registry, f, indent=2)
|
||||
print(f"\nRegistry written to {REGISTRY_PATH}")
|
||||
|
||||
return registry
|
||||
|
||||
|
||||
def main():
|
||||
dry_run = "--dry-run" in sys.argv
|
||||
generate_registry(dry_run)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user