#!/usr/bin/env python3
"""
watcher.py -- Umami visitor watcher + Telegram command bot.

Sending:   new visitor alerts -> notifier.py -> Apprise -> Telegram
Receiving: /stats /today /live /help commands -> direct Telegram API
           (Apprise can't receive)

Run:        python3 watcher.py
Keep alive: managed by systemd (see watcher.service)
"""

import time
from collections import deque
from datetime import datetime, timezone

import requests

import config
from notifier import notify

# ── State (in-memory, resets on restart) ─────────────────────────────────────

seen_sessions = set()
# Insertion order of the IDs in seen_sessions (oldest on the left). A set has
# no order of its own, so this is what makes "evict the oldest" well-defined.
_seen_order = deque()
_umami_auth_token = None
_umami_token_time = 0
last_telegram_update_id = 0

_TOKEN_TTL_SECONDS = 1800  # re-login to Umami after 30 minutes
_SEEN_MAX = 500            # start evicting once this many IDs are tracked
_SEEN_KEEP = 200           # number of most-recent IDs kept after eviction


# ── Umami auth ────────────────────────────────────────────────────────────────

def get_umami_token():
    """Get a cached Umami JWT, refreshing every 30 minutes.

    Returns the token string, or None when the login request fails (the
    failure is printed, not raised).
    """
    global _umami_auth_token, _umami_token_time
    now = time.time()
    if _umami_auth_token and (now - _umami_token_time) < _TOKEN_TTL_SECONDS:
        return _umami_auth_token
    try:
        r = requests.post(
            f"{config.UMAMI_URL}/api/auth/login",
            json={"username": config.UMAMI_USER, "password": config.UMAMI_PASS},
            timeout=10,
        )
        # Surface bad credentials / server errors as a clear HTTP error
        # instead of a KeyError on the missing "token" field.
        r.raise_for_status()
        token = r.json()["token"]
    except Exception as e:
        print(f"[ERROR] Umami auth failed: {e}")
        return None
    _umami_auth_token = token
    _umami_token_time = now
    return token


def umami_get(endpoint, params=None):
    """Authenticated GET to Umami API.

    Returns the decoded JSON body, or None when no token is available, the
    request fails, or the server answers with an HTTP error status.
    """
    global _umami_auth_token
    token = get_umami_token()
    if not token:
        return None
    try:
        r = requests.get(
            f"{config.UMAMI_URL}/api/websites/{config.UMAMI_SITE_ID}/{endpoint}",
            params=params or {},
            headers={"Authorization": f"Bearer {token}"},
            timeout=10,
        )
        if r.status_code == 401:
            # The cached token was rejected; drop it so the next call logs in
            # again instead of failing until the 30-minute TTL runs out.
            _umami_auth_token = None
        # Without this, an error body would be handed to callers as data.
        r.raise_for_status()
        return r.json()
    except Exception as e:
        print(f"[ERROR] Umami API failed: {e}")
        return None


def get_recent_sessions(minutes=5):
    """Return the sessions Umami reports for the last *minutes* minutes.

    Always returns a list; it is empty when the request failed or the
    response did not have the expected ``{"data": [...]}`` shape.
    """
    now = int(time.time()) * 1000
    start = now - (minutes * 60 * 1000)
    data = umami_get("sessions", {"startAt": start, "endAt": now})
    if not isinstance(data, dict):
        return []
    return data.get("data") or []


# ── Shared helpers ────────────────────────────────────────────────────────────

def _field(record, key, default):
    """Return record[key], using *default* when the key is missing OR None.

    ``dict.get(key, default)`` only covers the missing-key case; a key that is
    present with a null value would otherwise leak None into formatting.
    """
    value = record.get(key)
    return default if value is None else value


def _format_time(record, fmt, fallback):
    """Format the record's firstAt/createdAt timestamp in config.IST.

    Returns *fallback* when neither field is set or it cannot be parsed.
    """
    stamp = record.get("firstAt") or record.get("createdAt") or ""
    try:
        dt = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
        return dt.astimezone(config.IST).strftime(fmt)
    except Exception:
        return fallback


def _today_range_ms():
    """Return (start_of_today_ms, now_ms) as epoch milliseconds.

    NOTE(review): "today" starts at UTC midnight, while every time shown to
    the user is converted to config.IST -- confirm which day boundary is
    actually wanted.
    """
    now_ms = int(time.time()) * 1000
    midnight = datetime.now(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return int(midnight.timestamp()) * 1000, now_ms


def _stat_value(stats, key):
    """Read one metric from a stats payload.

    Accepts either ``{"value": n}`` (the shape the original code read) or a
    bare number; anything else counts as 0.
    """
    entry = stats.get(key)
    if isinstance(entry, dict):
        entry = entry.get("value")
    return entry if isinstance(entry, (int, float)) else 0


def _redact(err):
    """Return str(err) with the Telegram bot token masked.

    The token is part of the request URL, and HTTP client error messages
    commonly include that URL, so it must not be printed verbatim.
    """
    text = str(err)
    token = str(getattr(config, "TELEGRAM_TOKEN", "") or "")
    return text.replace(token, "<token>") if token else text


def _remember(sid):
    """Record *sid* as seen, tracking insertion order for later eviction."""
    if sid not in seen_sessions:
        seen_sessions.add(sid)
        _seen_order.append(sid)


def _prune_seen():
    """Prevent unbounded memory growth by evicting the oldest seen IDs."""
    if len(seen_sessions) <= _SEEN_MAX:
        return
    while len(_seen_order) > _SEEN_KEEP:
        seen_sessions.discard(_seen_order.popleft())


# ── Notification formatting ───────────────────────────────────────────────────

def format_visitor(session):
    """Build the multi-line "New Visitor!" alert text for one session dict.

    Missing or null fields fall back to placeholder text instead of raising.
    """
    device_map = {
        "mobile": "\U0001f4f1",
        "tablet": "\U0001f4f1",
        "laptop": "\U0001f4bb",
        "desktop": "\U0001f5a5\ufe0f",
    }
    emoji = device_map.get(_field(session, "device", ""), "\U0001f310")
    city = _field(session, "city", "Unknown")
    country = _field(session, "country", "??")
    browser = str(_field(session, "browser", "Unknown")).title()
    os_name = _field(session, "os", "Unknown")
    device = str(_field(session, "device", "Unknown")).title()
    views = _field(session, "views", 1)
    time_str = _format_time(session, "%I:%M %p IST", "now")
    return "\n".join([
        f"{emoji} New Visitor!",
        f"\U0001f4cd {city}, {country}",
        f"\U0001f5a5\ufe0f {browser} / {os_name} / {device}",
        f"\U0001f4c4 {views} page(s) viewed",
        f"\U0001f550 {time_str}",
    ])


# ── Telegram command handlers ─────────────────────────────────────────────────
# These use notify() for sending, but must poll Telegram directly for receiving.
# Each handler receives the requesting chat_id but replies through notify().

def cmd_live(chat_id):
    """Handle /live: report the active-visitor count and up to 10 sessions."""
    data = umami_get("active")
    if not isinstance(data, dict) or not data:
        notify("\u26a0\ufe0f Could not fetch active visitors.")
        return
    count = data.get("visitors", 0)
    if count == 0:
        notify("\U0001f634 No active visitors right now.")
        return
    sessions = get_recent_sessions(minutes=5)
    lines = [f"\U0001f7e2 {count} active visitor(s)\n"]
    for s in sessions[:10]:
        city = _field(s, "city", "?")
        country = _field(s, "country", "?")
        device = str(_field(s, "device", "?")).title()
        views = _field(s, "views", 1)
        t = _format_time(s, "%I:%M %p", "?")
        lines.append(f" \U0001f4cd {city}, {country} | {device} | {views} pages | {t} IST")
    notify("\n".join(lines))


def cmd_stats(chat_id):
    """Handle /stats: send today's visitor, pageview, visit and bounce totals."""
    today_start, now_ms = _today_range_ms()
    stats = umami_get("stats", {"startAt": today_start, "endAt": now_ms})
    active = umami_get("active")
    if not isinstance(stats, dict) or not stats:
        notify("\u26a0\ufe0f Could not fetch stats.")
        return
    pv = _stat_value(stats, "pageviews")
    visitors = _stat_value(stats, "visitors")
    visits = _stat_value(stats, "visits")
    bounce = _stat_value(stats, "bounces")
    total_time = _stat_value(stats, "totaltime")
    # Guard against dividing by zero when there have been no visits yet.
    avg_time = round(total_time / visits, 1) if visits else 0
    live = active.get("visitors", 0) if isinstance(active, dict) else 0
    notify("\n".join([
        "\U0001f4ca Today's Stats",
        f"\U0001f465 Visitors: {visitors}",
        f"\U0001f440 Pageviews: {pv}",
        f"\U0001f6aa Visits: {visits}",
        f"\U0001f3c3 Bounces: {bounce}",
        f"\u23f1\ufe0f Avg time: {avg_time}s",
        f"\U0001f7e2 Live now: {live}",
    ]))


def cmd_today(chat_id):
    """Handle /today: send today's top 8 pages and top 5 referrers."""
    today_start, now_ms = _today_range_ms()
    pages = umami_get("metrics", {"startAt": today_start, "endAt": now_ms, "type": "url"})
    referrers = umami_get("metrics", {"startAt": today_start, "endAt": now_ms, "type": "referrer"})
    # Anything that is not a list (None, or an unexpected dict) is "no data";
    # slicing a dict would raise.
    pages = pages if isinstance(pages, list) else []
    referrers = referrers if isinstance(referrers, list) else []
    lines = ["\U0001f4c4 Today's Top Pages\n"]
    for p in pages[:8]:
        lines.append(f" {p.get('x','?')} -- {p.get('y',0)} views")
    if not pages:
        lines.append(" No data yet")
    lines.append("\n\U0001f517 Top Referrers\n")
    for r in referrers[:5]:
        ref = r.get("x") or "(direct)"
        lines.append(f" {ref} -- {r.get('y',0)}")
    if not referrers:
        lines.append(" No referrer data yet")
    notify("\n".join(lines))


def cmd_help(chat_id):
    """Handle /help and /start: send the list of supported commands."""
    notify("\n".join([
        "\U0001f916 Portfolio Watcher Commands",
        "/stats -- Today's summary",
        "/today -- Top pages and referrers",
        "/live -- Active visitors now",
        "/help -- This message",
        "\nNew visitor alerts sent automatically.",
    ]))


COMMANDS = {
    "/stats": cmd_stats,
    "/today": cmd_today,
    "/live": cmd_live,
    "/help": cmd_help,
    "/start": cmd_help,
}


# ── Telegram command polling ──────────────────────────────────────────────────
# Must stay direct -- Apprise is send-only, cannot poll getUpdates.

def poll_telegram_commands(dispatch=True):
    """Fetch pending Telegram updates and run any recognised commands.

    Args:
        dispatch: When True (default) commands from the configured chat are
            executed. When False the updates are only acknowledged (the
            offset advances) and nothing runs -- used at startup to discard
            commands that queued up while the watcher was down.
    """
    global last_telegram_update_id
    try:
        r = requests.get(
            f"https://api.telegram.org/bot{config.TELEGRAM_TOKEN}/getUpdates",
            params={"offset": last_telegram_update_id + 1, "timeout": 0},
            timeout=5,
        )
        for update in r.json().get("result", []):
            # Advance the offset first so a failing command is never retried.
            last_telegram_update_id = update["update_id"]
            if not dispatch:
                continue
            msg = update.get("message") or {}
            chat_id = str((msg.get("chat") or {}).get("id", ""))
            # "/stats@MyBot" -> "/stats"
            text = (msg.get("text") or "").strip().lower().split("@")[0]
            # str() on the configured id so an int in config still matches.
            if chat_id != str(config.TELEGRAM_CHAT_ID) or text not in COMMANDS:
                continue
            print(f"[CMD] {text} from {chat_id}")
            try:
                COMMANDS[text](chat_id)
            except Exception as e:
                # One broken command must not abort the rest of the batch.
                print(f"[ERROR] Command {text} failed: {_redact(e)}")
    except Exception as e:
        print(f"[ERROR] Telegram poll failed: {_redact(e)}")


# ── Main loop ─────────────────────────────────────────────────────────────────

def main():
    """Announce startup, seed state, then poll Telegram and Umami forever."""
    print(f"[INFO] Watcher started. Polling every {config.POLL_INTERVAL}s...")
    notify("\U0001f7e2 Portfolio Watcher is now active!\nSend /help for commands.")

    # Seed seen sessions so we don't spam on startup
    for s in get_recent_sessions():
        _remember(s["id"])
    print(f"[INFO] Seeded {len(seen_sessions)} existing sessions")

    # Drain any queued Telegram commands before entering the loop
    poll_telegram_commands(dispatch=False)

    while True:
        try:
            poll_telegram_commands()
            for session in get_recent_sessions():
                sid = session["id"]
                if sid in seen_sessions:
                    continue
                _remember(sid)
                notify(format_visitor(session))
                print(f"[NOTIFY] New visitor: {session.get('city')}, {session.get('country')}")
            _prune_seen()
        except Exception as e:
            print(f"[ERROR] Loop error: {e}")
        time.sleep(config.POLL_INTERVAL)


if __name__ == "__main__":
    main()