feat(bridge): TASKS.md inbox drainer + telegram transcript mirror route

- lib/inbox.ts: every 30min (09-20 IST) dispatch the first '- [ ]' item
  under '## INBOX' in TASKS.md: classifyAgent -> spawnTask -> rewrite the
  line with the run id. Bridge-side scheduling: no model tokens burned on
  empty checks, no bearer token embedded in cron prompts.
  Manual trigger: POST /tiger/inbox/drain
- routes/chat-telegram.ts: GET /tiger/chat/telegram reads OpenClaw's native
  telegram session transcript (JSONL) with cursor pagination and mtime
  caching. Replaces the webhook/chat-mirror design, which could never work:
  the bot is owned by OpenClaw's long-polling channel, and Telegram forbids
  webhook + getUpdates on one token, so chat_messages never saw a row.
- index.ts: wire both + start inbox scheduler
This commit is contained in:
Manohar 2026-06-10 02:47:03 +00:00
parent 1a8358bb6a
commit 438488c8c8
3 changed files with 338 additions and 0 deletions

View file

@ -154,7 +154,18 @@ app.use("/tiger/route-task", routeTaskRouter);
app.use("/tiger/keys", keysRouter);
app.use("/tiger/chat", (await import("./routes/chat.js")).default);
app.use("/tiger/chat/mirror", (await import("./routes/chat-mirror.js")).default);
// Telegram mirror v2 — reads OpenClaw's native session transcript directly.
// (chat-mirror + telegram-webhook above are the legacy write-side, kept for
// API compatibility but no longer the data source for the dashboard card.)
app.use("/tiger/chat/telegram", (await import("./routes/chat-telegram.js")).default);
app.use("/tiger/telegram-webhook", (await import("./routes/telegram-webhook.js")).default);
// TASKS.md inbox — manual drain trigger (the scheduler below runs it on its own)
const { drainInboxOnce, startInboxScheduler } = await import("./lib/inbox.js");
app.post("/tiger/inbox/drain", async (_req, res) => {
const result = await drainInboxOnce(true);
res.json({ ok: !result.startsWith("error"), result });
});
app.use("/angel", (await import("./routes/angel/positions.js")).default);
// Gateway proxy — forwards to gateway inside Tiger container
@ -186,6 +197,10 @@ app.listen(PORT, HOST, () => {
// Initialize file watcher for task status updates
initWatcher();
// TASKS.md inbox drainer — dispatches one pending item per cycle to a
// spawned specialist. See lib/inbox.ts for the contract.
startInboxScheduler();
// Start Telegram channel — bridge takes over from OpenClaw native handler.
// Requires channels.telegram.enabled=false in openclaw.json.
const tgChannel = new TelegramChannel();

151
bridge/src/lib/inbox.ts Normal file
View file

@ -0,0 +1,151 @@
/**
* lib/inbox.ts TASKS.md as Tiger's inbox, drained by the bridge
*
* The productivity loop this enables:
* You drop a one-line task into the `## 📥 INBOX` section of TASKS.md
* (from Telegram via Tiger, from the dashboard workspace editor, or by
* hand). Every DRAIN_INTERVAL the bridge picks the FIRST unchecked item,
* asks classifyAgent() which specialist owns it, spawns that specialist
* (lib/agents.ts + routes/spawn.ts), and rewrites the line in place with
* the run id so nothing is picked twice. Completion is reported to
* Telegram by the spawn runner.
*
* Why the BRIDGE schedules this instead of an OpenClaw cron:
* - an OpenClaw cron job is itself an agent turn it would burn a model
* call just to decide whether there is work, every hour
* - the cron prompt would need the bridge bearer token embedded in it
* (a secret inside a prompt bad pattern)
* - the bridge can check TASKS.md for free and only spend model tokens
* (one classify call) when there is actually an item to dispatch
* The existing "Hourly Task Check-in" cron stays it is Tiger's
* *narrative* status report; this is the *mechanical* dispatcher.
*
* INBOX line contract (inside TASKS.md):
* - [ ] research BESS tender pipeline in Gujarat pending
* - [ exec_ab12cd ethan] research BESS tender ... dispatched
* The drainer only ever touches `- [ ]` lines, one per cycle.
*/
import { writeFileSync, unlinkSync } from "fs";
import { exec } from "child_process";
import { promisify } from "util";
import { classifyAgent } from "./llm.js";
import { spawnTask } from "../routes/spawn.js";
const execAsync = promisify(exec);
const DOCKER_CONTAINER = "tiger-openclaw";
const TASKS_PATH = "/home/node/.openclaw/workspace/TASKS.md";
const INBOX_HEADER = "## 📥 INBOX";
/** Check every 30 minutes, only act inside working hours (IST). */
const DRAIN_INTERVAL_MS = 30 * 60 * 1000;
const WORK_HOURS_IST = { start: 9, end: 20 };
const PENDING_LINE = /^- \[ \] (.+)$/;
let draining = false;
function istHour(): number {
return Number(
new Intl.DateTimeFormat("en-GB", {
timeZone: "Asia/Kolkata",
hour: "2-digit",
hour12: false,
}).format(new Date()),
);
}
async function readTasksFile(): Promise<string> {
const { stdout } = await execAsync(
`docker exec ${DOCKER_CONTAINER} cat ${TASKS_PATH}`,
{ timeout: 10_000, maxBuffer: 1024 * 1024 },
);
return stdout;
}
async function writeTasksFile(content: string): Promise<void> {
// docker cp (same escaping-proof transport as spawn/telegram message passing)
const tmp = `/tmp/tasks_inbox_${Date.now()}.md`;
writeFileSync(tmp, content, "utf-8");
try {
await execAsync(`docker cp ${tmp} ${DOCKER_CONTAINER}:${TASKS_PATH}`, {
timeout: 10_000,
});
} finally {
unlinkSync(tmp);
}
}
/**
* One drain cycle: dispatch at most ONE pending inbox item.
* Exported so routes can trigger it manually (POST /tiger/inbox/drain).
* Returns a human-readable outcome for logs/API.
*/
export async function drainInboxOnce(force = false): Promise<string> {
if (draining) return "skipped: drain already in progress";
const hour = istHour();
if (!force && (hour < WORK_HOURS_IST.start || hour >= WORK_HOURS_IST.end)) {
return `skipped: outside work hours (IST hour ${hour})`;
}
draining = true;
try {
let content: string;
try {
content = await readTasksFile();
} catch {
return "skipped: TASKS.md not readable";
}
const lines = content.split("\n");
const headerIdx = lines.findIndex((l) => l.trim().startsWith(INBOX_HEADER));
if (headerIdx === -1) return "skipped: no INBOX section in TASKS.md";
// Scan from the header to the next section header (or EOF).
let target = -1;
let taskText = "";
for (let i = headerIdx + 1; i < lines.length; i++) {
const line = lines[i];
if (line.startsWith("## ")) break; // next section — inbox ended
const m = line.match(PENDING_LINE);
if (m) {
target = i;
taskText = m[1].trim();
break;
}
}
if (target === -1) return "ok: inbox empty";
// Route → spawn → mark, in that order. If classify fails (e.g. the LLM
// gateway is down) we leave the line untouched and retry next cycle.
const { agent: agentId, reason } = await classifyAgent(taskText);
const spawnable = agentId === "tiger" ? "elon" : agentId; // orchestrator work → PM
const ticket = spawnTask({ agentId: spawnable, task: taskText });
lines[target] = `- [⏳ ${ticket.runId}${ticket.agent.id}] ${taskText}`;
await writeTasksFile(lines.join("\n"));
console.log(
`[inbox] dispatched "${taskText.slice(0, 60)}" → ${ticket.agent.id} ` +
`(${ticket.runId}; classifier said ${agentId}: ${reason.slice(0, 80)})`,
);
return `dispatched: ${ticket.runId}${ticket.agent.id}`;
} catch (err) {
const m = err instanceof Error ? err.message : String(err);
console.error("[inbox] drain failed:", m);
return `error: ${m}`;
} finally {
draining = false;
}
}
/** Call once from index.ts at startup. */
export function startInboxScheduler(): void {
setInterval(() => {
void drainInboxOnce();
}, DRAIN_INTERVAL_MS);
console.log(
`[inbox] scheduler started — every ${DRAIN_INTERVAL_MS / 60000}min, ` +
`${WORK_HOURS_IST.start}:00${WORK_HOURS_IST.end}:00 IST`,
);
}

View file

@ -0,0 +1,172 @@
/**
* chat-telegram.ts GET /tiger/chat/telegram : the REAL Telegram mirror
*
* History of this feature (why the old one showed nothing):
* The original design (telegram-webhook.ts + chat-mirror.ts) waited for
* Telegram to POST updates to the bridge. But the bot is handled by
* OpenClaw's NATIVE telegram channel via long-polling, and Telegram's API
* forbids webhook + getUpdates on the same bot token so the webhook was
* never registered, chat_messages never received a single Telegram row,
* and the dashboard card stayed empty. Even if it had worked, it could
* only see inbound messages, never Tiger's replies.
*
* This route reads the conversation from the source of truth instead:
* OpenClaw's session transcript (JSONL) for the telegram:direct session.
* It is the same file Tiger's own context is built from, so the dashboard
* is in perfect sync by construction both directions, full history,
* nothing to register, nothing to double-write.
*
* GET /tiger/chat/telegram?limit=50&before=<seq>
* { ok, sessionKey, messages: [{ seq, role, text, timestamp }], hasMore, oldestSeq }
* - messages are ascending by seq (chronological)
* - `before` pages backwards through history (omit for the newest page)
*
* Transcript line shape (verified live on tiger-config volume):
* { type:"message", timestamp:"2026-06-09T21:08:38.574Z",
* message:{ role:"user"|"assistant"|"toolResult", content:[{type:"text",text}] } }
*/
import { Router, Request, Response } from "express";
import { readFileSync, statSync } from "fs";
import { join } from "path";
const router = Router();
// The bridge runs on the host as root, so it reads the docker volume directly —
// no docker-exec hop on a route that the dashboard polls every few seconds.
const DATA_DIR =
process.env.OPENCLAW_DATA_DIR ||
"/var/lib/docker/volumes/tiger_tiger-config/_data";
const SESSIONS_DIR = join(DATA_DIR, "agents", "main", "sessions");
interface ThreadMessage {
seq: number;
role: "user" | "agent";
text: string;
timestamp: string;
}
interface SessionIndexEntry {
sessionId?: string;
updatedAt?: number;
}
/** Newest telegram session: key + transcript path. */
function resolveTelegramSession(): { key: string; file: string } | null {
let index: Record<string, SessionIndexEntry>;
try {
index = JSON.parse(
readFileSync(join(SESSIONS_DIR, "sessions.json"), "utf-8"),
) as Record<string, SessionIndexEntry>;
} catch {
return null;
}
const candidates = Object.entries(index)
.filter(([key]) => key.includes(":telegram:") || key.includes(":tg_"))
.sort((a, b) => (b[1].updatedAt ?? 0) - (a[1].updatedAt ?? 0));
for (const [key, entry] of candidates) {
if (entry.sessionId) {
return { key, file: join(SESSIONS_DIR, `${entry.sessionId}.jsonl`) };
}
}
return null;
}
// ─── Parse cache ─────────────────────────────────────────────────────────────
// The card polls for new messages; re-parsing the whole transcript each poll
// is wasted work. Cache the parsed array keyed on (path, mtime, size).
let cache: { file: string; mtimeMs: number; size: number; messages: ThreadMessage[] } | null = null;
function parseTranscript(file: string): ThreadMessage[] {
const st = statSync(file);
if (
cache &&
cache.file === file &&
cache.mtimeMs === st.mtimeMs &&
cache.size === st.size
) {
return cache.messages;
}
const messages: ThreadMessage[] = [];
const lines = readFileSync(file, "utf-8").split("\n");
let seq = 0;
for (const line of lines) {
if (!line.trim()) continue;
seq += 1;
let entry: Record<string, any>;
try {
entry = JSON.parse(line);
} catch {
continue;
}
if (entry.type !== "message") continue;
const role = entry.message?.role;
if (role !== "user" && role !== "assistant") continue; // skip toolResult etc.
const content: unknown = entry.message?.content;
let text = "";
if (typeof content === "string") {
text = content;
} else if (Array.isArray(content)) {
text = content
.filter((c) => c && c.type === "text" && typeof c.text === "string")
.map((c) => c.text)
.join("\n");
}
text = text.trim();
if (!text) continue; // tool-call-only assistant turns have no text
messages.push({
seq,
role: role === "user" ? "user" : "agent",
text,
timestamp: typeof entry.timestamp === "string" ? entry.timestamp : "",
});
}
cache = { file, mtimeMs: st.mtimeMs, size: st.size, messages };
return messages;
}
router.get("/", (req: Request, res: Response) => {
const limit = Math.min(
Math.max(parseInt(String(req.query.limit ?? "50"), 10) || 50, 1),
200,
);
const before = parseInt(String(req.query.before ?? ""), 10) || null;
const session = resolveTelegramSession();
if (!session) {
return res.status(404).json({
ok: false,
error: "No telegram session found in OpenClaw session index",
});
}
let all: ThreadMessage[];
try {
all = parseTranscript(session.file);
} catch (err) {
const m = err instanceof Error ? err.message : String(err);
return res.status(500).json({ ok: false, error: `transcript read failed: ${m}` });
}
const upTo = before === null ? all : all.filter((m) => m.seq < before);
const page = upTo.slice(-limit);
res.json({
ok: true,
sessionKey: session.key,
messages: page,
hasMore: upTo.length > page.length,
oldestSeq: page.length > 0 ? page[0].seq : null,
});
});
export default router;