diff --git a/bridge/src/index.ts b/bridge/src/index.ts index 644189f..232479c 100644 --- a/bridge/src/index.ts +++ b/bridge/src/index.ts @@ -149,6 +149,8 @@ app.use("/tiger/notify", notifyRouter); app.use("/tiger/dispatch", dispatchRouter); app.use("/tiger/agents", agentsRouter); app.use("/tiger/agents/activity", agentsActivityRouter); +// Complete audit trail (executions + tasks + outputs + cron runs, paginated) +app.use("/tiger/activity/audit", (await import("./routes/activity-audit.js")).default); app.use("/tiger/deploy-dashboard", deployRouter); app.use("/tiger/route-task", routeTaskRouter); app.use("/tiger/keys", keysRouter); diff --git a/bridge/src/routes/activity-audit.ts b/bridge/src/routes/activity-audit.ts new file mode 100644 index 0000000..f4defc3 --- /dev/null +++ b/bridge/src/routes/activity-audit.ts @@ -0,0 +1,232 @@ +/** + * activity-audit.ts — GET /tiger/activity/audit : the complete audit trail + * + * Purpose: ONE chronological, paginated record of everything the system DID, + * so nothing slips by unaudited. Merged sources, each a durable store (the + * old activity feed only showed recent in-memory file events): + * + * executions (sqlite) — every spawn / sub-agent run, with outcome + * tasks (sqlite) — task lifecycle (created / status changes) + * outputs (sqlite) — every artifact an agent wrote + * cron runs (volume) — OpenClaw's JSONL run history for every job + * + * Event shape (normalized): + * { id, ts (ISO), type, actor, summary, status?, ref? } + * type ∈ spawn | task | output | cron + * + * Pagination: ?limit=100&before= walks backwards through history. + * Optional ?types=spawn,cron filters at the source. + * + * Design note: sources are merged at read time rather than double-written + * into a new audit table — no write-path changes, no risk of an action + * happening without its audit row, history is complete retroactively. + */ + +import { Router, Request, Response } from "express"; +import { readFileSync, readdirSync, existsSync } from "fs"; +import { join } from "path"; +import db from "../db.js"; + +const router = Router(); + +const DATA_DIR = + process.env.OPENCLAW_DATA_DIR || + "/var/lib/docker/volumes/tiger_tiger-config/_data"; + +export interface AuditEvent { + id: string; + ts: string; // ISO timestamp + type: "spawn" | "task" | "output" | "cron"; + actor: string; + summary: string; + status?: string; + ref?: string; +} + +// ─── SQLite sources ────────────────────────────────────────────────────────── + +function executionEvents(beforeIso: string | null, limit: number): AuditEvent[] { + const rows = db + .prepare( + `SELECT id, agent, command, exit_code, started_at, completed_at + FROM executions + ${beforeIso ? "WHERE started_at < ?" : ""} + ORDER BY started_at DESC LIMIT ?`, + ) + .all(...(beforeIso ? [beforeIso, limit] : [limit])) as Array<{ + id: string; + agent: string | null; + command: string | null; + exit_code: number | null; + started_at: string; + completed_at: string | null; + }>; + + return rows.map((r) => ({ + id: `exec:${r.id}`, + ts: toIso(r.started_at), + type: "spawn" as const, + actor: r.agent ?? "unknown", + summary: (r.command ?? "").replace(/^spawn:\s*/, "").slice(0, 160), + status: + r.exit_code === null ? "running" : r.exit_code === 0 ? "done" : "error", + ref: r.id, + })); +} + +function taskEvents(beforeIso: string | null, limit: number): AuditEvent[] { + const rows = db + .prepare( + `SELECT id, title, status, assigned_agent, updated_at + FROM tasks + ${beforeIso ? "WHERE updated_at < ?" : ""} + ORDER BY updated_at DESC LIMIT ?`, + ) + .all(...(beforeIso ? [beforeIso, limit] : [limit])) as Array<{ + id: string; + title: string; + status: string; + assigned_agent: string | null; + updated_at: string; + }>; + + return rows.map((r) => ({ + id: `task:${r.id}:${r.updated_at}`, + ts: toIso(r.updated_at), + type: "task" as const, + actor: r.assigned_agent ?? "tiger", + summary: r.title.slice(0, 160), + status: r.status, + ref: r.id, + })); +} + +function outputEvents(beforeIso: string | null, limit: number): AuditEvent[] { + const rows = db + .prepare( + `SELECT id, filename, file_path, execution_id, created_at + FROM outputs + ${beforeIso ? "WHERE created_at < ?" : ""} + ORDER BY created_at DESC LIMIT ?`, + ) + .all(...(beforeIso ? [beforeIso, limit] : [limit])) as Array<{ + id: string; + filename: string; + file_path: string; + execution_id: string | null; + created_at: string; + }>; + + return rows.map((r) => ({ + id: `output:${r.id}`, + ts: toIso(r.created_at), + type: "output" as const, + actor: "agent", + summary: `wrote ${r.filename} (${r.file_path})`.slice(0, 160), + ref: r.execution_id ?? r.id, + })); +} + +// ─── Cron run history (OpenClaw JSONL on the volume) ──────────────────────── +// Cached by directory listing + sizes; cron runs append-only files. + +let cronCache: { stamp: string; events: AuditEvent[] } | null = null; + +function cronEvents(): AuditEvent[] { + const runsDir = join(DATA_DIR, "cron", "runs"); + if (!existsSync(runsDir)) return []; + + let files: string[]; + try { + files = readdirSync(runsDir).filter((f) => f.endsWith(".jsonl")); + } catch { + return []; + } + + // Cheap cache key: file list + sizes via a stat pass would be ideal; the + // run files are small, so name-count + latest mtime via re-read every 30s + // would also be fine. Keep it simple: rebuild when the listing changes. + const stamp = files.join("|"); + if (cronCache && cronCache.stamp === stamp) return cronCache.events; + + const events: AuditEvent[] = []; + for (const file of files) { + let content: string; + try { + content = readFileSync(join(runsDir, file), "utf-8"); + } catch { + continue; + } + for (const line of content.split("\n")) { + if (!line.trim()) continue; + let run: Record; + try { + run = JSON.parse(line); + } catch { + continue; + } + const ts = run.startedAt ?? run.ts ?? run.runAtMs ?? run.timestamp; + if (!ts) continue; + const iso = typeof ts === "number" ? new Date(ts).toISOString() : toIso(String(ts)); + const name = run.jobName ?? run.name ?? file.replace(/\.jsonl$/, ""); + const status = run.status ?? (run.error ? "error" : "ok"); + events.push({ + id: `cron:${file}:${iso}`, + ts: iso, + type: "cron", + actor: "cron", + summary: String(name).slice(0, 160), + status: String(status), + ref: run.jobId ?? file.replace(/\.jsonl$/, ""), + }); + } + } + cronCache = { stamp, events }; + return events; +} + +// ─── Helpers ──────────────────────────────────────────────────────────────── + +/** sqlite datetime('now') yields "YYYY-MM-DD HH:MM:SS" (UTC, no zone) — make it ISO. */ +function toIso(s: string): string { + if (!s) return new Date(0).toISOString(); + if (s.includes("T")) return s; + return s.replace(" ", "T") + "Z"; +} + +// ─── Route ────────────────────────────────────────────────────────────────── + +router.get("/", (req: Request, res: Response) => { + const limit = Math.min( + Math.max(parseInt(String(req.query.limit ?? "100"), 10) || 100, 1), + 500, + ); + const before = req.query.before ? String(req.query.before) : null; + const typeFilter = req.query.types + ? new Set(String(req.query.types).split(",")) + : null; + + const wants = (t: AuditEvent["type"]) => !typeFilter || typeFilter.has(t); + + let events: AuditEvent[] = []; + if (wants("spawn")) events.push(...executionEvents(before, limit)); + if (wants("task")) events.push(...taskEvents(before, limit)); + if (wants("output")) events.push(...outputEvents(before, limit)); + if (wants("cron")) { + let cron = cronEvents(); + if (before) cron = cron.filter((e) => e.ts < before); + events.push(...cron); + } + + events.sort((a, b) => (a.ts < b.ts ? 1 : -1)); // newest first + const page = events.slice(0, limit); + + res.json({ + ok: true, + events: page, + hasMore: events.length > page.length, + oldestTs: page.length > 0 ? page[page.length - 1].ts : null, + }); +}); + +export default router; diff --git a/dashboard/src/app/activity/page.tsx b/dashboard/src/app/activity/page.tsx index 60df479..0068809 100644 --- a/dashboard/src/app/activity/page.tsx +++ b/dashboard/src/app/activity/page.tsx @@ -1,112 +1,192 @@ "use client" -import { useEffect, useState } from "react" -import { ScrollText } from "lucide-react" +/** + * /activity — the complete audit trail. + * + * Every durable action in one timeline: sub-agent spawns, task lifecycle, + * artifacts written, cron runs, file modifications. Type filters + "Load + * older" pagination walk the entire history so nothing escapes audit. + */ + +import { useCallback, useEffect, useState } from "react" +import { ScrollText, ChevronDown } from "lucide-react" interface ActivityEntry { id: string + ts: string type: string - timestamp: string - description: string - source: string + actor: string + summary: string + status?: string +} + +const TYPES = [ + { id: "spawn", label: "Spawns" }, + { id: "cron", label: "Cron" }, + { id: "task", label: "Tasks" }, + { id: "output", label: "Outputs" }, + { id: "file", label: "Files" }, +] + +const TYPE_COLORS: Record = { + spawn: "text-violet-400 border-violet-400/30", + cron: "text-sky-400 border-sky-400/30", + task: "text-amber-400 border-amber-400/30", + output: "text-emerald-400 border-emerald-400/30", + file: "text-muted-foreground border-border", +} + +const STATUS_COLORS: Record = { + done: "text-emerald-400", + ok: "text-emerald-400", + running: "text-sky-400", + error: "text-red-400", } export default function ActivityPage() { const [entries, setEntries] = useState([]) + const [active, setActive] = useState>(new Set(TYPES.map((t) => t.id))) + const [hasMore, setHasMore] = useState(false) const [loading, setLoading] = useState(true) + const [loadingMore, setLoadingMore] = useState(false) const [error, setError] = useState(null) + const fetchPage = useCallback( + async (before?: string) => { + const qs = new URLSearchParams({ limit: "100" }) + if (before) qs.set("before", before) + if (active.size < TYPES.length) qs.set("types", Array.from(active).join(",")) + const r = await fetch(`/api/activity?${qs.toString()}`) + return r.json() as Promise<{ + entries?: ActivityEntry[] + hasMore?: boolean + oldestTs?: string | null + error?: string + }> + }, + [active], + ) + + // (Re)load whenever filters change useEffect(() => { - fetch("/api/activity?limit=20") - .then(r => r.json()) - .then(data => { - if (data?.entries) { + setLoading(true) + setError(null) + fetchPage() + .then((data) => { + if (data.entries) { setEntries(data.entries) - } - setLoading(false) + setHasMore(Boolean(data.hasMore)) + } else setError(data.error || "No data") }) - .catch(e => { - console.error("Failed to load:", e) - setError(e.message) - setLoading(false) - }) - }, []) + .catch((e: Error) => setError(e.message)) + .finally(() => setLoading(false)) + }, [fetchPage]) - const formatDate = (ts: string) => { - if (!ts) return "" - return new Date(ts).toLocaleString() - } - - const getTypeColor = (type: string) => { - switch (type) { - case "heartbeat": return "text-green-500" - case "chat": return "text-blue-500" - case "config": return "text-yellow-500" - case "memory": return "text-purple-500" - case "system": return "text-orange-500" - case "cron": return "text-cyan-500" - default: return "text-muted-foreground" + const loadOlder = async () => { + if (loadingMore || entries.length === 0) return + setLoadingMore(true) + try { + const data = await fetchPage(entries[entries.length - 1].ts) + if (data.entries && data.entries.length > 0) { + setEntries((prev) => { + const known = new Set(prev.map((e) => e.id)) + return [...prev, ...data.entries!.filter((e) => !known.has(e.id))] + }) + setHasMore(Boolean(data.hasMore)) + } else setHasMore(false) + } finally { + setLoadingMore(false) } } - const getSourceLabel = (source: string) => { - switch (source) { - case "main": return "🐅 Tiger" - case "coder": return "📦 Cody" - case "researcher": return "🔬 Ethan" - case "pm": return "📋 Elon" - default: return source || "🤖" - } - } + const toggle = (id: string) => + setActive((prev) => { + const next = new Set(prev) + if (next.has(id)) { + if (next.size > 1) next.delete(id) // never filter down to nothing + } else next.add(id) + return next + }) - if (loading) { - return ( -
-
- -

Activity

-
-
Loading activity log...
-
- ) - } - - if (error) { - return ( -
-
- -

Activity

-
-
Failed to load activity log
-
{error}
-
- ) - } + const fmt = (ts: string) => + new Date(ts).toLocaleString([], { + day: "2-digit", + month: "short", + hour: "2-digit", + minute: "2-digit", + }) return ( -
-
- -

Activity

- ({entries.length} entries) +
+
+ +

Activity

+

+ Complete audit trail — spawns, cron runs, tasks, outputs, file changes. +

-
- {entries.map((entry, i) => ( -
-
{getSourceLabel(entry.source)}
-
-
{entry.description}
-
- {entry.type} - - {formatDate(entry.timestamp)} -
-
-
+
+ {TYPES.map((t) => ( + ))}
+ + {loading &&
Loading…
} + {error &&
Error: {error}
} + + {!loading && !error && ( +
+ {entries.length === 0 && ( +
No events.
+ )} + {entries.map((e) => ( +
+ + {fmt(e.ts)} + + + {e.type} + + + {e.actor}{" "} + {e.summary} + {e.status && ( + + {e.status} + + )} + +
+ ))} + + {hasMore && ( + + )} +
+ )}
) -} \ No newline at end of file +} diff --git a/dashboard/src/app/api/activity/route.ts b/dashboard/src/app/api/activity/route.ts index 716f201..3eb46d6 100644 --- a/dashboard/src/app/api/activity/route.ts +++ b/dashboard/src/app/api/activity/route.ts @@ -1,44 +1,82 @@ +/** + * /api/activity — unified audit feed for the Activity page. + * + * Merges two bridge sources: + * /tiger/activity/audit durable history: spawns, tasks, outputs, cron + * runs — paginated, complete + * /tiger/agents/activity recent file-modification events (in-memory, + * recent-only by nature; merged for first page) + * + * ?limit=100&before=&types=spawn,cron,task,output,file + */ + import { NextResponse } from "next/server" import { bridgeGet } from "@/lib/bridge" export const dynamic = "force-dynamic" +interface AuditEvent { + id: string + ts: string + type: string + actor: string + summary: string + status?: string + ref?: string +} + export async function GET(request: Request) { + const url = new URL(request.url) + const limit = Math.min(parseInt(url.searchParams.get("limit") || "100", 10), 500) + const before = url.searchParams.get("before") || "" + const types = url.searchParams.get("types") || "" + try { - const url = new URL(request.url) - const limit = parseInt(url.searchParams.get("limit") || "50", 10) + const qs = new URLSearchParams({ limit: String(limit) }) + if (before) qs.set("before", before) + if (types) qs.set("types", types.split(",").filter((t) => t !== "file").join(",")) - // Get activity from bridge endpoint that already works - const bridgeData = await bridgeGet("/tiger/agents/activity") as { + const audit = (await bridgeGet(`/tiger/activity/audit?${qs.toString()}`)) as { ok: boolean - events: Array<{ - agentId: string - agentName: string - agentEmoji: string - path: string - action: string - ts: number - }> + events?: AuditEvent[] + hasMore?: boolean + oldestTs?: string | null } - if (!bridgeData?.ok || !bridgeData.events) { - return NextResponse.json({ entries: [], total: 0 }) + let events: AuditEvent[] = audit?.events ?? [] + + // File events only exist for the recent window — merge them into the + // first page (no `before` cursor) when not filtered out. + const wantFiles = !types || types.split(",").includes("file") + if (!before && wantFiles) { + try { + const fileData = (await bridgeGet("/tiger/agents/activity")) as { + ok: boolean + events?: Array<{ agentId: string; agentName: string; path: string; action: string; ts: number }> + } + if (fileData?.ok && fileData.events) { + events = events.concat( + fileData.events.map((e) => ({ + id: `file:${e.agentId}:${e.ts}`, + ts: new Date(e.ts).toISOString(), + type: "file", + actor: e.agentName, + summary: `${e.action || "modified"} ${e.path}`, + })), + ) + } + } catch { /* file source down — audit sources still serve */ } } - // Transform bridge format to activity format - const entries = bridgeData.events.slice(0, limit).map((e) => ({ - id: `${e.agentId}-${e.ts}`, - type: "system", - timestamp: new Date(e.ts).toISOString(), - description: `${e.agentName} modified ${e.path}`, - source: e.agentId, - })) + events.sort((a, b) => (a.ts < b.ts ? 1 : -1)) + const page = events.slice(0, limit) return NextResponse.json({ - entries, - total: bridgeData.events.length, + entries: page, + hasMore: Boolean(audit?.hasMore) || events.length > page.length, + oldestTs: page.length > 0 ? page[page.length - 1].ts : null, }) - } catch (err) { + } catch { return NextResponse.json({ error: "Failed to fetch activity" }, { status: 500 }) } -} \ No newline at end of file +}