notifier: refactor watcher.py -- secrets to .env, send via Apprise, systemd service

This commit is contained in:
Manohar Gupta 2026-04-27 01:28:03 +05:30
parent 15f6aafeeb
commit 2256adbe56
6 changed files with 400 additions and 0 deletions

21
notifier/.env.example Normal file
View file

@ -0,0 +1,21 @@
# /opt/umami/notifier/.env
# Copy this to .env and fill in all values.
# NEVER commit .env to git -- it contains secrets.
# Umami
UMAMI_URL=http://localhost:3001
UMAMI_USER=admin
UMAMI_PASS= # your Umami admin password
UMAMI_SITE_ID=4d2ae2fe-0fb2-4119-879b-cda63ccbbd3f
# Telegram -- only used for receiving /commands (getUpdates).
# Sending goes via Apprise. Rotate here when BotFather issues a new token.
TELEGRAM_TOKEN= # your new rotated bot token
TELEGRAM_CHAT_ID=897499696
# Apprise -- all outbound notifications route through here.
# Format: https://USER:PASSWORD@notify.manohargupta.com/notify/apprise
APPRISE_URL=https://manohar: # add your Apprise password after the colon
# Tuning
POLL_INTERVAL=30 # seconds between Umami checks

43
notifier/config.py Normal file
View file

@ -0,0 +1,43 @@
#!/usr/bin/env python3
"""
config.py -- loads all configuration from .env file.
Never import secrets directly in other modules -- always import from here.
"""
import os
from pathlib import Path
from datetime import timezone, timedelta
# Load .env manually (no external deps needed).
# Reads KEY=VALUE lines, ignores comments and blank lines.
def _load_env(path="/opt/umami/notifier/.env"):
env_path = Path(path)
if not env_path.exists():
raise FileNotFoundError(f".env not found at {path}. Copy .env.example and fill in values.")
for line in env_path.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
os.environ.setdefault(key.strip(), value.strip())
_load_env()
# Umami
UMAMI_URL = os.environ["UMAMI_URL"]
UMAMI_USER = os.environ["UMAMI_USER"]
UMAMI_PASS = os.environ["UMAMI_PASS"]
UMAMI_SITE_ID = os.environ["UMAMI_SITE_ID"]
# Telegram -- only needed for receiving /commands via getUpdates.
# Sending goes through Apprise, not directly to Telegram API.
TELEGRAM_TOKEN = os.environ["TELEGRAM_TOKEN"]
TELEGRAM_CHAT_ID = os.environ["TELEGRAM_CHAT_ID"]
# Apprise -- all outbound notifications go here.
# Format: https://user:pass@notify.manohargupta.com/notify/apprise
APPRISE_URL = os.environ["APPRISE_URL"]
# General
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30")) # seconds
IST = timezone(timedelta(hours=5, minutes=30))

18
notifier/deploy.sh Normal file
View file

@ -0,0 +1,18 @@
#!/bin/bash
# deploy-notifier.sh -- copies refactored watcher to server
set -e
SERVER="root@100.75.128.45"
DEST="/opt/umami/notifier"
echo "Copying files..."
scp /Users/manohar_air/MyProjects/deployments/notifier/config.py $SERVER:$DEST/config.py
scp /Users/manohar_air/MyProjects/deployments/notifier/notifier.py $SERVER:$DEST/notifier.py
scp /Users/manohar_air/MyProjects/deployments/notifier/watcher.py $SERVER:$DEST/watcher.py
scp /Users/manohar_air/MyProjects/deployments/notifier/.env.example $SERVER:$DEST/.env.example
echo "Done. Now on the server:"
echo " 1. cp $DEST/.env.example $DEST/.env"
echo " 2. nano $DEST/.env (fill in secrets)"
echo " 3. chmod 600 $DEST/.env"
echo " 4. kill \$(pgrep -f watcher.py) && python3 $DEST/watcher.py &"

41
notifier/notifier.py Normal file
View file

@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""
notifier.py -- single send function for all outbound notifications.
All modules call notify(message) -- never call Telegram or Apprise directly.
Routing: notify() -> Apprise API -> Telegram (or any other configured channel)
Why Apprise instead of direct Telegram:
- Retry logic built in (Telegram outages handled automatically)
- Swap notification channel in Apprise UI without touching code
- Token lives in .env + Apprise config, not scattered across scripts
"""
import requests
import config
def notify(message: str) -> bool:
"""
Send a notification via Apprise.
Returns True on success, False on failure.
Apprise accepts plain text -- HTML formatting works if Telegram is configured
with html_format in the Apprise config.
"""
try:
resp = requests.post(
config.APPRISE_URL,
json={"body": message},
timeout=10,
)
if resp.status_code == 200:
return True
else:
print(f"[WARN] Apprise returned {resp.status_code}: {resp.text[:200]}")
return False
except requests.exceptions.ConnectionError:
print("[ERROR] Could not reach Apprise -- is notify.manohargupta.com reachable?")
return False
except Exception as e:
print(f"[ERROR] Notification failed: {e}")
return False

260
notifier/watcher.py Normal file
View file

@ -0,0 +1,260 @@
#!/usr/bin/env python3
"""
watcher.py -- Umami visitor watcher + Telegram command bot.
Sending: new visitor alerts -> notifier.py -> Apprise -> Telegram
Receiving: /stats /today /live /help commands -> direct Telegram API (Apprise can't receive)
Run: python3 watcher.py
Keep alive: managed by systemd (see watcher.service)
"""
import time
import requests
from datetime import datetime, timezone
import config
from notifier import notify
# ── State (in-memory, resets on restart) ─────────────────────────────────────
seen_sessions = set()
_umami_auth_token = None
_umami_token_time = 0
last_telegram_update_id = 0
# ── Umami auth ────────────────────────────────────────────────────────────────
def get_umami_token():
"""Get a cached Umami JWT, refreshing every 30 minutes."""
global _umami_auth_token, _umami_token_time
now = time.time()
if _umami_auth_token and (now - _umami_token_time) < 1800:
return _umami_auth_token
try:
r = requests.post(
f"{config.UMAMI_URL}/api/auth/login",
json={"username": config.UMAMI_USER, "password": config.UMAMI_PASS},
timeout=10,
)
_umami_auth_token = r.json()["token"]
_umami_token_time = now
return _umami_auth_token
except Exception as e:
print(f"[ERROR] Umami auth failed: {e}")
return None
def umami_get(endpoint, params=None):
"""Authenticated GET to Umami API."""
token = get_umami_token()
if not token:
return None
try:
r = requests.get(
f"{config.UMAMI_URL}/api/websites/{config.UMAMI_SITE_ID}/{endpoint}",
params=params or {},
headers={"Authorization": f"Bearer {token}"},
timeout=10,
)
return r.json()
except Exception as e:
print(f"[ERROR] Umami API failed: {e}")
return None
def get_recent_sessions(minutes=5):
now = int(time.time()) * 1000
start = now - (minutes * 60 * 1000)
data = umami_get("sessions", {"startAt": start, "endAt": now})
return data.get("data", []) if data else []
# ── Notification formatting ───────────────────────────────────────────────────
def format_visitor(session):
device_map = {
"mobile": "\U0001f4f1", "tablet": "\U0001f4f1",
"laptop": "\U0001f4bb", "desktop": "\U0001f5a5\ufe0f"
}
emoji = device_map.get(session.get("device", ""), "\U0001f310")
city = session.get("city", "Unknown")
country = session.get("country", "??")
browser = session.get("browser", "Unknown").title()
os_name = session.get("os", "Unknown")
device = session.get("device", "Unknown").title()
views = session.get("views", 1)
first_at = session.get("firstAt") or session.get("createdAt") or ""
try:
dt = datetime.fromisoformat(first_at.replace("Z", "+00:00"))
time_str = dt.astimezone(config.IST).strftime("%I:%M %p IST")
except Exception:
time_str = "now"
return "\n".join([
f"{emoji} New Visitor!",
f"\U0001f4cd {city}, {country}",
f"\U0001f5a5\ufe0f {browser} / {os_name} / {device}",
f"\U0001f4c4 {views} page(s) viewed",
f"\U0001f550 {time_str}",
])
# ── Telegram command handlers ─────────────────────────────────────────────────
# These use notify() for sending, but must poll Telegram directly for receiving.
def cmd_live(chat_id):
data = umami_get("active")
if not data:
notify("\u26a0\ufe0f Could not fetch active visitors.")
return
count = data.get("visitors", 0)
if count == 0:
notify("\U0001f634 No active visitors right now.")
return
sessions = get_recent_sessions(minutes=5)
lines = [f"\U0001f7e2 {count} active visitor(s)\n"]
for s in sessions[:10]:
city = s.get("city", "?")
country = s.get("country", "?")
device = s.get("device", "?").title()
views = s.get("views", 1)
first_at = s.get("firstAt") or s.get("createdAt") or ""
try:
dt = datetime.fromisoformat(first_at.replace("Z", "+00:00"))
t = dt.astimezone(config.IST).strftime("%I:%M %p")
except Exception:
t = "?"
lines.append(f" \U0001f4cd {city}, {country} | {device} | {views} pages | {t} IST")
notify("\n".join(lines))
def cmd_stats(chat_id):
now_ms = int(time.time()) * 1000
today_start = int(datetime.now(timezone.utc).replace(
hour=0, minute=0, second=0, microsecond=0
).timestamp()) * 1000
stats = umami_get("stats", {"startAt": today_start, "endAt": now_ms})
active = umami_get("active")
if not stats:
notify("\u26a0\ufe0f Could not fetch stats.")
return
pv = stats.get("pageviews", {}).get("value", 0)
visitors = stats.get("visitors", {}).get("value", 0)
visits = stats.get("visits", {}).get("value", 0)
bounce = stats.get("bounces", {}).get("value", 0)
total_time = stats.get("totaltime", {}).get("value", 0)
avg_time = round(total_time / visits, 1) if visits else 0
live = active.get("visitors", 0) if active else 0
notify("\n".join([
"\U0001f4ca Today's Stats",
f"\U0001f465 Visitors: {visitors}",
f"\U0001f440 Pageviews: {pv}",
f"\U0001f6aa Visits: {visits}",
f"\U0001f3c3 Bounces: {bounce}",
f"\u23f1\ufe0f Avg time: {avg_time}s",
f"\U0001f7e2 Live now: {live}",
]))
def cmd_today(chat_id):
now_ms = int(time.time()) * 1000
today_start = int(datetime.now(timezone.utc).replace(
hour=0, minute=0, second=0, microsecond=0
).timestamp()) * 1000
pages = umami_get("metrics", {"startAt": today_start, "endAt": now_ms, "type": "url"})
referrers = umami_get("metrics", {"startAt": today_start, "endAt": now_ms, "type": "referrer"})
lines = ["\U0001f4c4 Today's Top Pages\n"]
for p in (pages or [])[:8]:
lines.append(f" {p.get('x','?')} -- {p.get('y',0)} views")
if not pages:
lines.append(" No data yet")
lines.append("\n\U0001f517 Top Referrers\n")
for r in (referrers or [])[:5]:
ref = r.get("x") or "(direct)"
lines.append(f" {ref} -- {r.get('y',0)}")
if not referrers:
lines.append(" No referrer data yet")
notify("\n".join(lines))
def cmd_help(chat_id):
notify("\n".join([
"\U0001f916 Portfolio Watcher Commands",
"/stats -- Today's summary",
"/today -- Top pages and referrers",
"/live -- Active visitors now",
"/help -- This message",
"\nNew visitor alerts sent automatically.",
]))
COMMANDS = {
"/stats": cmd_stats,
"/today": cmd_today,
"/live": cmd_live,
"/help": cmd_help,
"/start": cmd_help,
}
# ── Telegram command polling ──────────────────────────────────────────────────
# Must stay direct -- Apprise is send-only, cannot poll getUpdates.
def poll_telegram_commands():
global last_telegram_update_id
try:
r = requests.get(
f"https://api.telegram.org/bot{config.TELEGRAM_TOKEN}/getUpdates",
params={"offset": last_telegram_update_id + 1, "timeout": 0},
timeout=5,
)
for update in r.json().get("result", []):
last_telegram_update_id = update["update_id"]
msg = update.get("message", {})
chat_id = str(msg.get("chat", {}).get("id", ""))
text = msg.get("text", "").strip().lower().split("@")[0]
if chat_id == config.TELEGRAM_CHAT_ID and text in COMMANDS:
print(f"[CMD] {text} from {chat_id}")
COMMANDS[text](chat_id)
except Exception as e:
print(f"[ERROR] Telegram poll failed: {e}")
# ── Main loop ─────────────────────────────────────────────────────────────────
def main():
global seen_sessions
print(f"[INFO] Watcher started. Polling every {config.POLL_INTERVAL}s...")
notify("\U0001f7e2 Portfolio Watcher is now active!\nSend /help for commands.")
# Seed seen sessions so we don't spam on startup
initial = get_recent_sessions()
seen_sessions = {s["id"] for s in initial}
print(f"[INFO] Seeded {len(seen_sessions)} existing sessions")
# Drain any queued Telegram commands before entering the loop
poll_telegram_commands()
while True:
try:
poll_telegram_commands()
for session in get_recent_sessions():
sid = session["id"]
if sid not in seen_sessions:
seen_sessions.add(sid)
notify(format_visitor(session))
print(f"[NOTIFY] New visitor: {session.get('city')}, {session.get('country')}")
# Prevent unbounded memory growth
if len(seen_sessions) > 500:
seen_sessions = set(list(seen_sessions)[-200:])
except Exception as e:
print(f"[ERROR] Loop error: {e}")
time.sleep(config.POLL_INTERVAL)
if __name__ == "__main__":
main()

17
notifier/watcher.service Normal file
View file

@ -0,0 +1,17 @@
[Unit]
Description=Portfolio Watcher -- Umami visitor alerts via Apprise
After=network.target docker.service
[Service]
Type=simple
User=root
WorkingDirectory=/opt/umami/notifier
ExecStart=/usr/bin/python3 /opt/umami/notifier/watcher.py
Restart=on-failure
RestartSec=10
# Logs go to journald -- view with: journalctl -u watcher -f
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target