From 438488c8c8ff4778f62c1247ee1297bc8831c971 Mon Sep 17 00:00:00 2001
From: Manohar
Date: Wed, 10 Jun 2026 02:47:03 +0000
Subject: [PATCH] feat(bridge): TASKS.md inbox drainer + telegram transcript mirror route

- lib/inbox.ts: every 30min (09-20 IST) dispatch the first '- [ ]' item
  under '## 📥 INBOX' in TASKS.md: classifyAgent -> spawnTask -> rewrite the
  line with the run id. Bridge-side scheduling: no model tokens burned on
  empty checks, no bearer token embedded in cron prompts.
  Manual trigger: POST /tiger/inbox/drain
- routes/chat-telegram.ts: GET /tiger/chat/telegram reads OpenClaw's native
  telegram session transcript (JSONL) with cursor pagination and mtime
  caching. Replaces the webhook/chat-mirror design, which could never work:
  the bot is owned by OpenClaw's long-polling channel, and Telegram forbids
  webhook + getUpdates on one token, so chat_messages never saw a row.
- index.ts: wire both + start inbox scheduler
---
 bridge/src/index.ts                |  15 +++
 bridge/src/lib/inbox.ts            | 242 +++++++++++++++++++++++++++++
 bridge/src/routes/chat-telegram.ts | 191 +++++++++++++++++++++++
 3 files changed, 448 insertions(+)
 create mode 100644 bridge/src/lib/inbox.ts
 create mode 100644 bridge/src/routes/chat-telegram.ts

diff --git a/bridge/src/index.ts b/bridge/src/index.ts
index a103946..644189f 100644
--- a/bridge/src/index.ts
+++ b/bridge/src/index.ts
@@ -154,7 +154,18 @@ app.use("/tiger/route-task", routeTaskRouter);
 app.use("/tiger/keys", keysRouter);
 app.use("/tiger/chat", (await import("./routes/chat.js")).default);
 app.use("/tiger/chat/mirror", (await import("./routes/chat-mirror.js")).default);
+// Telegram mirror v2 — reads OpenClaw's native session transcript directly.
+// (chat-mirror above and telegram-webhook below are the legacy write-side, kept
+// for API compatibility but no longer the data source for the dashboard card.)
+app.use("/tiger/chat/telegram", (await import("./routes/chat-telegram.js")).default);
 app.use("/tiger/telegram-webhook", (await import("./routes/telegram-webhook.js")).default);
+
+// TASKS.md inbox — manual drain trigger (the scheduler below runs it on its own)
+const { drainInboxOnce, startInboxScheduler } = await import("./lib/inbox.js");
+app.post("/tiger/inbox/drain", async (_req, res) => {
+  const result = await drainInboxOnce(true);
+  res.json({ ok: !result.startsWith("error"), result });
+});
 app.use("/angel", (await import("./routes/angel/positions.js")).default);
 
 // Gateway proxy — forwards to gateway inside Tiger container
@@ -186,6 +197,10 @@ app.listen(PORT, HOST, () => {
   // Initialize file watcher for task status updates
   initWatcher();
 
+  // TASKS.md inbox drainer — dispatches one pending item per cycle to a
+  // spawned specialist. See lib/inbox.ts for the contract.
+  startInboxScheduler();
+
   // Start Telegram channel — bridge takes over from OpenClaw native handler.
   // Requires channels.telegram.enabled=false in openclaw.json.
   const tgChannel = new TelegramChannel();
diff --git a/bridge/src/lib/inbox.ts b/bridge/src/lib/inbox.ts
new file mode 100644
index 0000000..6455238
--- /dev/null
+++ b/bridge/src/lib/inbox.ts
@@ -0,0 +1,242 @@
+/**
+ * lib/inbox.ts — TASKS.md as Tiger's inbox, drained by the bridge
+ *
+ * The productivity loop this enables:
+ *   You drop a one-line task into the `## 📥 INBOX` section of TASKS.md
+ *   (from Telegram via Tiger, from the dashboard workspace editor, or by
+ *   hand). Every DRAIN_INTERVAL the bridge picks the FIRST unchecked item,
+ *   asks classifyAgent() which specialist owns it, spawns that specialist
+ *   (lib/agents.ts + routes/spawn.ts), and rewrites the line in place with
+ *   the run id so nothing is picked twice. Completion is reported to
+ *   Telegram by the spawn runner.
+ *
+ * Why the BRIDGE schedules this instead of an OpenClaw cron:
+ *   - an OpenClaw cron job is itself an agent turn → it would burn a model
+ *     call just to decide whether there is work, every hour
+ *   - the cron prompt would need the bridge bearer token embedded in it
+ *     (a secret inside a prompt — bad pattern)
+ *   - the bridge can check TASKS.md for free and only spend model tokens
+ *     (one classify call) when there is actually an item to dispatch
+ *   The existing "Hourly Task Check-in" cron stays — it is Tiger's
+ *   *narrative* status report; this is the *mechanical* dispatcher.
+ *
+ * INBOX line contract (inside TASKS.md):
+ *   - [ ] research BESS tender pipeline in Gujarat        ← pending
+ *   - [⏳ exec_ab12cd → ethan] research BESS tender ...   ← dispatched
+ *   The drainer only ever touches `- [ ]` lines, one per cycle.
+ *
+ * Failure handling:
+ *   TASKS.md is re-read right before the marker is written, so edits made
+ *   while the classifier was running are not overwritten. If an item was
+ *   spawned but its marker could not be written, the marker is kept in
+ *   memory and retried next cycle instead of spawning a second run.
+ */
+
+import { mkdtempSync, rmSync, writeFileSync } from "fs";
+import { execFile } from "child_process";
+import { tmpdir } from "os";
+import { join } from "path";
+import { promisify } from "util";
+import { classifyAgent } from "./llm.js";
+import { spawnTask } from "../routes/spawn.js";
+
+const execFileAsync = promisify(execFile);
+
+const DOCKER_CONTAINER = "tiger-openclaw";
+const TASKS_PATH = "/home/node/.openclaw/workspace/TASKS.md";
+const INBOX_HEADER = "## 📥 INBOX";
+/** Check every 30 minutes, only act inside working hours (IST). */
+const DRAIN_INTERVAL_MS = 30 * 60 * 1000;
+const WORK_HOURS_IST = { start: 9, end: 20 };
+
+const PENDING_LINE = /^- \[ \] (.+)$/;
+
+/** A pending inbox line: its index in the split file and its task text. */
+interface PendingItem {
+  index: number;
+  text: string;
+}
+
+let draining = false;
+
+/**
+ * Items that were spawned but whose TASKS.md marker could not be written
+ * (task text → marker line). Checked before classifying so a failed write
+ * is retried rather than answered with a duplicate spawn.
+ */
+const unmarked = new Map<string, string>();
+
+/** Current hour of the day (0–23) in Asia/Kolkata. */
+function istHour(): number {
+  return Number(
+    new Intl.DateTimeFormat("en-GB", {
+      timeZone: "Asia/Kolkata",
+      hour: "2-digit",
+      // Pin the 0–23 clock explicitly instead of relying on `hour12: false`.
+      hourCycle: "h23",
+    }).format(new Date()),
+  );
+}
+
+/** Read TASKS.md out of the container; rejects when docker exec fails. */
+async function readTasksFile(): Promise<string> {
+  const { stdout } = await execFileAsync(
+    "docker",
+    ["exec", DOCKER_CONTAINER, "cat", TASKS_PATH],
+    { encoding: "utf8", timeout: 10_000, maxBuffer: 1024 * 1024 },
+  );
+  return stdout;
+}
+
+/** Replace TASKS.md inside the container with `content`. */
+async function writeTasksFile(content: string): Promise<void> {
+  // docker cp (same escaping-proof transport as spawn/telegram message passing).
+  // Staged in a private mkdtemp directory: a guessable file name directly in
+  // the shared temp dir could be pre-created (e.g. as a symlink) by someone else.
+  const dir = mkdtempSync(join(tmpdir(), "tasks_inbox_"));
+  const tmp = join(dir, "TASKS.md");
+  try {
+    writeFileSync(tmp, content, "utf-8");
+    await execFileAsync(
+      "docker",
+      ["cp", tmp, `${DOCKER_CONTAINER}:${TASKS_PATH}`],
+      { timeout: 10_000 },
+    );
+  } finally {
+    rmSync(dir, { recursive: true, force: true });
+  }
+}
+
+/**
+ * Locate a pending (`- [ ]`) line inside the INBOX section of `lines`.
+ * Without `wantedText` the first pending line wins; with it, only a pending
+ * line carrying exactly that task text matches. Returns null when the
+ * section is missing or holds no matching line.
+ */
+function findPendingItem(
+  lines: readonly string[],
+  wantedText?: string,
+): PendingItem | null {
+  const headerIdx = lines.findIndex((l) => l.trim().startsWith(INBOX_HEADER));
+  if (headerIdx === -1) return null;
+
+  // Scan from the header to the next section header (or EOF).
+  for (let i = headerIdx + 1; i < lines.length; i++) {
+    const line = lines[i];
+    if (line.startsWith("## ")) break; // next section — inbox ended
+    const m = line.match(PENDING_LINE);
+    if (!m) continue;
+    const text = m[1].trim();
+    if (wantedText === undefined || text === wantedText) {
+      return { index: i, text };
+    }
+  }
+  return null;
+}
+
+/**
+ * Rewrite the pending line for `taskText` as `marker`, working from a fresh
+ * read of TASKS.md so edits made elsewhere in the file meanwhile survive.
+ * Resolves false when no pending line with that text exists any more.
+ */
+async function markDispatched(
+  taskText: string,
+  marker: string,
+): Promise<boolean> {
+  const fresh = (await readTasksFile()).split("\n");
+  const item = findPendingItem(fresh, taskText);
+  if (!item) return false;
+  fresh[item.index] = marker;
+  await writeTasksFile(fresh.join("\n"));
+  return true;
+}
+
+/**
+ * One drain cycle: dispatch at most ONE pending inbox item.
+ * Exported so routes can trigger it manually (POST /tiger/inbox/drain).
+ *
+ * @param force Skip the IST work-hours gate (the manual trigger passes true).
+ * @returns A human-readable outcome for logs/API. Failures are reported as an
+ *   `error: …` string rather than thrown.
+ */
+export async function drainInboxOnce(force = false): Promise<string> {
+  if (draining) return "skipped: drain already in progress";
+  const hour = istHour();
+  if (!force && (hour < WORK_HOURS_IST.start || hour >= WORK_HOURS_IST.end)) {
+    return `skipped: outside work hours (IST hour ${hour})`;
+  }
+
+  draining = true;
+  try {
+    let content: string;
+    try {
+      content = await readTasksFile();
+    } catch (err) {
+      const m = err instanceof Error ? err.message : String(err);
+      console.warn("[inbox] TASKS.md not readable:", m);
+      return "skipped: TASKS.md not readable";
+    }
+
+    const lines = content.split("\n");
+    if (!lines.some((l) => l.trim().startsWith(INBOX_HEADER))) {
+      return "skipped: no INBOX section in TASKS.md";
+    }
+    const pending = findPendingItem(lines);
+    if (!pending) return "ok: inbox empty";
+    const taskText = pending.text;
+
+    // Spawned on an earlier cycle but never marked — finish the bookkeeping
+    // instead of dispatching the same item a second time.
+    const owedMarker = unmarked.get(taskText);
+    if (owedMarker !== undefined) {
+      await markDispatched(taskText, owedMarker);
+      unmarked.delete(taskText);
+      return `ok: marked earlier dispatch (${owedMarker})`;
+    }
+
+    // Route → spawn → mark, in that order. If classify fails (e.g. the LLM
+    // gateway is down) we leave the line untouched and retry next cycle.
+    const { agent: agentId, reason } = await classifyAgent(taskText);
+    const spawnable = agentId === "tiger" ? "elon" : agentId; // orchestrator work → PM
+    const ticket = spawnTask({ agentId: spawnable, task: taskText });
+    const marker = `- [⏳ ${ticket.runId} → ${ticket.agent.id}] ${taskText}`;
+
+    let marked: boolean;
+    try {
+      marked = await markDispatched(taskText, marker);
+    } catch (err) {
+      // The run exists; remember the marker so the next cycle only retries
+      // the write.
+      unmarked.set(taskText, marker);
+      const m = err instanceof Error ? err.message : String(err);
+      console.error(`[inbox] ${ticket.runId} spawned but TASKS.md not updated:`, m);
+      return `error: ${ticket.runId} spawned but TASKS.md not updated: ${m}`;
+    }
+    if (!marked) {
+      console.warn(`[inbox] line for ${ticket.runId} changed before it could be marked`);
+    }
+
+    console.log(
+      `[inbox] dispatched "${taskText.slice(0, 60)}" → ${ticket.agent.id} ` +
+        `(${ticket.runId}; classifier said ${agentId}: ${reason.slice(0, 80)})`,
+    );
+    return `dispatched: ${ticket.runId} → ${ticket.agent.id}`;
+  } catch (err) {
+    const m = err instanceof Error ? err.message : String(err);
+    console.error("[inbox] drain failed:", m);
+    return `error: ${m}`;
+  } finally {
+    draining = false;
+  }
+}
+
+/** Call once from index.ts at startup. */
+export function startInboxScheduler(): void {
+  setInterval(() => {
+    void drainInboxOnce();
+  }, DRAIN_INTERVAL_MS);
+  console.log(
+    `[inbox] scheduler started — every ${DRAIN_INTERVAL_MS / 60000}min, ` +
+      `${WORK_HOURS_IST.start}:00–${WORK_HOURS_IST.end}:00 IST`,
+  );
+}
diff --git a/bridge/src/routes/chat-telegram.ts b/bridge/src/routes/chat-telegram.ts
new file mode 100644
index 0000000..c51f424
--- /dev/null
+++ b/bridge/src/routes/chat-telegram.ts
@@ -0,0 +1,191 @@
+/**
+ * chat-telegram.ts — GET /tiger/chat/telegram : the REAL Telegram mirror
+ *
+ * History of this feature (why the old one showed nothing):
+ *   The original design (telegram-webhook.ts + chat-mirror.ts) waited for
+ *   Telegram to POST updates to the bridge. But the bot is handled by
+ *   OpenClaw's NATIVE telegram channel via long-polling, and Telegram's API
+ *   forbids webhook + getUpdates on the same bot token — so the webhook was
+ *   never registered, chat_messages never received a single Telegram row,
+ *   and the dashboard card stayed empty. Even if it had worked, it could
+ *   only see inbound messages, never Tiger's replies.
+ *
+ * This route reads the conversation from the source of truth instead:
+ *   OpenClaw's session transcript (JSONL) for the telegram:direct session.
+ *   It is the same file Tiger's own context is built from, so the dashboard
+ *   is in perfect sync by construction — both directions, full history,
+ *   nothing to register, nothing to double-write.
+ *
+ * GET /tiger/chat/telegram?limit=50&before=<seq>
+ *   → { ok, sessionKey, messages: [{ seq, role, text, timestamp }], hasMore, oldestSeq }
+ *   - messages are ascending by seq (chronological)
+ *   - `before` pages backwards through history (omit for the newest page)
+ *
+ * Transcript line shape (verified live on tiger-config volume):
+ *   { type:"message", timestamp:"2026-06-09T21:08:38.574Z",
+ *     message:{ role:"user"|"assistant"|"toolResult", content:[{type:"text",text}] } }
+ */
+
+import { Router, Request, Response } from "express";
+import { readFileSync, statSync } from "fs";
+import { join } from "path";
+
+const router = Router();
+
+// The bridge runs on the host as root, so it reads the docker volume directly —
+// no docker-exec hop on a route that the dashboard polls every few seconds.
+const DATA_DIR =
+  process.env.OPENCLAW_DATA_DIR ||
+  "/var/lib/docker/volumes/tiger_tiger-config/_data";
+const SESSIONS_DIR = join(DATA_DIR, "agents", "main", "sessions");
+
+interface ThreadMessage {
+  seq: number;
+  role: "user" | "agent";
+  text: string;
+  timestamp: string;
+}
+
+interface SessionIndexEntry {
+  sessionId?: string;
+  updatedAt?: number;
+}
+
+/** One parsed JSONL transcript line; fields are untrusted until checked. */
+interface TranscriptLine {
+  type?: unknown;
+  timestamp?: unknown;
+  message?: { role?: unknown; content?: unknown } | null;
+}
+
+/** Newest telegram session: key + transcript path. */
+function resolveTelegramSession(): { key: string; file: string } | null {
+  let index: Record<string, SessionIndexEntry | null>;
+  try {
+    index = JSON.parse(
+      readFileSync(join(SESSIONS_DIR, "sessions.json"), "utf-8"),
+    ) as Record<string, SessionIndexEntry | null>;
+  } catch {
+    return null;
+  }
+  // A valid-JSON but non-object index (null, number, …) has no sessions.
+  if (typeof index !== "object" || index === null) return null;
+
+  const candidates = Object.entries(index)
+    .filter(([key]) => key.includes(":telegram:") || key.includes(":tg_"))
+    .sort((a, b) => (b[1]?.updatedAt ?? 0) - (a[1]?.updatedAt ?? 0));
+
+  for (const [key, entry] of candidates) {
+    if (entry?.sessionId) {
+      return { key, file: join(SESSIONS_DIR, `${entry.sessionId}.jsonl`) };
+    }
+  }
+  return null;
+}
+
+// ─── Parse cache ─────────────────────────────────────────────────────────────
+// The card polls for new messages; re-parsing the whole transcript each poll
+// is wasted work. Cache the parsed array keyed on (path, mtime, size).
+
+let cache: { file: string; mtimeMs: number; size: number; messages: ThreadMessage[] } | null = null;
+
+/**
+ * Parse a JSONL transcript into user/agent text messages.
+ * `seq` is the 1-based ordinal of the non-blank line, so existing messages
+ * keep their seq as lines are appended. Throws if the file cannot be read.
+ */
+function parseTranscript(file: string): ThreadMessage[] {
+  const st = statSync(file);
+  if (
+    cache &&
+    cache.file === file &&
+    cache.mtimeMs === st.mtimeMs &&
+    cache.size === st.size
+  ) {
+    return cache.messages;
+  }
+
+  const messages: ThreadMessage[] = [];
+  const lines = readFileSync(file, "utf-8").split("\n");
+  let seq = 0;
+
+  for (const line of lines) {
+    if (!line.trim()) continue;
+    seq += 1;
+    let entry: TranscriptLine | null;
+    try {
+      entry = JSON.parse(line) as TranscriptLine | null;
+    } catch {
+      continue;
+    }
+    // `null`, numbers and strings are valid JSON but not transcript entries.
+    if (typeof entry !== "object" || entry === null) continue;
+    if (entry.type !== "message") continue;
+
+    const role = entry.message?.role;
+    if (role !== "user" && role !== "assistant") continue; // skip toolResult etc.
+
+    const content: unknown = entry.message?.content;
+    let text = "";
+    if (typeof content === "string") {
+      text = content;
+    } else if (Array.isArray(content)) {
+      text = content
+        .filter((c) => c && c.type === "text" && typeof c.text === "string")
+        .map((c) => c.text)
+        .join("\n");
+    }
+    text = text.trim();
+    if (!text) continue; // tool-call-only assistant turns have no text
+
+    messages.push({
+      seq,
+      role: role === "user" ? "user" : "agent",
+      text,
+      timestamp: typeof entry.timestamp === "string" ? entry.timestamp : "",
+    });
+  }
+
+  cache = { file, mtimeMs: st.mtimeMs, size: st.size, messages };
+  return messages;
+}
+
+router.get("/", (req: Request, res: Response) => {
+  const limit = Math.min(
+    Math.max(parseInt(String(req.query.limit ?? "50"), 10) || 50, 1),
+    200,
+  );
+  // NaN (absent/garbage) means "newest page"; an explicit 0 must stay 0 so it
+  // yields an empty page instead of silently falling back to the newest one.
+  const beforeRaw = parseInt(String(req.query.before ?? ""), 10);
+  const before = Number.isNaN(beforeRaw) ? null : beforeRaw;
+
+  const session = resolveTelegramSession();
+  if (!session) {
+    return res.status(404).json({
+      ok: false,
+      error: "No telegram session found in OpenClaw session index",
+    });
+  }
+
+  let all: ThreadMessage[];
+  try {
+    all = parseTranscript(session.file);
+  } catch (err) {
+    const m = err instanceof Error ? err.message : String(err);
+    return res.status(500).json({ ok: false, error: `transcript read failed: ${m}` });
+  }
+
+  const upTo = before === null ? all : all.filter((m) => m.seq < before);
+  const page = upTo.slice(-limit);
+
+  res.json({
+    ok: true,
+    sessionKey: session.key,
+    messages: page,
+    hasMore: upTo.length > page.length,
+    oldestSeq: page.length > 0 ? page[0].seq : null,
+  });
+});
+
+export default router;