infra/notifier/watcher.py

260 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""
watcher.py -- Umami visitor watcher + Telegram command bot.
Sending: new visitor alerts -> notifier.py -> Apprise -> Telegram
Receiving: /stats /today /live /help commands -> direct Telegram API (Apprise can't receive)
Run: python3 watcher.py
Keep alive: managed by systemd (see watcher.service)
"""
import time
import requests
from datetime import datetime, timezone
import config
from notifier import notify
# ── State (in-memory, resets on restart) ─────────────────────────────────────
# IDs of sessions that have already been seen, so each one is announced once.
seen_sessions = set()
# Cached Umami login token; stays None until a login succeeds.
_umami_auth_token = None
# time.time() value recorded when the cached token was obtained.
_umami_token_time = 0
# update_id of the last Telegram update handled; polling asks for this + 1.
last_telegram_update_id = 0
# ── Umami auth ────────────────────────────────────────────────────────────────
def get_umami_token():
    """Return a cached Umami auth token, logging in again when it is stale.

    A token is reused for 30 minutes after it was obtained. When it is
    missing or older than that, a login request is sent with the configured
    credentials.

    Returns:
        The token string, or ``None`` if the login failed (network error,
        error status, or a response without a "token" field). Failures are
        printed, never raised.
    """
    global _umami_auth_token, _umami_token_time
    token_ttl_seconds = 1800
    now = time.time()
    if _umami_auth_token and (now - _umami_token_time) < token_ttl_seconds:
        return _umami_auth_token
    try:
        r = requests.post(
            f"{config.UMAMI_URL}/api/auth/login",
            json={"username": config.UMAMI_USER, "password": config.UMAMI_PASS},
            timeout=10,
        )
        # Report HTTP failures (bad credentials, server errors) as such
        # instead of a confusing KeyError on the missing "token" field.
        r.raise_for_status()
        _umami_auth_token = r.json()["token"]
        _umami_token_time = now
        return _umami_auth_token
    except Exception as e:
        # Best-effort: the watcher keeps running and retries on the next call.
        print(f"[ERROR] Umami auth failed: {e}")
        return None
def umami_get(endpoint, params=None):
    """Perform an authenticated GET against the configured Umami website.

    Args:
        endpoint: Path appended to ``/api/websites/<UMAMI_SITE_ID>/``.
        params: Optional dict of query parameters.

    Returns:
        The decoded JSON body, or ``None`` when no token is available, the
        request fails, the server answers with an error status, or the body
        cannot be decoded. Failures are printed, never raised.
    """
    global _umami_auth_token
    token = get_umami_token()
    if not token:
        return None
    try:
        r = requests.get(
            f"{config.UMAMI_URL}/api/websites/{config.UMAMI_SITE_ID}/{endpoint}",
            params=params or {},
            headers={"Authorization": f"Bearer {token}"},
            timeout=10,
        )
        if r.status_code == 401:
            # The cached token was rejected: drop it so the next call logs in
            # again instead of reusing it until the cache expires.
            _umami_auth_token = None
        # Treat error statuses as failures so callers never mistake an error
        # payload for real data.
        r.raise_for_status()
        return r.json()
    except Exception as e:
        print(f"[ERROR] Umami API failed: {e}")
        return None
def get_recent_sessions(minutes=5):
    """Return the sessions Umami reports for the last *minutes* minutes.

    Queries the "sessions" endpoint with a millisecond ``startAt``/``endAt``
    window that ends now. Returns the payload's "data" list, or an empty
    list when the request produced nothing.
    """
    end_ms = int(time.time()) * 1000
    window_ms = minutes * 60 * 1000
    payload = umami_get("sessions", {"startAt": end_ms - window_ms, "endAt": end_ms})
    if not payload:
        return []
    return payload.get("data", [])
# ── Notification formatting ───────────────────────────────────────────────────
def format_visitor(session):
    """Build the multi-line "New Visitor!" alert text for one session.

    Args:
        session: Dict describing a session. The keys read are "device",
            "city", "country", "browser", "os", "views" and
            "firstAt"/"createdAt".

    Returns:
        A five-line string: headline, location, browser/OS/device, page
        count, and first-seen time. Fields that are missing *or null* fall
        back to placeholders, so a sparse session never raises.
    """
    device_map = {
        "mobile": "\U0001f4f1", "tablet": "\U0001f4f1",
        "laptop": "\U0001f4bb", "desktop": "\U0001f5a5\ufe0f"
    }
    # `dict.get(key, default)` only covers absent keys; `or` also covers keys
    # that are present with a null/empty value.
    raw_device = session.get("device") or ""
    emoji = device_map.get(raw_device, "\U0001f310")
    city = session.get("city") or "Unknown"
    country = session.get("country") or "??"
    browser = (session.get("browser") or "Unknown").title()
    os_name = session.get("os") or "Unknown"
    device = (raw_device or "Unknown").title()
    views = session.get("views")
    if views is None:
        views = 1
    first_at = session.get("firstAt") or session.get("createdAt") or ""
    try:
        # fromisoformat() on older Pythons rejects a trailing "Z".
        dt = datetime.fromisoformat(first_at.replace("Z", "+00:00"))
        time_str = dt.astimezone(config.IST).strftime("%I:%M %p IST")
    except Exception:
        # Best-effort: an unparseable or missing timestamp is shown as "now".
        time_str = "now"
    return "\n".join([
        f"{emoji} New Visitor!",
        f"\U0001f4cd {city}, {country}",
        f"\U0001f5a5\ufe0f {browser} / {os_name} / {device}",
        f"\U0001f4c4 {views} page(s) viewed",
        f"\U0001f550 {time_str}",
    ])
# ── Telegram command handlers ─────────────────────────────────────────────────
# These use notify() for sending, but must poll Telegram directly for receiving.
def cmd_live(chat_id):
    """Handle /live: report the active-visitor count and recent sessions.

    Sends a warning when the "active" endpoint returns nothing, a short
    message when the count is zero, and otherwise the count followed by up
    to ten sessions from the last five minutes.

    Args:
        chat_id: Passed by the command dispatcher; not used here because
            replies are sent through notify().
    """
    data = umami_get("active")
    if not data:
        notify("\u26a0\ufe0f Could not fetch active visitors.")
        return
    count = data.get("visitors", 0)
    if count == 0:
        notify("\U0001f634 No active visitors right now.")
        return
    lines = [f"\U0001f7e2 {count} active visitor(s)\n"]
    for s in get_recent_sessions(minutes=5)[:10]:
        # `or` covers keys that are present but null, which would otherwise
        # print "None" or crash on .title().
        city = s.get("city") or "?"
        country = s.get("country") or "?"
        device = (s.get("device") or "?").title()
        views = s.get("views")
        if views is None:
            views = 1
        first_at = s.get("firstAt") or s.get("createdAt") or ""
        try:
            dt = datetime.fromisoformat(first_at.replace("Z", "+00:00"))
            t = dt.astimezone(config.IST).strftime("%I:%M %p")
        except Exception:
            # Best-effort: an unparseable or missing timestamp is shown as "?".
            t = "?"
        lines.append(f" \U0001f4cd {city}, {country} | {device} | {views} pages | {t} IST")
    notify("\n".join(lines))
def _stat_value(stats, key):
    """Return the number stored under *key* in a stats payload.

    Accepts both a nested ``{"value": n, ...}`` entry and a plain number,
    and yields 0 when the key is missing or null.
    """
    entry = stats.get(key)
    if isinstance(entry, dict):
        entry = entry.get("value")
    return entry or 0


def cmd_stats(chat_id):
    """Handle /stats: send today's visitor summary plus the live count.

    Requests the "stats" endpoint for the window from midnight UTC to now
    and the "active" endpoint for the live count. Sends a warning instead
    when the stats request returns nothing.

    Args:
        chat_id: Passed by the command dispatcher; not used here because
            replies are sent through notify().
    """
    now_ms = int(time.time()) * 1000
    # NOTE(review): "today" starts at midnight UTC, while times elsewhere in
    # this file are displayed in config.IST -- confirm this is intended.
    today_start = int(datetime.now(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    ).timestamp()) * 1000
    stats = umami_get("stats", {"startAt": today_start, "endAt": now_ms})
    active = umami_get("active")
    if not stats:
        notify("\u26a0\ufe0f Could not fetch stats.")
        return
    # Presumably some Umami versions return plain numbers rather than
    # {"value": n} objects -- verify against the deployed version. Both
    # shapes are accepted so the command does not crash either way.
    pv = _stat_value(stats, "pageviews")
    visitors = _stat_value(stats, "visitors")
    visits = _stat_value(stats, "visits")
    bounce = _stat_value(stats, "bounces")
    total_time = _stat_value(stats, "totaltime")
    # Guard against division by zero when there were no visits.
    avg_time = round(total_time / visits, 1) if visits else 0
    live = active.get("visitors", 0) if active else 0
    notify("\n".join([
        "\U0001f4ca Today's Stats",
        f"\U0001f465 Visitors: {visitors}",
        f"\U0001f440 Pageviews: {pv}",
        f"\U0001f6aa Visits: {visits}",
        f"\U0001f3c3 Bounces: {bounce}",
        f"\u23f1\ufe0f Avg time: {avg_time}s",
        f"\U0001f7e2 Live now: {live}",
    ]))
def cmd_today(chat_id):
    """Handle /today: send today's top pages and top referrers.

    Requests the "metrics" endpoint twice (type "url" and type "referrer")
    for the window from midnight UTC to now, then lists up to eight pages
    and five referrers. Sends a warning instead when both requests fail.

    Args:
        chat_id: Passed by the command dispatcher; not used here because
            replies are sent through notify().
    """
    now_ms = int(time.time()) * 1000
    # NOTE(review): "today" starts at midnight UTC, while times elsewhere in
    # this file are displayed in config.IST -- confirm this is intended.
    today_start = int(datetime.now(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    ).timestamp()) * 1000
    pages = umami_get("metrics", {"startAt": today_start, "endAt": now_ms, "type": "url"})
    referrers = umami_get("metrics", {"startAt": today_start, "endAt": now_ms, "type": "referrer"})
    if pages is None and referrers is None:
        # Both fetches failed: say so, as /stats and /live do, rather than
        # claiming there is no data.
        notify("\u26a0\ufe0f Could not fetch today's metrics.")
        return
    # Anything that is not a list (e.g. an error object) cannot be sliced;
    # treat it as "no data".
    if not isinstance(pages, list):
        pages = []
    if not isinstance(referrers, list):
        referrers = []
    lines = ["\U0001f4c4 Today's Top Pages\n"]
    for p in pages[:8]:
        lines.append(f" {p.get('x','?')} -- {p.get('y',0)} views")
    if not pages:
        lines.append(" No data yet")
    lines.append("\n\U0001f517 Top Referrers\n")
    for r in referrers[:5]:
        # An empty referrer value is shown as "(direct)".
        ref = r.get("x") or "(direct)"
        lines.append(f" {ref} -- {r.get('y',0)}")
    if not referrers:
        lines.append(" No referrer data yet")
    notify("\n".join(lines))
def cmd_help(chat_id):
    """Handle /help and /start: send the list of supported commands.

    Args:
        chat_id: Passed by the command dispatcher; not used here because
            the reply is sent through notify().
    """
    help_lines = (
        "\U0001f916 Portfolio Watcher Commands",
        "/stats -- Today's summary",
        "/today -- Top pages and referrers",
        "/live -- Active visitors now",
        "/help -- This message",
        "\nNew visitor alerts sent automatically.",
    )
    message = "\n".join(help_lines)
    notify(message)
# Dispatch table: command text -> handler. Each handler is called with the
# chat id of the sender. "/start" is an alias that shows the help text.
COMMANDS = {
    "/stats": cmd_stats,
    "/today": cmd_today,
    "/live": cmd_live,
    "/help": cmd_help,
    "/start": cmd_help,
}
# ── Telegram command polling ──────────────────────────────────────────────────
# Must stay direct -- Apprise is send-only, cannot poll getUpdates.
def poll_telegram_commands():
    """Fetch pending Telegram updates and run any recognised commands.

    Calls getUpdates with an offset one past the last handled update, so
    each update is processed once. A command runs only when it comes from
    the configured chat and its first word (lower-cased, with any
    ``@botname`` suffix removed) is a key of COMMANDS; trailing arguments
    are ignored. Errors are printed, never raised.
    """
    global last_telegram_update_id
    try:
        r = requests.get(
            f"https://api.telegram.org/bot{config.TELEGRAM_TOKEN}/getUpdates",
            params={"offset": last_telegram_update_id + 1, "timeout": 0},
            timeout=5,
        )
        # Make API-level failures visible instead of silently treating an
        # error body as "no updates".
        r.raise_for_status()
        # Compare as strings so an integer chat id in config still matches.
        allowed_chat = str(config.TELEGRAM_CHAT_ID)
        for update in r.json().get("result", []):
            last_telegram_update_id = update["update_id"]
            msg = update.get("message") or {}
            chat_id = str((msg.get("chat") or {}).get("id", ""))
            words = (msg.get("text") or "").strip().lower().split()
            command = words[0].split("@")[0] if words else ""
            if chat_id != allowed_chat or command not in COMMANDS:
                continue
            print(f"[CMD] {command} from {chat_id}")
            try:
                COMMANDS[command](chat_id)
            except Exception as e:
                # One failing handler must not block the remaining updates.
                print(f"[ERROR] Command {command} failed: {e}")
    except Exception as e:
        # The request URL contains the bot token and may be embedded in the
        # exception text; redact it before it reaches the logs.
        detail = str(e)
        token = str(getattr(config, "TELEGRAM_TOKEN", "") or "")
        if token:
            detail = detail.replace(token, "<redacted>")
        print(f"[ERROR] Telegram poll failed: {detail}")
# ── Main loop ─────────────────────────────────────────────────────────────────
def main():
    """Run the watcher: announce new sessions and serve Telegram commands.

    Sends a start-up notice, records the currently visible sessions as
    already seen (so they are not announced), then loops forever: poll
    Telegram for commands, announce every session whose id has not been
    seen, and sleep for ``config.POLL_INTERVAL`` seconds. Errors inside an
    iteration are printed and the loop continues.
    """
    global seen_sessions
    max_tracked = 500
    keep_after_prune = 200

    print(f"[INFO] Watcher started. Polling every {config.POLL_INTERVAL}s...")
    notify("\U0001f7e2 Portfolio Watcher is now active!\nSend /help for commands.")
    # Seed seen sessions so we don't spam on startup. Ids are also kept in
    # insertion order (de-duplicated) because a set alone cannot tell which
    # ids are the most recent when pruning.
    initial = get_recent_sessions()
    seen_order = list(dict.fromkeys(s["id"] for s in initial))
    seen_sessions = set(seen_order)
    print(f"[INFO] Seeded {len(seen_sessions)} existing sessions")
    # Handle any commands already queued on Telegram before entering the loop
    poll_telegram_commands()
    while True:
        try:
            poll_telegram_commands()
            for session in get_recent_sessions():
                sid = session["id"]
                if sid not in seen_sessions:
                    # Mark as seen before notifying so a failing notify()
                    # cannot cause the same visitor to be announced again.
                    seen_sessions.add(sid)
                    seen_order.append(sid)
                    notify(format_visitor(session))
                    print(f"[NOTIFY] New visitor: {session.get('city')}, {session.get('country')}")
            # Prevent unbounded memory growth: keep only the newest ids.
            # Slicing a set would keep an arbitrary subset and could forget
            # sessions that are still being returned, re-announcing them.
            if len(seen_sessions) > max_tracked:
                seen_order = seen_order[-keep_after_prune:]
                seen_sessions = set(seen_order)
        except Exception as e:
            print(f"[ERROR] Loop error: {e}")
        time.sleep(config.POLL_INTERVAL)


# Start the watcher only when executed as a script, not when imported.
if __name__ == "__main__":
    main()