Files
swarm-zap/skills/n8n-webhook/scripts/resolve-approval-with-gog.py

242 lines
8.1 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
import tempfile
import urllib.error
import urllib.request
from pathlib import Path
DEFAULT_BASE_URL = os.environ.get('N8N_BASE_URL', 'http://192.168.153.113:18808').rstrip('/')
DEFAULT_ACTION_PATH = os.environ.get('N8N_ACTION_PATH', 'openclaw-action').strip('/')
DEFAULT_SECRET_HEADER = os.environ.get('N8N_SECRET_HEADER', 'x-openclaw-secret')
def fail(msg: str, code: int = 1):
print(msg, file=sys.stderr)
raise SystemExit(code)
def run(cmd, *, env=None):
proc = subprocess.run(cmd, capture_output=True, text=True, env=env)
return proc.returncode, proc.stdout, proc.stderr
def gog_account(args_account: str | None) -> str:
account = args_account or os.environ.get('GOG_ACCOUNT', '').strip()
if not account:
fail('missing gog account: pass --account or set GOG_ACCOUNT')
return account
def webhook_secret() -> str:
secret = os.environ.get('N8N_WEBHOOK_SECRET', '').strip()
if not secret:
fail('missing N8N_WEBHOOK_SECRET in environment')
return secret
def call_action(payload: dict, *, base_url: str, path: str, secret_header: str, secret: str) -> dict:
url = f'{base_url}/webhook/{path}'
req = urllib.request.Request(
url,
data=json.dumps(payload).encode(),
method='POST',
headers={
'Content-Type': 'application/json',
'Accept': 'application/json',
secret_header: secret,
},
)
try:
with urllib.request.urlopen(req, timeout=60) as r:
body = r.read().decode('utf-8', 'replace')
return json.loads(body) if body else {}
except urllib.error.HTTPError as e:
body = e.read().decode('utf-8', 'replace')
try:
parsed = json.loads(body) if body else {}
except Exception:
parsed = {'ok': False, 'error': {'code': 'http_error', 'message': body or str(e)}}
parsed.setdefault('http_status', e.code)
return parsed
def attach_execution(item_id: str, execution: dict, *, base_url: str, path: str, secret_header: str, secret: str) -> dict:
return call_action(
{
'action': 'approval_history_attach_execution',
'args': {'id': item_id, 'execution': execution},
'request_id': f'attach-{item_id}',
},
base_url=base_url,
path=path,
secret_header=secret_header,
secret=secret,
)
def build_email_command(item: dict, account: str, dry_run: bool):
payload = item.get('payload') or {}
body_text = payload.get('body_text') or ''
body_html = payload.get('body_html') or ''
cmd = [
'gog', 'gmail', 'drafts', 'create',
'--account', account,
'--json',
'--no-input',
'--to', ','.join(payload.get('to') or []),
'--subject', payload.get('subject') or '',
]
for key in ('cc', 'bcc'):
vals = payload.get(key) or []
if vals:
cmd.extend([f'--{key}', ','.join(vals)])
tmp = None
if body_text:
tmp = tempfile.NamedTemporaryFile('w', delete=False, encoding='utf-8', suffix='.txt')
tmp.write(body_text)
tmp.close()
cmd.extend(['--body-file', tmp.name])
elif body_html:
# gog requires body or body_html; for HTML-only drafts we can use body_html.
pass
else:
fail('email_draft payload missing body_text/body_html')
if body_html:
cmd.extend(['--body-html', body_html])
if dry_run:
cmd.append('--dry-run')
return cmd, tmp.name if tmp else None
def build_calendar_command(item: dict, account: str, dry_run: bool):
payload = item.get('payload') or {}
calendar = payload.get('calendar') or 'primary'
cmd = [
'gog', 'calendar', 'create', calendar,
'--account', account,
'--json',
'--no-input',
'--summary', payload.get('title') or '',
'--from', payload.get('start') or '',
'--to', payload.get('end') or '',
'--send-updates', 'none',
]
if payload.get('description'):
cmd.extend(['--description', payload['description']])
if payload.get('location'):
cmd.extend(['--location', payload['location']])
attendees = payload.get('attendees') or []
if attendees:
cmd.extend(['--attendees', ','.join(attendees)])
if dry_run:
cmd.append('--dry-run')
return cmd
def parse_json(output: str):
text = output.strip()
if not text:
return None
return json.loads(text)
def main():
ap = argparse.ArgumentParser(description='Resolve an n8n approval item and execute email/calendar actions via gog.')
ap.add_argument('--id', required=True, help='Approval queue item id')
ap.add_argument('--decision', choices=['approve', 'reject'], default='approve')
ap.add_argument('--account', help='Google account email; otherwise uses GOG_ACCOUNT')
ap.add_argument('--dry-run', action='store_true', help='Use gog --dry-run for host execution')
ap.add_argument('--base-url', default=DEFAULT_BASE_URL)
ap.add_argument('--path', default=DEFAULT_ACTION_PATH)
ap.add_argument('--secret-header', default=DEFAULT_SECRET_HEADER)
args = ap.parse_args()
secret = webhook_secret()
resolved = call_action(
{
'action': 'approval_queue_resolve',
'args': {'id': args.id, 'decision': args.decision, 'note': 'resolved by host gog executor', 'notify_on_resolve': False},
'request_id': f'resolve-{args.id}',
},
base_url=args.base_url,
path=args.path,
secret_header=args.secret_header,
secret=secret,
)
if not resolved.get('ok'):
print(json.dumps(resolved, indent=2))
raise SystemExit(1)
result = (resolved.get('result') or {})
item = result.get('item') or {}
kind = item.get('kind') or ''
if args.decision == 'reject':
print(json.dumps({'resolved': resolved, 'executed': False, 'reason': 'rejected'}, indent=2))
return
if result.get('executed') is True:
print(json.dumps({'resolved': resolved, 'executed': True, 'driver': 'n8n'}, indent=2))
return
if kind not in {'email_draft', 'calendar_event'}:
print(json.dumps({'resolved': resolved, 'executed': False, 'reason': f'no host executor for kind {kind}'}, indent=2))
return
account = gog_account(args.account)
env = os.environ.copy()
env['GOG_ACCOUNT'] = account
if kind == 'email_draft':
cmd, tmpfile = build_email_command(item, account, args.dry_run)
op = 'gmail.drafts.create'
success_status = 'draft_created' if not args.dry_run else 'dry_run'
else:
cmd = build_calendar_command(item, account, args.dry_run)
tmpfile = None
op = 'calendar.create'
success_status = 'event_created' if not args.dry_run else 'dry_run'
try:
code, stdout, stderr = run(cmd, env=env)
finally:
if tmpfile:
try:
Path(tmpfile).unlink(missing_ok=True)
except Exception:
pass
if code != 0:
execution = {
'driver': 'gog',
'op': op,
'status': 'failed',
'account': account,
'dry_run': args.dry_run,
'stderr': stderr.strip(),
'stdout': stdout.strip(),
}
attach = attach_execution(item['id'], execution, base_url=args.base_url, path=args.path, secret_header=args.secret_header, secret=secret)
print(json.dumps({'resolved': resolved, 'execution': execution, 'attach': attach}, indent=2))
raise SystemExit(code)
parsed = parse_json(stdout) if stdout.strip() else None
execution = {
'driver': 'gog',
'op': op,
'status': success_status,
'account': account,
'dry_run': args.dry_run,
'result': parsed,
}
attach = attach_execution(item['id'], execution, base_url=args.base_url, path=args.path, secret_header=args.secret_header, secret=secret)
print(json.dumps({'resolved': resolved, 'execution': execution, 'attach': attach}, indent=2))
if __name__ == '__main__':
main()