Files
swarm-zap/skills/n8n-webhook/scripts/resolve-approval-with-gog.py

553 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
import tempfile
import urllib.error
import urllib.request
from pathlib import Path
# Base URL of the n8n instance (overridable via N8N_BASE_URL); trailing
# slashes are removed so it can be joined with '/webhook/<path>'.
DEFAULT_BASE_URL = os.environ.get('N8N_BASE_URL', 'http://192.168.153.113:18808').rstrip('/')
# Webhook path segment (overridable via N8N_ACTION_PATH), without leading or
# trailing slashes.
DEFAULT_ACTION_PATH = os.environ.get('N8N_ACTION_PATH', 'openclaw-action').strip('/')
# Name of the HTTP header that carries the shared webhook secret.
DEFAULT_SECRET_HEADER = os.environ.get('N8N_SECRET_HEADER', 'x-openclaw-secret')
# KEY=VALUE file read at startup to fill in environment variables that are
# unset or empty (overridable via GOG_ENV_FILE).
DEFAULT_GOG_ENV_FILE = Path(os.environ.get('GOG_ENV_FILE', '/home/openclaw/.openclaw/credentials/gog.env'))
def fail(msg: str, code: int = 1):
    """Report ``msg`` on stderr and abort with exit status ``code``.

    Never returns normally: it always raises ``SystemExit``.
    """
    print(msg, file=sys.stderr)
    sys.exit(code)
def run(cmd, *, env=None):
    """Run ``cmd`` (an argv list, no shell) and capture its output as text.

    Args:
        cmd: Argument vector handed to ``subprocess.run``.
        env: Environment mapping for the child process; ``None`` inherits
            the current environment.

    Returns:
        A ``(returncode, stdout, stderr)`` tuple. When the program cannot be
        started at all, the tuple mimics the usual shell convention instead
        of raising: ``127`` for a missing executable, ``126`` for any other
        launch error, with the error text in the ``stderr`` slot.
    """
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, env=env)
    except FileNotFoundError as exc:
        # Executable not installed / not on PATH.
        return 127, '', str(exc)
    except OSError as exc:
        # Found but not launchable (permissions, bad interpreter, ...).
        return 126, '', str(exc)
    return proc.returncode, proc.stdout, proc.stderr
def load_env_file(path: Path) -> dict:
    """Parse a ``KEY=VALUE`` file into a dict of strings.

    A missing file yields an empty dict. If any group/other permission bit
    is set on the file, the process is aborted through ``fail``. Blank
    lines, ``#`` comment lines and lines without ``=`` are skipped. Keys and
    values are whitespace-trimmed, and one pair of matching surrounding
    quotes (single or double) is removed from each value.
    """
    if not path.exists():
        return {}
    try:
        mode = path.stat().st_mode
    except FileNotFoundError:
        # The file disappeared between the existence check and stat().
        return {}
    if mode & 0o077:
        fail(f'insecure permissions on {path}; expected mode 600-ish')

    env = {}
    for entry in map(str.strip, path.read_text(encoding='utf-8').splitlines()):
        if entry == '' or entry.startswith('#') or '=' not in entry:
            continue
        name, _, raw = entry.partition('=')
        raw = raw.strip()
        quoted = len(raw) >= 2 and raw[0] in ('"', "'") and raw[-1] == raw[0]
        env[name.strip()] = raw[1:-1] if quoted else raw
    return env
def gog_account(args_account: str | None) -> str:
account = args_account or os.environ.get('GOG_ACCOUNT', '').strip()
if not account:
fail('missing gog account: pass --account or set GOG_ACCOUNT')
return account
def webhook_secret() -> str:
    """Return the trimmed ``N8N_WEBHOOK_SECRET`` value from the environment.

    Aborts via ``fail`` when the variable is unset or blank.
    """
    value = os.environ.get('N8N_WEBHOOK_SECRET', '').strip()
    if value:
        return value
    fail('missing N8N_WEBHOOK_SECRET in environment')
    return value
def call_action(payload: dict, *, base_url: str, path: str, secret_header: str, secret: str) -> dict:
    """POST ``payload`` as JSON to ``{base_url}/webhook/{path}``.

    Args:
        payload: JSON-serialisable request body.
        base_url: Server base URL without a trailing slash.
        path: Webhook path segment appended after ``/webhook/``.
        secret_header: Name of the header carrying the shared secret.
        secret: Shared secret value.

    Returns:
        Always a dict. On a 2xx response it is the decoded JSON object (``{}``
        for an empty body). On an HTTP error status it is the decoded JSON
        error object, or a synthesised ``http_error`` object, with
        ``http_status`` added when absent. Transport failures and bodies that
        are not a JSON object produce
        ``{'ok': False, 'error': {'code': ..., 'message': ...}}`` with code
        ``network_error`` or ``invalid_response`` respectively, so callers
        can handle them exactly like a server-side error.
    """
    def _error(code: str, message: str) -> dict:
        return {'ok': False, 'error': {'code': code, 'message': message}}

    url = f'{base_url}/webhook/{path}'
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        method='POST',
        headers={
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            secret_header: secret,
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as r:
            body = r.read().decode('utf-8', 'replace')
    except urllib.error.HTTPError as e:
        # HTTPError must be handled before its base classes URLError/OSError.
        body = e.read().decode('utf-8', 'replace')
        try:
            parsed = json.loads(body) if body else {}
        except ValueError:
            parsed = None
        if not isinstance(parsed, dict):
            # Non-JSON or non-object error body: wrap it so .setdefault works.
            parsed = _error('http_error', body or str(e))
        parsed.setdefault('http_status', e.code)
        return parsed
    except OSError as e:
        # URLError (refused, DNS failure, unknown scheme) and socket timeouts.
        return _error('network_error', str(getattr(e, 'reason', None) or e))

    if not body:
        return {}
    try:
        parsed = json.loads(body)
    except ValueError:
        return _error('invalid_response', body)
    if not isinstance(parsed, dict):
        return _error('invalid_response', body)
    return parsed
def attach_execution(item_id: str, execution: dict, *, base_url: str, path: str, secret_header: str, secret: str) -> dict:
    """Send an ``approval_history_attach_execution`` action for ``item_id``.

    The ``execution`` dict is forwarded unchanged as the action argument and
    the webhook's response dict is returned as-is.
    """
    request = {
        'action': 'approval_history_attach_execution',
        'args': {'id': item_id, 'execution': execution},
        'request_id': f'attach-{item_id}',
    }
    return call_action(
        request,
        base_url=base_url,
        path=path,
        secret_header=secret_header,
        secret=secret,
    )
def _join_recipients(value) -> str:
    """Render a recipient field (list of addresses or one string) as a comma-separated string."""
    if isinstance(value, str):
        return value.strip()
    if isinstance(value, (list, tuple)):
        return ','.join(str(v).strip() for v in value if str(v).strip())
    return ''


def build_email_draft_create_command(item: dict, account: str, dry_run: bool):
    """Build the ``gog gmail drafts create`` argv for an ``email_draft`` item.

    Args:
        item: Approval item; its ``payload`` may carry ``to``, ``cc``,
            ``bcc`` (each a list of addresses or a single string),
            ``subject``, ``body_text`` and ``body_html``.
        account: Google account passed via ``--account``.
        dry_run: Append ``--dry-run`` when true.

    Returns:
        ``(cmd, tmp_path)``. ``tmp_path`` is the temporary file holding
        ``body_text`` (the caller is responsible for deleting it), or
        ``None`` when the draft is HTML-only.

    Aborts via ``fail`` when the payload has neither ``body_text`` nor
    ``body_html``.
    """
    payload = item.get('payload') or {}
    body_text = payload.get('body_text') or ''
    body_html = payload.get('body_html') or ''
    if not body_text and not body_html:
        fail('email_draft payload missing body_text/body_html')

    cmd = [
        'gog', 'gmail', 'drafts', 'create',
        '--account', account,
        '--json',
        '--no-input',
        '--to', _join_recipients(payload.get('to')),
        '--subject', payload.get('subject') or '',
    ]
    for key in ('cc', 'bcc'):
        joined = _join_recipients(payload.get(key))
        if joined:
            cmd.extend([f'--{key}', joined])

    tmp_path = None
    if body_text:
        # The plain-text body goes through a file; delete=False keeps it on
        # disk after the handle is closed so gog can read it.
        with tempfile.NamedTemporaryFile('w', delete=False, encoding='utf-8', suffix='.txt') as tmp:
            tmp.write(body_text)
        tmp_path = tmp.name
        cmd.extend(['--body-file', tmp_path])
    # gog requires body or body_html; an HTML-only draft needs no body file.
    if body_html:
        cmd.extend(['--body-html', body_html])
    if dry_run:
        cmd.append('--dry-run')
    return cmd, tmp_path
def build_email_draft_delete_command(item: dict, account: str, dry_run: bool):
    """Build the ``gog gmail drafts delete`` argv for an approval item.

    The draft id is taken from the payload's ``draft_id`` (or ``id``) and
    trimmed; a missing id aborts via ``fail``. ``--dry-run`` is appended
    when ``dry_run`` is true.
    """
    payload = item.get('payload') or {}
    draft_id = (payload.get('draft_id') or payload.get('id') or '').strip()
    if draft_id == '':
        fail('email_draft_delete payload missing draft_id')
    options = ['--account', account, '--json', '--no-input', '--force']
    if dry_run:
        options.append('--dry-run')
    return ['gog', 'gmail', 'drafts', 'delete', draft_id, *options]
def build_email_draft_send_command(item: dict, account: str, dry_run: bool):
    """Build the ``gog gmail drafts send`` argv for an approval item.

    The draft id is taken from the payload's ``draft_id`` (or ``id``) and
    trimmed; a missing id aborts via ``fail``. ``--dry-run`` is appended
    when ``dry_run`` is true.
    """
    payload = item.get('payload') or {}
    draft_id = (payload.get('draft_id') or payload.get('id') or '').strip()
    if draft_id == '':
        fail('email_draft_send payload missing draft_id')
    options = ['--account', account, '--json', '--no-input']
    if dry_run:
        options.append('--dry-run')
    return ['gog', 'gmail', 'drafts', 'send', draft_id, *options]
def build_email_drafts_list_command(item: dict, account: str, dry_run: bool):
    """Build the ``gog gmail drafts list`` argv for an approval item.

    Payload keys read: ``max`` (clamped to 1..100, default 20 when missing
    or not convertible to int), ``page`` (trimmed; passed as ``--page``
    when non-empty), ``all`` and ``fail_empty`` (flags added only when the
    value is exactly ``True``). ``--dry-run`` is appended when ``dry_run``
    is true.
    """
    payload = item.get('payload') or {}

    requested = payload.get('max')
    try:
        limit = 20 if requested is None else int(requested)
        limit = min(max(limit, 1), 100)
    except Exception:
        limit = 20

    cmd = [
        'gog', 'gmail', 'drafts', 'list',
        '--account', account,
        '--json',
        '--no-input',
        '--max', str(limit),
    ]
    page_token = (payload.get('page') or '').strip()
    if page_token:
        cmd += ['--page', page_token]
    for key, flag in (('all', '--all'), ('fail_empty', '--fail-empty')):
        if payload.get(key) is True:
            cmd.append(flag)
    if dry_run:
        cmd.append('--dry-run')
    return cmd
def normalize_send_updates(value: str) -> str:
    """Map a user-supplied send-updates value onto the spelling passed to gog.

    Matching ignores surrounding whitespace and letter case for both
    recognised values: ``all`` -> ``'all'`` and ``externalonly`` ->
    ``'externalOnly'``. Anything else, including ``None``, non-string
    values and unknown strings, yields ``'none'``.
    """
    key = value.strip().lower() if isinstance(value, str) else ''
    return {'all': 'all', 'externalonly': 'externalOnly'}.get(key, 'none')
def build_calendar_create_command(item: dict, account: str, dry_run: bool):
    """Build the ``gog calendar create`` argv for a ``calendar_event`` item.

    Payload keys read: ``calendar`` (trimmed, default ``primary``),
    ``title``, ``start``, ``end``, ``send_updates``, and the optional
    ``description``, ``location`` and ``attendees``. ``attendees`` may be a
    list of addresses or a single comma-separated string, matching what
    ``build_calendar_update_command`` accepts; blank entries are dropped.
    ``--dry-run`` is appended when ``dry_run`` is true.
    """
    payload = item.get('payload') or {}
    # Trim the calendar id the same way the update/delete builders do.
    calendar = (payload.get('calendar') or 'primary').strip() or 'primary'
    cmd = [
        'gog', 'calendar', 'create', calendar,
        '--account', account,
        '--json',
        '--no-input',
        '--summary', payload.get('title') or '',
        '--from', payload.get('start') or '',
        '--to', payload.get('end') or '',
        '--send-updates', normalize_send_updates(payload.get('send_updates') or 'none'),
    ]
    for key in ('description', 'location'):
        if payload.get(key):
            cmd.extend([f'--{key}', payload[key]])

    attendees = payload.get('attendees')
    if isinstance(attendees, str):
        # A plain string must not be joined character by character.
        attendee_arg = attendees.strip()
    elif isinstance(attendees, (list, tuple)):
        attendee_arg = ','.join(str(a).strip() for a in attendees if str(a).strip())
    else:
        attendee_arg = ''
    if attendee_arg:
        cmd.extend(['--attendees', attendee_arg])

    if dry_run:
        cmd.append('--dry-run')
    return cmd
def build_calendar_list_events_command(item: dict, account: str, dry_run: bool):
    """Build the ``gog calendar events`` argv for an approval item.

    Payload keys read: ``calendar`` (trimmed, default ``primary``), ``max``
    (clamped to 1..100, default 20), ``days`` (clamped to 1..90, default 7;
    only used when neither ``from`` nor ``to`` is given), ``from``, ``to``,
    ``query`` (each trimmed and passed only when non-empty), ``all_pages``
    and ``fail_empty`` (flags added only when the value is exactly
    ``True``). ``--dry-run`` is appended when ``dry_run`` is true.
    """
    def _bounded(raw, fallback, ceiling):
        # Clamp to [1, ceiling]; missing or unparsable values use the fallback.
        if raw is None:
            raw = fallback
        try:
            return max(1, min(ceiling, int(raw)))
        except Exception:
            return fallback

    payload = item.get('payload') or {}
    calendar = (payload.get('calendar') or 'primary').strip() or 'primary'
    limit = _bounded(payload.get('max'), 20, 100)
    window_days = _bounded(payload.get('days'), 7, 90)

    cmd = [
        'gog', 'calendar', 'events', calendar,
        '--account', account,
        '--json',
        '--no-input',
        '--max', str(limit),
    ]
    start = (payload.get('from') or '').strip()
    end = (payload.get('to') or '').strip()
    search = (payload.get('query') or '').strip()
    if start:
        cmd += ['--from', start]
    if end:
        cmd += ['--to', end]
    if not (start or end):
        # No explicit window: fall back to a relative number of days.
        cmd += ['--days', str(window_days)]
    if search:
        cmd += ['--query', search]
    for key, flag in (('all_pages', '--all-pages'), ('fail_empty', '--fail-empty')):
        if payload.get(key) is True:
            cmd.append(flag)
    if dry_run:
        cmd.append('--dry-run')
    return cmd
def build_calendar_update_command(item: dict, account: str, dry_run: bool):
    """Build the ``gog calendar update`` argv for an approval item.

    The event id comes from the payload's ``event_id`` (or ``id``); a
    missing id aborts via ``fail``. Only fields present with a truthy value
    (``title``, ``start``, ``end``, ``description``, ``location``) are
    passed. ``attendees`` may be a list (joined with commas, blank entries
    dropped, and passed even when the result is empty) or a non-blank
    string. ``--dry-run`` is appended when ``dry_run`` is true.
    """
    payload = item.get('payload') or {}
    calendar = (payload.get('calendar') or 'primary').strip() or 'primary'
    event_id = (payload.get('event_id') or payload.get('id') or '').strip()
    if event_id == '':
        fail('calendar_event_update payload missing event_id')

    cmd = [
        'gog', 'calendar', 'update', calendar, event_id,
        '--account', account,
        '--json',
        '--no-input',
        '--send-updates', normalize_send_updates(payload.get('send_updates') or 'none'),
    ]
    field_flags = (
        ('title', '--summary'),
        ('start', '--from'),
        ('end', '--to'),
        ('description', '--description'),
        ('location', '--location'),
    )
    for field, flag in field_flags:
        if payload.get(field):
            cmd += [flag, payload[field]]

    attendees = payload.get('attendees')
    if isinstance(attendees, list):
        kept = [str(entry) for entry in attendees if str(entry).strip()]
        cmd += ['--attendees', ','.join(kept)]
    elif isinstance(attendees, str) and attendees.strip():
        cmd += ['--attendees', attendees.strip()]

    if dry_run:
        cmd.append('--dry-run')
    return cmd
def build_calendar_delete_command(item: dict, account: str, dry_run: bool):
    """Build the ``gog calendar delete`` argv for an approval item.

    The calendar defaults to ``primary`` and the event id comes from the
    payload's ``event_id`` (or ``id``); a missing id aborts via ``fail``.
    ``--dry-run`` is appended when ``dry_run`` is true.
    """
    payload = item.get('payload') or {}
    calendar = (payload.get('calendar') or 'primary').strip() or 'primary'
    event_id = (payload.get('event_id') or payload.get('id') or '').strip()
    if event_id == '':
        fail('calendar_event_delete payload missing event_id')
    send_updates = normalize_send_updates(payload.get('send_updates') or 'none')
    options = [
        '--account', account,
        '--json',
        '--no-input',
        '--force',
        '--send-updates', send_updates,
    ]
    if dry_run:
        options.append('--dry-run')
    return ['gog', 'calendar', 'delete', calendar, event_id, *options]
def parse_json(output: str):
    """Decode ``output`` as JSON, returning ``None`` for blank input.

    Invalid JSON propagates the ``json`` module's decode error.
    """
    stripped = output.strip()
    return json.loads(stripped) if stripped else None
def first_string(*values):
    """Return the first argument that is a non-blank string, trimmed.

    Non-string arguments are ignored; ``''`` is returned when nothing
    qualifies.
    """
    trimmed = (candidate.strip() for candidate in values if isinstance(candidate, str))
    return next((text for text in trimmed if text), '')
def execution_result_refs(op: str, parsed):
    """Extract identifier references from a decoded gog result.

    Returns a dict holding whichever of ``draft_id``, ``message_id``,
    ``event_id`` and ``calendar`` could be found as non-blank strings,
    looking at top-level keys first and then at the nested ``draft``,
    ``message`` and ``event`` objects. A bare top-level ``id`` counts as a
    draft id only for ``gmail.drafts.*`` operations and as an event id only
    for ``calendar.*`` operations. Non-dict input yields ``{}``.
    """
    if not isinstance(parsed, dict):
        return {}

    def _section(name):
        # Nested object under ``name``, or an empty dict when absent / not a dict.
        node = parsed.get(name)
        return node if isinstance(node, dict) else {}

    draft = _section('draft')
    message = _section('message')
    event = _section('event')

    candidates = (
        ('draft_id', first_string(
            parsed.get('draft_id'),
            draft.get('id'),
            parsed.get('id') if op.startswith('gmail.drafts.') else '',
        )),
        ('message_id', first_string(parsed.get('message_id'), message.get('id'))),
        ('event_id', first_string(
            parsed.get('event_id'),
            event.get('id'),
            parsed.get('id') if op.startswith('calendar.') else '',
        )),
        ('calendar', first_string(parsed.get('calendar'), parsed.get('calendar_id'), event.get('calendar'))),
    )
    return {key: found for key, found in candidates if found}
def execution_summary(op: str, status: str, refs: dict, dry_run: bool):
    """Compose a one-line human-readable summary of an execution.

    A ``failed`` status gives ``'<op> failed'``. Otherwise the summary is
    ``'<op> <label>'`` where the label is ``dry run`` for dry runs or the
    status with underscores replaced by spaces, followed by the non-empty
    references as ``(key=value, ...)`` in a fixed key order.
    """
    if status == 'failed':
        return f'{op} failed'
    label = 'dry run' if dry_run else status.replace('_', ' ')
    ordered_keys = ('draft_id', 'message_id', 'event_id', 'calendar')
    details = ', '.join(f'{key}={refs[key]}' for key in ordered_keys if refs.get(key, ''))
    return f'{op} {label} ({details})' if details else f'{op} {label}'
def main():
    """CLI entry point: resolve an approval item, then execute it with gog.

    Steps:
      1. Parse arguments and fill unset/empty environment variables from the
         gog env file.
      2. Send ``approval_queue_resolve`` to the webhook; print the response
         and exit 1 when it is not ``ok``.
      3. Stop early (printing a JSON report) when the decision was
         ``reject``, when the webhook reports it already executed the item,
         or when no host executor exists for the item kind.
      4. Otherwise build and run the matching gog command, attach the
         execution record to the approval history and print a JSON report.

    Exits with gog's return code when the gog command fails.
    """
    ap = argparse.ArgumentParser(description='Resolve an n8n approval item and execute email/calendar actions via gog.')
    ap.add_argument('--id', required=True, help='Approval queue item id')
    ap.add_argument('--decision', choices=['approve', 'reject'], default='approve')
    ap.add_argument('--account', help='Google account email; otherwise uses GOG_ACCOUNT')
    ap.add_argument('--dry-run', action='store_true', help='Use gog --dry-run for host execution')
    ap.add_argument('--base-url', default=DEFAULT_BASE_URL)
    ap.add_argument('--path', default=DEFAULT_ACTION_PATH)
    ap.add_argument('--secret-header', default=DEFAULT_SECRET_HEADER)
    args = ap.parse_args()

    # Values from the env file never override a non-empty real environment variable.
    file_env = load_env_file(DEFAULT_GOG_ENV_FILE)
    os.environ.update({k: v for k, v in file_env.items() if not os.environ.get(k)})

    secret = webhook_secret()
    webhook = {
        'base_url': args.base_url,
        'path': args.path,
        'secret_header': args.secret_header,
        'secret': secret,
    }
    resolved = call_action(
        {
            'action': 'approval_queue_resolve',
            'args': {'id': args.id, 'decision': args.decision, 'note': 'resolved by host gog executor', 'notify_on_resolve': False},
            'request_id': f'resolve-{args.id}',
        },
        **webhook,
    )
    if not resolved.get('ok'):
        print(json.dumps(resolved, indent=2))
        raise SystemExit(1)

    result = resolved.get('result') or {}
    item = result.get('item') or {}
    kind = item.get('kind') or ''
    # The response item should carry the id we resolved; fall back to the CLI
    # id so a sparse response cannot raise KeyError after gog has already run.
    item_id = item.get('id') or args.id

    if args.decision == 'reject':
        print(json.dumps({'resolved': resolved, 'executed': False, 'reason': 'rejected'}, indent=2))
        return
    if result.get('executed') is True:
        print(json.dumps({'resolved': resolved, 'executed': True, 'driver': 'n8n'}, indent=2))
        return

    # kind -> (command builder, operation name, success status, builder returns a temp file)
    executors = {
        'email_draft': (build_email_draft_create_command, 'gmail.drafts.create', 'draft_created', True),
        'email_list_drafts': (build_email_drafts_list_command, 'gmail.drafts.list', 'drafts_listed', False),
        'email_draft_delete': (build_email_draft_delete_command, 'gmail.drafts.delete', 'draft_deleted', False),
        'email_draft_send': (build_email_draft_send_command, 'gmail.drafts.send', 'draft_sent', False),
        'calendar_event': (build_calendar_create_command, 'calendar.create', 'event_created', False),
        'calendar_list_events': (build_calendar_list_events_command, 'calendar.events.list', 'events_listed', False),
        'calendar_event_update': (build_calendar_update_command, 'calendar.update', 'event_updated', False),
        'calendar_event_delete': (build_calendar_delete_command, 'calendar.delete', 'event_deleted', False),
    }
    spec = executors.get(kind)
    if not spec:
        print(json.dumps({'resolved': resolved, 'executed': False, 'reason': f'no host executor for kind {kind}'}, indent=2))
        return
    builder, op, success_status, uses_tmpfile = spec

    account = gog_account(args.account)
    env = os.environ.copy()
    env['GOG_ACCOUNT'] = account

    tmpfile = None
    if uses_tmpfile:
        cmd, tmpfile = builder(item, account, args.dry_run)
    else:
        cmd = builder(item, account, args.dry_run)
    if args.dry_run:
        success_status = 'dry_run'

    try:
        code, stdout, stderr = run(cmd, env=env)
    finally:
        # Best-effort removal of the temporary body file; a leftover file
        # must not mask the outcome of the gog call.
        if tmpfile:
            try:
                Path(tmpfile).unlink(missing_ok=True)
            except Exception:
                pass

    if code != 0:
        refs = {}
        execution = {
            'driver': 'gog',
            'op': op,
            'status': 'failed',
            'account': account,
            'dry_run': args.dry_run,
            'stderr': stderr.strip(),
            'stdout': stdout.strip(),
            'result_refs': refs,
            'summary': execution_summary(op, 'failed', refs, args.dry_run),
        }
    else:
        raw_stdout = None
        try:
            parsed = parse_json(stdout)
        except ValueError:
            # gog succeeded but printed something that is not JSON. The action
            # already happened, so still record it and keep the raw output.
            parsed = None
            raw_stdout = stdout.strip()
        refs = execution_result_refs(op, parsed)
        execution = {
            'driver': 'gog',
            'op': op,
            'status': success_status,
            'account': account,
            'dry_run': args.dry_run,
            'result_refs': refs,
            'summary': execution_summary(op, success_status, refs, args.dry_run),
            'result': parsed,
        }
        if raw_stdout is not None:
            execution['stdout'] = raw_stdout

    attach = attach_execution(item_id, execution, **webhook)
    print(json.dumps({'resolved': resolved, 'execution': execution, 'attach': attach}, indent=2))
    if code != 0:
        raise SystemExit(code)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()