Add morning-report and stock-lookup skills

Add comprehensive morning report skill with collectors for calendar, email, tasks,
infrastructure status, news, stocks, and weather. Add stock lookup skill for quote queries.
Author: OpenCode Test
Date: 2026-01-03 10:54:54 -08:00
Parent: ae958528a6
Commit: daa4de8832
13 changed files with 1590 additions and 0 deletions

~/.claude/skills/morning-report/SKILL.md

@@ -0,0 +1,54 @@
---
name: morning-report
description: Generate daily morning dashboard with email, calendar, stocks, weather, tasks, infrastructure status, and news
---
# Morning Report Skill
Aggregates useful information into a single Markdown dashboard.
## Usage
Generate report:
```bash
~/.claude/skills/morning-report/scripts/generate.py
```
Or use the `/morning` command.
## Output
- **Location:** `~/.claude/reports/morning.md`
- **Archive:** `~/.claude/reports/archive/YYYY-MM-DD.md`
## Sections
| Section | Source | LLM Tier |
|---------|--------|----------|
| Weather | wttr.in | Haiku |
| Email | Gmail API | Sonnet |
| Calendar | Google Calendar API | None |
| Stocks | Yahoo Finance | Haiku |
| Tasks | Google Tasks API | None |
| Infrastructure | k8s + sysadmin skills | Haiku |
| News | RSS feeds | Sonnet |
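Every collector exposes the same `collect(config) -> dict` contract, which `generate.py` renders into report sections. A minimal custom collector (hypothetical, but following the shape used by the bundled collectors) looks like:
```python
# Hypothetical extra collector; the keys mirror those returned by the
# bundled collectors (section, icon, content, raw, error).
def collect(config: dict) -> dict:
    return {
        "section": "Quote of the Day",  # heading shown in the report
        "icon": "💬",
        "content": "Stay curious.",     # pre-formatted body text
        "raw": None,                    # structured data, kept for debugging
        "error": None,                  # non-None marks the section as failed
    }
```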
## Configuration
Edit `~/.claude/skills/morning-report/config.json` to customize:
- Stock watchlist
- Weather location
- RSS feeds
- Display limits
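Collectors read their own subtree of this config and fall back to built-in defaults when a key is missing, so a partial (or absent) `config.json` still works:
```python
# Fallback pattern used by the bundled collectors (stocks.py, weather.py):
watchlist = config.get("stocks", {}).get("watchlist", ["NVDA", "AAPL", "MSFT"])
location = config.get("weather", {}).get("location", "Seattle,WA,USA")
```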
## Scheduling
A systemd timer runs the report daily at 8:00 AM Pacific (sample unit files below).
```bash
# Check timer status
systemctl --user status morning-report.timer
# View logs
journalctl --user -u morning-report
```
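The unit files themselves are not part of this commit; a minimal user-level pair consistent with the schedule above might look like this (paths and unit names are assumptions):
```ini
# ~/.config/systemd/user/morning-report.service (assumed, not in this commit)
[Unit]
Description=Generate the morning report

[Service]
Type=oneshot
ExecStart=%h/.claude/skills/morning-report/scripts/generate.py

# ~/.config/systemd/user/morning-report.timer (assumed, not in this commit)
[Unit]
Description=Daily morning report at 8:00 AM Pacific

[Timer]
OnCalendar=*-*-* 08:00:00 America/Los_Angeles
Persistent=true

[Install]
WantedBy=timers.target
```
Enable with `systemctl --user enable --now morning-report.timer`; `Persistent=true` fires a missed 8:00 run at the next opportunity.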

~/.claude/skills/morning-report/config.json

@@ -0,0 +1,43 @@
{
"version": "1.0",
"schedule": {
"time": "08:00",
"timezone": "America/Los_Angeles"
},
"output": {
"path": "~/.claude/reports/morning.md",
"archive": true,
"archive_days": 30
},
"stocks": {
"watchlist": ["CRWV", "NVDA", "MSFT"],
"show_trend": true
},
"weather": {
"location": "Seattle,WA,USA",
"provider": "wttr.in"
},
"email": {
"max_display": 5,
"triage": true
},
"calendar": {
"show_tomorrow": true
},
"tasks": {
"max_display": 5,
"show_due_dates": true
},
"infra": {
"check_k8s": true,
"check_workstation": true,
"detail_level": "traffic_light"
},
"news": {
"feeds": [
{"name": "Hacker News", "url": "https://hnrss.org/frontpage", "limit": 3},
{"name": "Lobsters", "url": "https://lobste.rs/rss", "limit": 2}
],
"summarize": true
}
}

~/.claude/skills/morning-report/scripts/collectors/__init__.py

@@ -0,0 +1 @@
# Morning report collectors

~/.claude/skills/morning-report/scripts/collectors/gcal.py

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""Calendar collector using existing gcal skill."""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
def fetch_events(mode: str = "today") -> list:
"""Fetch calendar events directly using gmail_mcp library."""
os.environ.setdefault('GMAIL_CREDENTIALS_PATH', os.path.expanduser('~/.gmail-mcp/credentials.json'))
try:
# Add gmail venv to path
venv_site = Path.home() / ".claude/mcp/gmail/venv/lib/python3.13/site-packages"
if str(venv_site) not in sys.path:
sys.path.insert(0, str(venv_site))
from gmail_mcp.utils.GCP.gmail_auth import get_calendar_service
service = get_calendar_service()
now = datetime.utcnow()
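        # NOTE: naive UTC, so "today"/"tomorrow" are UTC days; near midnight
        # local time the window can differ from the local calendar day.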
if mode == 'today':
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
end = start + timedelta(days=1)
elif mode == 'tomorrow':
start = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
end = start + timedelta(days=1)
else:
start = now
end = now + timedelta(days=7)
events_result = service.events().list(
calendarId='primary',
timeMin=start.isoformat() + 'Z',
timeMax=end.isoformat() + 'Z',
singleEvents=True,
orderBy='startTime',
maxResults=20
).execute()
return events_result.get('items', [])
except Exception as e:
return [{"error": str(e)}]
def format_events(today_events: list, tomorrow_events: list = None) -> str:
"""Format calendar events - no LLM needed, structured data."""
lines = []
# Today's events
if today_events and (len(today_events) == 0 or "error" not in today_events[0]):
if not today_events:
lines.append("No events today")
else:
for event in today_events:
start = event.get("start", {})
time_str = ""
if "dateTime" in start:
# Timed event
dt = datetime.fromisoformat(start["dateTime"].replace("Z", "+00:00"))
time_str = dt.strftime("%I:%M %p").lstrip("0")
elif "date" in start:
time_str = "All day"
summary = event.get("summary", "(No title)")
duration = ""
# Calculate duration if end time available
end = event.get("end", {})
if "dateTime" in start and "dateTime" in end:
start_dt = datetime.fromisoformat(start["dateTime"].replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end["dateTime"].replace("Z", "+00:00"))
mins = int((end_dt - start_dt).total_seconds() / 60)
if mins >= 60:
hours = mins // 60
remaining = mins % 60
duration = f" ({hours}h{remaining}m)" if remaining else f" ({hours}h)"
else:
duration = f" ({mins}m)"
lines.append(f"{time_str} - {summary}{duration}")
elif today_events and "error" in today_events[0]:
error = today_events[0].get("error", "Unknown")
lines.append(f"⚠️ Could not fetch calendar: {error}")
else:
lines.append("No events today")
# Tomorrow preview
if tomorrow_events is not None:
        if not tomorrow_events or "error" not in tomorrow_events[0]:
count = len(tomorrow_events)
if count > 0:
first = tomorrow_events[0]
start = first.get("start", {})
if "dateTime" in start:
dt = datetime.fromisoformat(start["dateTime"].replace("Z", "+00:00"))
first_time = dt.strftime("%I:%M %p").lstrip("0")
else:
first_time = "All day"
lines.append(f"Tomorrow: {count} event{'s' if count > 1 else ''}, first at {first_time}")
else:
lines.append("Tomorrow: No events")
return "\n".join(lines) if lines else "No calendar data"
def collect(config: dict) -> dict:
"""Main collector entry point."""
cal_config = config.get("calendar", {})
show_tomorrow = cal_config.get("show_tomorrow", True)
today_events = fetch_events("today")
tomorrow_events = fetch_events("tomorrow") if show_tomorrow else None
formatted = format_events(today_events, tomorrow_events)
has_error = today_events and len(today_events) == 1 and "error" in today_events[0]
return {
"section": "Today",
"icon": "📅",
"content": formatted,
"raw": {"today": today_events, "tomorrow": tomorrow_events},
"error": today_events[0].get("error") if has_error else None
}
if __name__ == "__main__":
config = {"calendar": {"show_tomorrow": True}}
result = collect(config)
print(f"## {result['icon']} {result['section']}")
print(result["content"])

~/.claude/skills/morning-report/scripts/collectors/gmail.py

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""Gmail collector using existing gmail skill."""
import os
import subprocess
import sys
from pathlib import Path
def fetch_unread_emails(days: int = 7, max_results: int = 15) -> list:
"""Fetch unread emails directly using gmail_mcp library."""
# Set credentials path
os.environ.setdefault('GMAIL_CREDENTIALS_PATH', os.path.expanduser('~/.gmail-mcp/credentials.json'))
try:
# Add gmail venv to path
venv_site = Path.home() / ".claude/mcp/gmail/venv/lib/python3.13/site-packages"
if str(venv_site) not in sys.path:
sys.path.insert(0, str(venv_site))
from gmail_mcp.utils.GCP.gmail_auth import get_gmail_service
service = get_gmail_service()
results = service.users().messages().list(
userId='me',
q=f'is:unread newer_than:{days}d',
maxResults=max_results
).execute()
emails = []
for msg in results.get('messages', []):
detail = service.users().messages().get(
userId='me',
id=msg['id'],
format='metadata',
metadataHeaders=['From', 'Subject']
).execute()
headers = {h['name']: h['value'] for h in detail['payload']['headers']}
emails.append({
'from': headers.get('From', 'Unknown'),
'subject': headers.get('Subject', '(no subject)'),
'id': msg['id']
})
return emails
except Exception as e:
return [{"error": str(e)}]
def triage_with_sonnet(emails: list) -> str:
"""Use Sonnet to triage and summarize emails."""
if not emails or (len(emails) == 1 and "error" in emails[0]):
error = emails[0].get("error", "Unknown error") if emails else "No data"
return f"⚠️ Could not fetch emails: {error}"
# Build email summary for Sonnet
email_text = []
for i, e in enumerate(emails[:10], 1):
sender = e.get("from", "Unknown").split("<")[0].strip().strip('"')
subject = e.get("subject", "(no subject)")[:80]
email_text.append(f"{i}. From: {sender}\n Subject: {subject}")
email_context = "\n\n".join(email_text)
prompt = f"""You are triaging emails for a morning report. Given these unread emails, provide a brief summary.
Format:
- First line: count and any urgent items (e.g., "5 unread, 1 urgent")
- Then list top emails with [!] for urgent, or plain bullet
- Keep each email to one line: sender - subject snippet (max 50 chars)
- Maximum 5 emails shown
Emails:
{email_context}
Output the formatted email section, nothing else."""
try:
result = subprocess.run(
["/home/will/.local/bin/claude", "--print", "--model", "sonnet", "-p", prompt],
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except Exception:
pass
# Fallback to basic format
lines = [f"{len(emails)} unread"]
for e in emails[:5]:
sender = e.get("from", "Unknown").split("<")[0].strip().strip('"')[:20]
subject = e.get("subject", "(no subject)")[:40]
lines.append(f"{sender} - {subject}")
return "\n".join(lines)
def collect(config: dict) -> dict:
"""Main collector entry point."""
email_config = config.get("email", {})
max_display = email_config.get("max_display", 5)
use_triage = email_config.get("triage", True)
emails = fetch_unread_emails(days=7, max_results=max_display + 10)
if use_triage and emails and "error" not in emails[0]:
formatted = triage_with_sonnet(emails)
else:
# Basic format or error
if emails and "error" not in emails[0]:
lines = [f"{len(emails)} unread"]
for e in emails[:max_display]:
sender = e.get("from", "Unknown").split("<")[0].strip().strip('"')[:20]
subject = e.get("subject", "(no subject)")[:40]
lines.append(f"{sender} - {subject}")
formatted = "\n".join(lines)
else:
error = emails[0].get("error", "Unknown") if emails else "No data"
formatted = f"⚠️ Could not fetch emails: {error}"
has_error = emails and len(emails) == 1 and "error" in emails[0]
return {
"section": "Email",
"icon": "📧",
"content": formatted,
"raw": emails if not has_error else None,
"count": len(emails) if not has_error else 0,
"error": emails[0].get("error") if has_error else None
}
if __name__ == "__main__":
config = {"email": {"max_display": 5, "triage": True}}
result = collect(config)
print(f"## {result['icon']} {result['section']}")
print(result["content"])

~/.claude/skills/morning-report/scripts/collectors/gtasks.py

@@ -0,0 +1,172 @@
#!/usr/bin/env python3
"""Google Tasks collector."""
import sys
from datetime import datetime
from pathlib import Path
# Add gmail venv to path for Google API libraries
venv_site = Path.home() / ".claude/mcp/gmail/venv/lib/python3.13/site-packages"
if str(venv_site) not in sys.path:
sys.path.insert(0, str(venv_site))
# Google Tasks API
try:
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
GOOGLE_API_AVAILABLE = True
except ImportError:
GOOGLE_API_AVAILABLE = False
SCOPES = ["https://www.googleapis.com/auth/tasks.readonly"]
TOKEN_PATH = Path.home() / ".gmail-mcp/tasks_token.json"
CREDS_PATH = Path.home() / ".gmail-mcp/credentials.json"
def get_credentials():
"""Get or refresh Google credentials for Tasks API."""
creds = None
if TOKEN_PATH.exists():
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
if not CREDS_PATH.exists():
return None
flow = InstalledAppFlow.from_client_secrets_file(str(CREDS_PATH), SCOPES)
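            # First run opens a browser for one-time OAuth consent; afterwards
            # the token saved below is refreshed silently.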
creds = flow.run_local_server(port=0)
TOKEN_PATH.write_text(creds.to_json())
return creds
def fetch_tasks(max_results: int = 10) -> list:
"""Fetch tasks from Google Tasks API."""
if not GOOGLE_API_AVAILABLE:
return [{"error": "Google API libraries not installed"}]
try:
creds = get_credentials()
if not creds:
return [{"error": "Tasks API not authenticated - run: ~/.claude/mcp/gmail/venv/bin/python ~/.claude/skills/morning-report/scripts/collectors/gtasks.py --auth"}]
service = build("tasks", "v1", credentials=creds)
# Get default task list
tasklists = service.tasklists().list(maxResults=1).execute()
if not tasklists.get("items"):
return []
tasklist_id = tasklists["items"][0]["id"]
# Get tasks
results = service.tasks().list(
tasklist=tasklist_id,
maxResults=max_results,
showCompleted=False,
showHidden=False
).execute()
tasks = results.get("items", [])
return tasks
except Exception as e:
return [{"error": str(e)}]
def format_tasks(tasks: list, max_display: int = 5) -> str:
"""Format tasks - no LLM needed, structured data."""
if not tasks:
return "No pending tasks"
if len(tasks) == 1 and "error" in tasks[0]:
return f"⚠️ Could not fetch tasks: {tasks[0]['error']}"
lines = []
# Count and header
total = len(tasks)
due_today = 0
today_str = datetime.now().strftime("%Y-%m-%d")
for task in tasks:
due = task.get("due", "")
if due and due.startswith(today_str):
due_today += 1
header = f"{total} pending"
if due_today > 0:
header += f", {due_today} due today"
lines.append(header)
# List tasks
for task in tasks[:max_display]:
title = task.get("title", "(No title)")
due = task.get("due", "")
due_str = ""
if due:
try:
due_date = datetime.fromisoformat(due.replace("Z", "+00:00"))
if due_date.date() == datetime.now().date():
due_str = " (due today)"
elif due_date.date() < datetime.now().date():
due_str = " (overdue!)"
else:
due_str = f" (due {due_date.strftime('%b %d')})"
except ValueError:
pass
lines.append(f"{title}{due_str}")
if total > max_display:
lines.append(f" ... and {total - max_display} more")
return "\n".join(lines)
def collect(config: dict) -> dict:
"""Main collector entry point."""
tasks_config = config.get("tasks", {})
max_display = tasks_config.get("max_display", 5)
tasks = fetch_tasks(max_display + 5)
formatted = format_tasks(tasks, max_display)
has_error = tasks and len(tasks) == 1 and "error" in tasks[0]
return {
"section": "Tasks",
"icon": "",
"content": formatted,
"raw": tasks if not has_error else None,
"count": len(tasks) if not has_error else 0,
"error": tasks[0].get("error") if has_error else None
}
if __name__ == "__main__":
if "--auth" in sys.argv:
print("Starting Tasks API authentication...")
creds = get_credentials()
if creds:
print(f"✅ Authentication successful! Token saved to {TOKEN_PATH}")
else:
print("❌ Authentication failed")
sys.exit(0)
config = {"tasks": {"max_display": 5}}
result = collect(config)
print(f"## {result['icon']} {result['section']}")
print(result["content"])

~/.claude/skills/morning-report/scripts/collectors/infra.py

@@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""Infrastructure collector for K8s and workstation health."""
import subprocess
from pathlib import Path
def check_k8s_health() -> dict:
"""Check Kubernetes cluster health."""
try:
# Quick node check
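        # Assumes each node's Ready condition is listed last in status.conditions,
        # which holds for stock kubelets but is not guaranteed by the API.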
result = subprocess.run(
["kubectl", "get", "nodes", "-o", "jsonpath={.items[*].status.conditions[-1].type}"],
capture_output=True,
text=True,
timeout=15
)
if result.returncode != 0:
return {"status": "unknown", "error": "kubectl failed"}
# Check if all nodes are Ready
conditions = result.stdout.strip().split()
all_ready = all(c == "Ready" for c in conditions) if conditions else False
# Quick pod check for issues
pod_result = subprocess.run(
["kubectl", "get", "pods", "-A", "--field-selector=status.phase!=Running,status.phase!=Succeeded",
"-o", "jsonpath={.items[*].metadata.name}"],
capture_output=True,
text=True,
timeout=15
)
problem_pods = pod_result.stdout.strip().split() if pod_result.stdout.strip() else []
if all_ready and len(problem_pods) == 0:
return {"status": "green", "message": "All nodes ready, no problem pods"}
elif all_ready:
return {"status": "yellow", "message": f"{len(problem_pods)} pods not running"}
else:
return {"status": "red", "message": "Node(s) not ready"}
except subprocess.TimeoutExpired:
return {"status": "unknown", "error": "timeout"}
except Exception as e:
return {"status": "unknown", "error": str(e)}
def check_workstation_health() -> dict:
"""Check local workstation health."""
try:
issues = []
# Disk usage
result = subprocess.run(
["df", "-h", "/"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
lines = result.stdout.strip().split("\n")
if len(lines) > 1:
parts = lines[1].split()
if len(parts) >= 5:
                    usage = int(parts[4].rstrip("%"))
                    if usage > 80:
                        issues.append(f"disk {usage}%")
# Memory usage
result = subprocess.run(
["free", "-m"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
lines = result.stdout.strip().split("\n")
if len(lines) > 1:
parts = lines[1].split()
if len(parts) >= 3:
total = int(parts[1])
used = int(parts[2])
pct = (used / total) * 100 if total > 0 else 0
if pct > 90:
issues.append(f"mem {pct:.0f}%")
# Load average
result = subprocess.run(
["cat", "/proc/loadavg"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
load_1m = float(result.stdout.split()[0])
# Get CPU count
cpu_result = subprocess.run(["nproc"], capture_output=True, text=True, timeout=5)
cpus = int(cpu_result.stdout.strip()) if cpu_result.returncode == 0 else 4
if load_1m > cpus * 2:
issues.append(f"load {load_1m:.1f}")
        if not issues:
            return {"status": "green", "message": "OK"}
        elif len(issues) == 1:
            return {"status": "yellow", "message": ", ".join(issues)}
        else:
            return {"status": "red", "message": ", ".join(issues)}
except Exception as e:
return {"status": "unknown", "error": str(e)}
def format_status(k8s: dict, workstation: dict) -> str:
"""Format infrastructure status with traffic lights."""
status_icons = {
"green": "🟢",
"yellow": "🟡",
"red": "🔴",
"unknown": ""
}
k8s_icon = status_icons.get(k8s.get("status", "unknown"), "")
ws_icon = status_icons.get(workstation.get("status", "unknown"), "")
k8s_detail = k8s.get("error", k8s.get("message", ""))
ws_detail = workstation.get("error", workstation.get("message", ""))
# Keep it simple for traffic light mode
parts = [f"K8s: {k8s_icon}", f"Workstation: {ws_icon}"]
# Add details only if not green
details = []
if k8s.get("status") != "green" and k8s_detail:
details.append(f"K8s: {k8s_detail}")
if workstation.get("status") != "green" and ws_detail:
details.append(f"WS: {ws_detail}")
result = " | ".join(parts)
if details:
result += f"\n{'; '.join(details)}"
return result
def collect(config: dict) -> dict:
"""Main collector entry point."""
infra_config = config.get("infra", {})
k8s_result = {"status": "unknown", "message": "disabled"}
ws_result = {"status": "unknown", "message": "disabled"}
if infra_config.get("check_k8s", True):
k8s_result = check_k8s_health()
if infra_config.get("check_workstation", True):
ws_result = check_workstation_health()
formatted = format_status(k8s_result, ws_result)
# Determine overall status
statuses = [k8s_result.get("status"), ws_result.get("status")]
if "red" in statuses:
overall = "red"
elif "yellow" in statuses or "unknown" in statuses:
overall = "yellow"
else:
overall = "green"
return {
"section": "Infrastructure",
"icon": "🖥",
"content": formatted,
"raw": {"k8s": k8s_result, "workstation": ws_result},
"status": overall,
"error": None
}
if __name__ == "__main__":
config = {"infra": {"check_k8s": True, "check_workstation": True}}
result = collect(config)
print(f"## {result['icon']} {result['section']}")
print(result["content"])

~/.claude/skills/morning-report/scripts/collectors/news.py

@@ -0,0 +1,164 @@
#!/usr/bin/env python3
"""News collector for RSS feeds."""
import re
import subprocess
import urllib.request
import xml.etree.ElementTree as ET
from html import unescape
def fetch_feed(url: str, limit: int = 5) -> list:
"""Fetch and parse RSS feed."""
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(req, timeout=10) as resp:
content = resp.read().decode("utf-8")
root = ET.fromstring(content)
items = []
# Handle both RSS and Atom formats
for item in root.findall(".//item")[:limit] or root.findall(".//{http://www.w3.org/2005/Atom}entry")[:limit]:
title = item.findtext("title") or item.findtext("{http://www.w3.org/2005/Atom}title") or ""
link = item.findtext("link") or ""
# For Atom, link might be an attribute
if not link:
link_elem = item.find("{http://www.w3.org/2005/Atom}link")
if link_elem is not None:
link = link_elem.get("href", "")
# Try to get score/points from description or comments
description = item.findtext("description") or ""
comments = item.findtext("comments") or ""
# Hacker News includes points in description
points = ""
if "points" in description.lower():
                match = re.search(r"(\d+)\s*points?", description, re.I)
if match:
points = match.group(1)
items.append({
"title": unescape(title.strip()),
"link": link,
"points": points
})
return items
except Exception as e:
return [{"error": str(e)}]
def summarize_with_sonnet(all_items: list, feed_names: list) -> str:
"""Use Sonnet to summarize news headlines."""
if not all_items or all(len(items) == 1 and "error" in items[0] for items in all_items):
return "⚠️ Could not fetch news feeds"
# Build context
news_text = []
    for items, name in zip(all_items, feed_names):
if items and "error" not in items[0]:
for item in items:
points_str = f" ({item['points']} pts)" if item.get("points") else ""
news_text.append(f"[{name}] {item['title']}{points_str}")
if not news_text:
return "No news available"
context = "\n".join(news_text)
prompt = f"""You are creating a tech news section for a morning report.
Given these headlines from various sources, pick the top 5 most interesting/important ones.
Format each as a bullet with source in parentheses.
Keep titles concise - trim if needed.
Headlines:
{context}
Output ONLY the formatted news list, nothing else."""
try:
result = subprocess.run(
["claude", "--print", "--model", "sonnet", "-p", prompt],
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except Exception:
pass
# Fallback - just show first few items
lines = []
for items, name in zip(all_items, feed_names):
if items and "error" not in items[0]:
for item in items[:2]:
points_str = f" ({item['points']} pts)" if item.get("points") else ""
title = item["title"][:60] + "..." if len(item["title"]) > 60 else item["title"]
lines.append(f"{title}{points_str} ({name})")
return "\n".join(lines[:5]) if lines else "No news available"
def collect(config: dict) -> dict:
"""Main collector entry point."""
news_config = config.get("news", {})
feeds = news_config.get("feeds", [
{"name": "Hacker News", "url": "https://hnrss.org/frontpage", "limit": 5},
{"name": "Lobsters", "url": "https://lobste.rs/rss", "limit": 3}
])
use_summarize = news_config.get("summarize", True)
all_items = []
feed_names = []
errors = []
for feed in feeds:
items = fetch_feed(feed["url"], feed.get("limit", 5))
all_items.append(items)
feed_names.append(feed["name"])
if items and len(items) == 1 and "error" in items[0]:
errors.append(f"{feed['name']}: {items[0]['error']}")
if use_summarize:
formatted = summarize_with_sonnet(all_items, feed_names)
else:
# Basic format
lines = []
for items, name in zip(all_items, feed_names):
if items and "error" not in items[0]:
for item in items[:3]:
title = item["title"][:50]
points = f" ({item['points']})" if item.get("points") else ""
lines.append(f"{title}{points} - {name}")
formatted = "\n".join(lines) if lines else "No news available"
return {
"section": "Tech News",
"icon": "📰",
"content": formatted,
"raw": {name: items for name, items in zip(feed_names, all_items)},
"error": errors[0] if errors else None
}
if __name__ == "__main__":
config = {
"news": {
"feeds": [
{"name": "Hacker News", "url": "https://hnrss.org/frontpage", "limit": 3},
{"name": "Lobsters", "url": "https://lobste.rs/rss", "limit": 2}
],
"summarize": True
}
}
result = collect(config)
print(f"## {result['icon']} {result['section']}")
print(result["content"])

~/.claude/skills/morning-report/scripts/collectors/stocks.py

@@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""Stocks collector using stock-lookup skill."""
import json
import subprocess
import sys
from pathlib import Path
def get_quotes(symbols: list) -> list:
"""Fetch quotes using stock-lookup skill."""
script = Path.home() / ".claude/skills/stock-lookup/scripts/quote.py"
try:
result = subprocess.run(
[sys.executable, str(script), "--json"] + symbols,
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
return json.loads(result.stdout)
else:
return [{"symbol": s, "error": "fetch failed"} for s in symbols]
except Exception as e:
return [{"symbol": s, "error": str(e)} for s in symbols]
def format_stocks_with_haiku(quotes: list) -> str:
"""Use Haiku to format stock data nicely."""
# Build context
lines = []
for q in quotes:
if "error" in q:
lines.append(f"{q.get('symbol', '?')}: error - {q['error']}")
else:
price = q.get("price", 0)
prev = q.get("previous_close", price)
if prev and prev > 0:
change = ((price - prev) / prev) * 100
direction = "+" if change >= 0 else ""
lines.append(f"{q['symbol']}: ${price:.2f} ({direction}{change:.1f}%)")
else:
lines.append(f"{q['symbol']}: ${price:.2f}")
stock_data = "\n".join(lines)
prompt = f"""Format these stock quotes into a compact single line for a morning dashboard.
Use arrow indicators (▲▼) for direction. Keep it concise.
{stock_data}
Output ONLY the formatted stock line, nothing else."""
try:
result = subprocess.run(
["claude", "--print", "--model", "haiku", "-p", prompt],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except Exception:
pass
# Fallback to basic format
parts = []
for q in quotes:
if "error" in q:
parts.append(f"{q.get('symbol', '?')} ⚠️")
else:
price = q.get("price", 0)
prev = q.get("previous_close", price)
if prev and prev > 0:
change = ((price - prev) / prev) * 100
arrow = "" if change >= 0 else ""
parts.append(f"{q['symbol']} ${price:.2f} {'+' if change >= 0 else ''}{change:.1f}% {arrow}")
else:
parts.append(f"{q['symbol']} ${price:.2f}")
return " ".join(parts)
def collect(config: dict) -> dict:
"""Main collector entry point."""
watchlist = config.get("stocks", {}).get("watchlist", ["NVDA", "AAPL", "MSFT"])
quotes = get_quotes(watchlist)
formatted = format_stocks_with_haiku(quotes)
errors = [q.get("error") for q in quotes if "error" in q]
return {
"section": "Stocks",
"icon": "📈",
"content": formatted,
"raw": quotes,
"error": errors[0] if errors else None
}
if __name__ == "__main__":
config = {"stocks": {"watchlist": ["CRWV", "NVDA", "MSFT"]}}
result = collect(config)
print(f"## {result['icon']} {result['section']}")
print(result["content"])

~/.claude/skills/morning-report/scripts/collectors/weather.py

@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""Weather collector using wttr.in."""
import json
import subprocess
import urllib.request
from pathlib import Path
def fetch_weather(location: str) -> dict:
"""Fetch weather data from wttr.in."""
# Use wttr.in JSON format
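    # j1 responses carry current_condition[] plus a weather[] daily forecast,
    # which the formatters below pick apart.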
url = f"https://wttr.in/{location}?format=j1"
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
try:
with urllib.request.urlopen(req, timeout=5) as resp:
return json.load(resp)
except Exception as e:
return {"error": str(e)}
def format_weather_basic(data: dict, location: str) -> str:
"""Format weather data without LLM - basic fallback."""
if "error" in data:
return f"Weather unavailable: {data['error']}"
try:
current = data["current_condition"][0]
today = data["weather"][0]
temp_f = current.get("temp_F", "?")
desc = current.get("weatherDesc", [{}])[0].get("value", "Unknown")
high = today.get("maxtempF", "?")
low = today.get("mintempF", "?")
return f"{location}: {temp_f}°F, {desc} | High {high}° Low {low}°"
except Exception as e:
return f"Weather parse error: {e}"
def format_weather_with_haiku(data: dict, location: str) -> str:
"""Use Haiku to format weather data nicely."""
if "error" in data:
return f"Weather unavailable: {data['error']}"
try:
current = data["current_condition"][0]
today = data["weather"][0]
# Extract key data
temp_f = current.get("temp_F", "?")
feels_like = current.get("FeelsLikeF", temp_f)
desc = current.get("weatherDesc", [{}])[0].get("value", "Unknown")
humidity = current.get("humidity", "?")
high = today.get("maxtempF", "?")
low = today.get("mintempF", "?")
# Check for precipitation
hourly = today.get("hourly", [])
rain_hours = [h for h in hourly if int(h.get("chanceofrain", 0)) > 50]
# Build context for Haiku
weather_context = f"""Current: {temp_f}°F (feels like {feels_like}°F), {desc}
High: {high}°F, Low: {low}°F
Humidity: {humidity}%
Rain chance >50%: {len(rain_hours)} hours today"""
# Check if claude is available
claude_path = Path.home() / ".local/bin/claude"
if not claude_path.exists():
claude_path = "claude" # Try PATH
prompt = f"""Format this weather data for {location} into a single concise line for a morning report.
Add a brief hint if relevant (e.g., "bring umbrella", "nice day for a walk").
Keep it under 80 characters if possible.
{weather_context}
Output ONLY the formatted weather line, nothing else."""
result = subprocess.run(
[str(claude_path), "--print", "--model", "haiku", "-p", prompt],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
else:
# Fallback to basic format
return format_weather_basic(data, location)
    except Exception:
# Fallback to basic format
return format_weather_basic(data, location)
def collect(config: dict) -> dict:
"""Main collector entry point."""
location = config.get("weather", {}).get("location", "Seattle,WA,USA")
city_name = location.split(",")[0]
data = fetch_weather(location)
# Try Haiku formatting, fall back to basic
formatted = format_weather_with_haiku(data, city_name)
return {
"section": "Weather",
"icon": "🌤",
"content": formatted,
"raw": data if "error" not in data else None,
"error": data.get("error")
}
if __name__ == "__main__":
# Test
config = {"weather": {"location": "Seattle,WA,USA"}}
result = collect(config)
print(f"## {result['icon']} {result['section']}")
print(result["content"])

~/.claude/skills/morning-report/scripts/generate.py

@@ -0,0 +1,216 @@
#!/usr/bin/env python3
"""Morning report orchestrator - generates the daily dashboard."""
import json
import logging
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
# Add collectors to path
sys.path.insert(0, str(Path(__file__).parent))
from collectors import weather, stocks, infra, news
# These may fail if gmail venv not activated
try:
from collectors import gmail, gcal, gtasks
GOOGLE_COLLECTORS = True
except ImportError:
GOOGLE_COLLECTORS = False
# Setup logging
LOG_PATH = Path.home() / ".claude/logs/morning-report.log"
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(LOG_PATH),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def load_config() -> dict:
"""Load configuration from config.json."""
config_path = Path(__file__).parent.parent / "config.json"
if config_path.exists():
return json.loads(config_path.read_text())
return {}
def collect_section(name: str, collector_func, config: dict) -> dict:
"""Run a collector and handle errors."""
try:
logger.info(f"Collecting {name}...")
result = collector_func(config)
logger.info(f"Collected {name}: OK")
return result
except Exception as e:
logger.error(f"Collector {name} failed: {e}")
return {
"section": name,
"icon": "",
"content": f"⚠️ {name} unavailable: {e}",
"error": str(e)
}
def collect_all(config: dict) -> list:
"""Collect all sections in parallel."""
collectors = [
("Weather", weather.collect),
("Stocks", stocks.collect),
("Infra", infra.collect),
("News", news.collect),
]
if GOOGLE_COLLECTORS:
collectors.extend([
("Email", gmail.collect),
("Calendar", gcal.collect),
("Tasks", gtasks.collect),
])
else:
logger.warning("Google collectors not available - run with gmail venv")
results = []
with ThreadPoolExecutor(max_workers=6) as executor:
futures = {
executor.submit(collect_section, name, func, config): name
for name, func in collectors
}
for future in as_completed(futures):
name = futures[future]
try:
result = future.result()
results.append(result)
except Exception as e:
logger.error(f"Future {name} exception: {e}")
results.append({
"section": name,
"icon": "",
"content": f"⚠️ {name} failed: {e}",
"error": str(e)
})
return results
def render_report(sections: list, config: dict) -> str:
"""Render the markdown report."""
    now = datetime.now()
    date_str = now.strftime("%a %b %d, %Y")
lines = [
f"# Morning Report - {date_str}",
""
]
# Order sections
order = ["Weather", "Email", "Calendar", "Today", "Stocks", "Tasks", "Infra", "Infrastructure", "News", "Tech News"]
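    # Mixes collector names with the section titles they actually return
    # (gcal -> "Today", infra -> "Infrastructure", news -> "Tech News"),
    # plus the fallback names used when a collector raises.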
# Sort by order
section_map = {s.get("section", ""): s for s in sections}
for name in order:
if name in section_map:
s = section_map[name]
lines.append(f"## {s.get('icon', '📌')} {s.get('section', 'Unknown')}")
lines.append(s.get("content", "No data"))
lines.append("")
# Add any unordered sections
for s in sections:
if s.get("section") not in order:
lines.append(f"## {s.get('icon', '📌')} {s.get('section', 'Unknown')}")
lines.append(s.get("content", "No data"))
lines.append("")
# Footer
lines.extend([
"---",
f"*Generated: {now.strftime('%Y-%m-%d %H:%M:%S')} PT*"
])
return "\n".join(lines)
def save_report(content: str, config: dict) -> Path:
"""Save report to file and archive."""
output_config = config.get("output", {})
output_path = Path(output_config.get("path", "~/.claude/reports/morning.md")).expanduser()
output_path.parent.mkdir(parents=True, exist_ok=True)
# Write main report
output_path.write_text(content)
logger.info(f"Report saved to {output_path}")
# Archive if enabled
if output_config.get("archive", True):
archive_dir = output_path.parent / "archive"
archive_dir.mkdir(parents=True, exist_ok=True)
date_str = datetime.now().strftime("%Y-%m-%d")
archive_path = archive_dir / f"{date_str}.md"
shutil.copy(output_path, archive_path)
logger.info(f"Archived to {archive_path}")
# Cleanup old archives
archive_days = output_config.get("archive_days", 30)
cleanup_archives(archive_dir, archive_days)
return output_path
def cleanup_archives(archive_dir: Path, max_days: int):
"""Remove archives older than max_days."""
from datetime import timedelta
cutoff = datetime.now() - timedelta(days=max_days)
for f in archive_dir.glob("*.md"):
try:
# Parse date from filename
date_str = f.stem
file_date = datetime.strptime(date_str, "%Y-%m-%d")
if file_date < cutoff:
f.unlink()
logger.info(f"Removed old archive: {f}")
except ValueError:
pass # Skip files that don't match date pattern
def main():
"""Main entry point."""
logger.info("=" * 50)
logger.info("Starting morning report generation")
config = load_config()
logger.info(f"Loaded config: {len(config)} sections")
sections = collect_all(config)
logger.info(f"Collected {len(sections)} sections")
report = render_report(sections, config)
output_path = save_report(report, config)
print(f"\n✅ Morning report generated: {output_path}")
print(f" View with: cat {output_path}")
logger.info("Morning report generation complete")
return 0
if __name__ == "__main__":
sys.exit(main())

~/.claude/skills/stock-lookup/SKILL.md

@@ -0,0 +1,66 @@
---
name: stock-lookup
description: Look up stock prices and quotes
---
# Stock Lookup Skill
Fetch real-time stock quotes and trends from Yahoo Finance.
## Usage
```bash
~/.claude/skills/stock-lookup/scripts/quote.py SYMBOL [SYMBOL...]
~/.claude/skills/stock-lookup/scripts/quote.py SYMBOL --trend [RANGE]
```
## Examples
Single stock:
```bash
~/.claude/skills/stock-lookup/scripts/quote.py CRWV
```
Multiple stocks:
```bash
~/.claude/skills/stock-lookup/scripts/quote.py AAPL MSFT GOOGL NVDA
```
3-month trend (default):
```bash
~/.claude/skills/stock-lookup/scripts/quote.py CRWV --trend
```
1-year trend:
```bash
~/.claude/skills/stock-lookup/scripts/quote.py NVDA --trend 1y
```
JSON output:
```bash
~/.claude/skills/stock-lookup/scripts/quote.py --json CRWV
~/.claude/skills/stock-lookup/scripts/quote.py --json --trend 6mo CRWV
```
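Quote-mode JSON is a list with one object per symbol, using the fields built by `quote.py` (values here are illustrative, not real quotes):
```json
[
  {
    "symbol": "CRWV",
    "name": "CoreWeave, Inc.",
    "price": 123.45,
    "previous_close": 120.00,
    "currency": "USD",
    "exchange": "NasdaqGS",
    "market_state": "REGULAR"
  }
]
```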
## Options
| Option | Description |
|--------|-------------|
| `--trend [RANGE]` | Show trend with sparkline. Default: 3mo |
| `--json` | Output as JSON |
## Trend Ranges
`1mo`, `3mo`, `6mo`, `1y`, `2y`, `5y`, `ytd`, `max`
## Output
**Quote mode:** Symbol, name, price, daily change, market state
**Trend mode:** Start/end prices, change, high/low, ASCII sparkline
## Notes
- Uses Yahoo Finance unofficial API (no key required)
- Prices may be delayed 15-20 minutes for some exchanges
- Works for stocks, ETFs, indices (^GSPC, ^DJI), crypto (BTC-USD)

~/.claude/skills/stock-lookup/scripts/quote.py

@@ -0,0 +1,175 @@
#!/usr/bin/env python3
"""Fetch stock quotes from Yahoo Finance API."""
import argparse
import json
import urllib.error
import urllib.request
from datetime import datetime
def fetch_chart(symbol: str, range_: str = "1d", interval: str = "1d") -> dict:
"""Fetch chart data for a symbol."""
url = f"https://query1.finance.yahoo.com/v8/finance/chart/{symbol.upper()}?interval={interval}&range={range_}"
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return json.load(resp)
except urllib.error.HTTPError as e:
return {"error": f"HTTP {e.code}: Symbol '{symbol}' not found"}
except urllib.error.URLError as e:
return {"error": f"Network error: {e.reason}"}
def get_quote(symbol: str) -> dict:
"""Fetch quote data for a symbol."""
data = fetch_chart(symbol)
if "error" in data:
return data
try:
result = data["chart"]["result"][0]
meta = result["meta"]
return {
"symbol": meta["symbol"],
"name": meta.get("shortName", meta.get("longName", "N/A")),
"price": meta["regularMarketPrice"],
"previous_close": meta.get("chartPreviousClose", meta.get("previousClose")),
"currency": meta.get("currency", "USD"),
"exchange": meta.get("exchangeName", "N/A"),
"market_state": meta.get("marketState", "N/A"),
}
except (KeyError, IndexError, TypeError) as e:
return {"error": f"Parse error: {e}"}
def get_trend(symbol: str, range_: str = "3mo") -> dict:
"""Fetch trend data for a symbol over a time range."""
data = fetch_chart(symbol, range_=range_, interval="1d")
if "error" in data:
return data
try:
result = data["chart"]["result"][0]
meta = result["meta"]
timestamps = result["timestamp"]
closes = result["indicators"]["quote"][0]["close"]
# Filter valid data points
valid_data = [(t, c) for t, c in zip(timestamps, closes) if c is not None]
if not valid_data:
return {"error": "No price data available"}
first_ts, first_price = valid_data[0]
last_ts, last_price = valid_data[-1]
prices_only = [c for _, c in valid_data]
high = max(prices_only)
low = min(prices_only)
change = last_price - first_price
pct_change = (change / first_price) * 100
return {
"symbol": meta["symbol"],
"name": meta.get("shortName", meta.get("longName", "N/A")),
"range": range_,
"start_date": datetime.fromtimestamp(first_ts).strftime("%b %d"),
"end_date": datetime.fromtimestamp(last_ts).strftime("%b %d"),
"start_price": first_price,
"end_price": last_price,
"change": change,
"pct_change": pct_change,
"high": high,
"low": low,
"prices": prices_only,
}
except (KeyError, IndexError, TypeError) as e:
return {"error": f"Parse error: {e}"}
def format_quote(q: dict) -> str:
"""Format quote for display."""
if "error" in q:
return f"Error: {q['error']}"
price = q["price"]
prev = q.get("previous_close")
if prev:
change = price - prev
pct = (change / prev) * 100
direction = "+" if change >= 0 else ""
change_str = f" ({direction}{change:.2f}, {direction}{pct:.2f}%)"
else:
change_str = ""
market = f" [{q['market_state']}]" if q.get("market_state") else ""
return f"{q['symbol']} ({q['name']}): ${price:.2f}{change_str}{market}"
def format_trend(t: dict) -> str:
"""Format trend data for display."""
if "error" in t:
return f"Error: {t['error']}"
direction = "+" if t["change"] >= 0 else ""
# Build sparkline
prices = t["prices"]
weekly = prices[::5] + [prices[-1]] # Sample every 5 trading days
min_p, max_p = min(weekly), max(weekly)
range_p = max_p - min_p if max_p > min_p else 1
bars = "▁▂▃▄▅▆▇█"
sparkline = ""
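    # Map each sampled close onto one of 8 glyphs, scaled between the
    # window's min and max (the 7.99 factor avoids an index of 8 at the max).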
for p in weekly:
idx = int((p - min_p) / range_p * 7.99)
idx = min(7, max(0, idx))
sparkline += bars[idx]
return f"""{t['symbol']} ({t['name']}) - {t['range']} Trend
{'=' * 42}
Start ({t['start_date']}): ${t['start_price']:.2f}
Now ({t['end_date']}): ${t['end_price']:.2f}
Change: {direction}{t['change']:.2f} ({direction}{t['pct_change']:.1f}%)
High: ${t['high']:.2f}
Low: ${t['low']:.2f}
${min_p:.0f} {sparkline} ${max_p:.0f}"""
def main():
parser = argparse.ArgumentParser(description="Fetch stock quotes")
parser.add_argument("symbols", nargs="+", help="Stock symbol(s) to look up")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument(
"--trend",
nargs="?",
const="3mo",
metavar="RANGE",
help="Show trend (default: 3mo). Ranges: 1mo, 3mo, 6mo, 1y, 2y, 5y, ytd, max",
)
args = parser.parse_args()
if args.trend:
results = [get_trend(s, args.trend) for s in args.symbols]
formatter = format_trend
else:
results = [get_quote(s) for s in args.symbols]
formatter = format_quote
if args.json:
# Remove prices array from JSON output (too verbose)
for r in results:
if "prices" in r:
del r["prices"]
print(json.dumps(results, indent=2))
else:
for i, r in enumerate(results):
if i > 0:
print()
print(formatter(r))
if __name__ == "__main__":
main()