Build a Notification Aggregator in Python (Step-by-Step Guide)
In this tutorial, you'll learn how to build a notification aggregator in Python, step by step. We cover the key concepts, practical examples, and best practices you need to apply it effectively.
Build a notification aggregator in Python that ingests alerts from Slack, email (IMAP), SMS (Twilio), and generic webhooks, then deduplicates, prioritizes, and serves them through a single unified API and dashboard.
What You'll Build
You will build a central notification hub, the kind that PagerDuty or Opsgenie provides, but self-hosted. It receives alerts from multiple channels, removes duplicates, assigns priority levels, and exposes a REST API for a unified inbox view.
Why Build a Notification Aggregator?
Operations teams drown in alerts from different tools. A single critical outage can trigger email alerts, Slack messages, PagerDuty pages, and SMS, all saying the same thing. An aggregator deduplicates these into one incident, reducing noise and preventing alert fatigue. DodaTech's infrastructure team uses a similar system to consolidate alerts from Durga Antivirus Pro's scanning cluster, dropping redundant notifications by 80%.
Prerequisites
Step 1: Project Setup
mkdir notification-aggregator
cd notification-aggregator
python -m venv venv
source venv/bin/activate
pip install flask requests twilio
Step 2: The Core Inbox Model
Each notification is stored with a fingerprint for deduplication. Two notifications with the same fingerprint within a window are considered duplicates.
# models.py
import hashlib
import sqlite3

DB = "aggregator.db"
DEDUP_WINDOW = "-1 hour"  # SQLite datetime modifier that defines the dedup window


def init_db():
    conn = sqlite3.connect(DB)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS notifications (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source TEXT NOT NULL,
            title TEXT NOT NULL,
            body TEXT NOT NULL,
            priority INTEGER DEFAULT 0,
            fingerprint TEXT,
            created_at TEXT DEFAULT (datetime('now')),
            acknowledged INTEGER DEFAULT 0
        );
        CREATE INDEX IF NOT EXISTS idx_fingerprint
            ON notifications(fingerprint);
    """)
    conn.commit()
    conn.close()


def make_fingerprint(source: str, title: str, body: str) -> str:
    # Short, stable hash of the fields that define "the same alert"
    raw = f"{source}:{title}:{body}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


def insert_notification(source: str, title: str, body: str, priority: int = 0) -> dict:
    conn = sqlite3.connect(DB)
    fp = make_fingerprint(source, title, body)
    # Compute the cutoff inside SQLite so it has the same "YYYY-MM-DD HH:MM:SS"
    # format as created_at. A Python isoformat() string uses a "T" separator
    # and does not compare correctly against that column.
    recent = conn.execute(
        "SELECT id FROM notifications "
        "WHERE fingerprint = ? AND created_at > datetime('now', ?)",
        (fp, DEDUP_WINDOW),
    ).fetchone()
    if recent:
        conn.close()
        return {"duplicate": True, "existing_id": recent[0]}
    cur = conn.execute(
        "INSERT INTO notifications (source, title, body, priority, fingerprint) "
        "VALUES (?, ?, ?, ?, ?)",
        (source, title, body, priority, fp),
    )
    conn.commit()
    notif_id = cur.lastrowid  # id of the row that was just inserted
    conn.close()
    return {"duplicate": False, "id": notif_id}
Expected output: Call insert_notification twice with the same arguments within one hour. The second call returns {"duplicate": True, "existing_id": 1}.
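To see the one-hour window in isolation, the following self-contained sketch reproduces the fingerprint check against an in-memory SQLite database. It is a simplified stand-in for models.py, not a replacement: the `insert` helper, the module-level `conn`, and the backdating trick are assumptions made so the example runs without a database file.

```python
import hashlib
import sqlite3

# In-memory database so the sketch leaves no file behind (assumption for the demo)
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE notifications (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source TEXT, title TEXT, body TEXT,
        fingerprint TEXT,
        created_at TEXT DEFAULT (datetime('now'))
    )
""")


def make_fingerprint(source, title, body):
    return hashlib.sha256(f"{source}:{title}:{body}".encode()).hexdigest()[:16]


def insert(source, title, body):
    """Hypothetical helper mirroring the dedup check in insert_notification."""
    fp = make_fingerprint(source, title, body)
    recent = conn.execute(
        "SELECT id FROM notifications "
        "WHERE fingerprint = ? AND created_at > datetime('now', '-1 hour')",
        (fp,),
    ).fetchone()
    if recent:
        return {"duplicate": True, "existing_id": recent[0]}
    cur = conn.execute(
        "INSERT INTO notifications (source, title, body, fingerprint) VALUES (?, ?, ?, ?)",
        (source, title, body, fp),
    )
    conn.commit()
    return {"duplicate": False, "id": cur.lastrowid}


alert = ("slack", "CPU high", "CPU usage above 90% on server-01")
first = insert(*alert)
second = insert(*alert)

# Backdate the stored row by two hours to simulate an alert outside the window
conn.execute("UPDATE notifications SET created_at = datetime('now', '-2 hours')")
third = insert(*alert)

print(first)   # stored as a new notification
print(second)  # rejected as a duplicate of the first
print(third)   # stored again because the earlier copy is older than one hour
```

The third call shows the trade-off of a time window: an alert that keeps firing for longer than the window produces a fresh notification once per hour instead of staying silent forever.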
Step 3: Slack Ingest
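Slack sends notifications via a slash command or an outgoing webhook. We expose an endpoint that Slack calls with the alert details. The same module also registers the generic /ingest/webhook endpoint, which accepts a JSON payload from any other tool.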
# slack_ingest.py
import json

from flask import Blueprint, request, jsonify
from models import insert_notification

slack = Blueprint("slack", __name__)


@slack.route("/ingest/slack", methods=["POST"])
def ingest_slack():
    # Slack posts form-encoded data; fall back to JSON for other senders
    data = request.form if request.form else request.get_json(silent=True) or {}
    # The first line of the message, capped at 100 characters, becomes the title
    title = data.get("text", "").split("\n")[0][:100]
    body = data.get("text", "")
    result = insert_notification("slack", title, body, priority=1)
    return jsonify(result)


@slack.route("/ingest/webhook", methods=["POST"])
def ingest_webhook():
    data = request.get_json(silent=True) or {}
    title = data.get("title", "Webhook Alert")
    # With no explicit body, store the whole payload so nothing is lost
    body = data.get("body", json.dumps(data))
    priority = data.get("priority", 0)
    result = insert_notification("webhook", title, body, priority)
    return jsonify(result)
Expected output: curl -X POST -d "text=CPU usage above 90% on server-01" http://localhost:5000/ingest/slack returns {"duplicate": false, "id": 1}.
Step 4: SMS Ingest via Twilio
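Twilio forwards incoming SMS messages to a webhook URL. We parse the sender and message body and create a notification. The same blueprint also exposes /ingest/email, which accepts email alerts posted as JSON with subject and body fields.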
# sms_ingest.py
from flask import Blueprint, request, jsonify
from models import insert_notification

sms = Blueprint("sms", __name__)


@sms.route("/ingest/sms", methods=["POST"])
def ingest_sms():
    sender = request.form.get("From", "unknown")
    body = request.form.get("Body", "")
    title = f"SMS from {sender}"
    result = insert_notification("sms", title, body, priority=3)
    return jsonify(result)


@sms.route("/ingest/email", methods=["POST"])
def ingest_email():
    data = request.get_json(silent=True) or {}
    subject = data.get("subject", "Email Alert")
    body = data.get("body", "")
    result = insert_notification("email", subject, body, priority=2)
    return jsonify(result)
Step 5: Unified Inbox API
The API serves all notifications in one place with filtering, sorting, and acknowledgment.
# api.py
import sqlite3

from flask import Blueprint, request, jsonify
from models import DB

api = Blueprint("api", __name__)


@api.route("/api/notifications")
def list_notifications():
    source = request.args.get("source")
    # type=int falls back to the default on non-numeric input instead of raising;
    # the clamps keep limit within 1..200 and offset non-negative
    limit = max(1, min(request.args.get("limit", 50, type=int), 200))
    offset = max(0, request.args.get("offset", 0, type=int))
    query = "SELECT * FROM notifications WHERE 1=1"
    params = []
    if source:
        query += " AND source = ?"
        params.append(source)
    query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params.extend([limit, offset])
    conn = sqlite3.connect(DB)
    rows = conn.execute(query, params).fetchall()
    conn.close()
    # Column order follows the CREATE TABLE statement; r[5] is the fingerprint
    return jsonify([
        {
            "id": r[0], "source": r[1], "title": r[2],
            "body": r[3], "priority": r[4], "created_at": r[6],
            "acknowledged": bool(r[7]),
        }
        for r in rows
    ])


@api.route("/api/notifications/<int:nid>/ack", methods=["POST"])
def acknowledge(nid):
    conn = sqlite3.connect(DB)
    conn.execute("UPDATE notifications SET acknowledged = 1 WHERE id = ?", (nid,))
    conn.commit()
    conn.close()
    return jsonify({"success": True})


@api.route("/api/stats")
def stats():
    conn = sqlite3.connect(DB)
    total = conn.execute("SELECT COUNT(*) FROM notifications").fetchone()[0]
    unacked = conn.execute(
        "SELECT COUNT(*) FROM notifications WHERE acknowledged = 0"
    ).fetchone()[0]
    by_source = conn.execute(
        "SELECT source, COUNT(*) FROM notifications GROUP BY source"
    ).fetchall()
    conn.close()
    return jsonify({
        "total": total,
        "unacknowledged": unacked,
        "by_source": {s: c for s, c in by_source}
    })
Step 6: Application Assembly
# app.py
from flask import Flask
from models import init_db
from slack_ingest import slack
from sms_ingest import sms
from api import api

app = Flask(__name__)
app.register_blueprint(slack)
app.register_blueprint(sms)
app.register_blueprint(api)

if __name__ == "__main__":
    init_db()
    app.run(port=5000, debug=True)
Expected output: Start with python app.py. Visit http://localhost:5000/api/stats, which returns {"total": 0, "unacknowledged": 0, "by_source": {}}.
Architecture
flowchart TB
    subgraph "Sources"
        SL[Slack Webhook]
        EM[Email IMAP]
        TW[Twilio SMS]
        WH[Generic Webhook]
    end
    subgraph "Aggregator"
        INGEST["Ingest Layer<br/>Flask Blueprints"]
        DEDUP["Deduplication<br/>SHA256 fingerprint<br/>1-hour window"]
        SQL[(SQLite)]
        API["REST API<br/>/api/notifications"]
    end
    subgraph "Consumers"
        DASH[Dashboard]
        MOB[Mobile App]
        CLI[CLI Tool]
    end
    SL --> INGEST
    EM --> INGEST
    TW --> INGEST
    WH --> INGEST
    INGEST --> DEDUP
    DEDUP --> SQL
    API --> SQL
    DASH --> API
    MOB --> API
    CLI --> API
Common Errors
1. Twilio not calling the webhook
Twilio needs a publicly accessible URL. Use ngrok during development: ngrok http 5000 gives you a public URL to configure in the Twilio console.
2. Deduplication not working
The fingerprint includes source, title, and body. If Slack prepends a timestamp to the message, every copy gets a different fingerprint. Normalize the body by stripping timestamps before fingerprinting.
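The normalization step could look like the sketch below. The timestamp shapes matched by `TIMESTAMP_RE` (ISO-8601 style, optionally wrapped in square brackets) and the `normalize` helper are assumptions for illustration; adjust the pattern to whatever your sources actually prepend.

```python
import hashlib
import re

# Assumed timestamp shapes: "2024-05-01T12:34:56Z" or "2024-05-01 12:34:56",
# with optional fractional seconds, UTC offset, and surrounding brackets.
TIMESTAMP_RE = re.compile(
    r"\[?\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?\]?"
)


def normalize(text: str) -> str:
    """Remove timestamps and collapse runs of whitespace."""
    without_ts = TIMESTAMP_RE.sub("", text)
    return " ".join(without_ts.split())


def make_fingerprint(source: str, title: str, body: str) -> str:
    # Same hashing as models.py, applied to the normalized fields
    raw = f"{source}:{normalize(title)}:{normalize(body)}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


a = make_fingerprint("slack", "CPU high", "[2024-05-01 12:00:01] CPU usage above 90% on server-01")
b = make_fingerprint("slack", "CPU high", "[2024-05-01 12:00:07] CPU usage above 90% on server-01")
c = make_fingerprint("slack", "CPU high", "[2024-05-01 12:00:07] CPU usage above 90% on server-02")

print(a == b)  # copies that differ only in their timestamp share a fingerprint
print(a == c)  # a different server still produces a different fingerprint
```

Keep the original, unnormalized body for storage and display; only the fingerprint input needs to be cleaned.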
3. Priority not reflecting urgency
Our priority scale is ad hoc (0=low, 3=critical). Map each source's own severity to this scale. For example, Twilio SMS is always high priority because someone chose to pay for a text message.
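One way to express that mapping is a lookup table per source. The severity labels in `PRIORITY_MAP` are hypothetical and will differ per tool; the fallback values in `DEFAULT_PRIORITY` are the fixed priorities the ingest endpoints in this tutorial already use.

```python
# Hypothetical severity labels per source, mapped onto the 0-3 scale
PRIORITY_MAP = {
    "slack": {"info": 0, "warning": 1, "error": 2, "critical": 3},
    "email": {"low": 0, "normal": 1, "high": 2, "urgent": 3},
    "webhook": {"debug": 0, "warn": 1, "error": 2, "fatal": 3},
}

# Fallbacks matching the hard-coded priorities in the ingest endpoints
DEFAULT_PRIORITY = {"slack": 1, "email": 2, "sms": 3, "webhook": 0}


def map_priority(source, severity=None):
    """Translate a source-specific severity label to the 0-3 scale."""
    default = DEFAULT_PRIORITY.get(source, 0)
    if severity is None:
        return default
    label = severity.strip().lower()
    return PRIORITY_MAP.get(source, {}).get(label, default)


print(map_priority("slack", "CRITICAL"))   # known label for a known source
print(map_priority("sms"))                 # no label, so the source default applies
print(map_priority("webhook", "unknown"))  # unknown label falls back to the default
```

An ingest endpoint would then call `map_priority("webhook", data.get("severity"))` instead of passing a constant, assuming the sender includes a `severity` field.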
4. Database locked under load
SQLite handles one writer at a time. Under heavy load, switch to PostgreSQL. For light to moderate traffic, SQLite with WAL mode (PRAGMA journal_mode=WAL) is sufficient.
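As a minimal sketch, WAL mode can be enabled with a single PRAGMA when the database is first opened. The temporary path and the five-second `timeout` are assumptions for the demo; in the tutorial the path would be `models.DB` and the PRAGMA would fit naturally inside `init_db()`.

```python
import os
import sqlite3
import tempfile

# Temporary file for the demo; WAL needs an on-disk database, not :memory:
db_path = os.path.join(tempfile.mkdtemp(), "aggregator.db")

# timeout makes a writer wait up to 5 s for a lock instead of failing at once
conn = sqlite3.connect(db_path, timeout=5.0)
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
conn.execute("CREATE TABLE IF NOT EXISTS notifications (id INTEGER PRIMARY KEY)")
conn.commit()
conn.close()

# The journal mode is recorded in the database file, so new connections inherit it
conn2 = sqlite3.connect(db_path)
mode_again = conn2.execute("PRAGMA journal_mode").fetchone()[0]
conn2.close()

print(mode, mode_again)
```

WAL lets readers proceed while a write is in progress, but there is still only one writer at a time, which is why PostgreSQL remains the suggestion for heavy load.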
5. Duplicate detection across restart
Fingerprint matching uses a time window that is evaluated against rows stored in SQLite. If the server restarts, duplicates within that window are still caught, because the dedup state lives in the database and not in memory.
Practice Questions
1. How does the aggregator detect duplicate notifications?
It computes a SHA256 fingerprint from source, title, and body. If the same fingerprint appears within the dedup window (1 hour), the second notification is marked as a duplicate and not stored.
2. Why store priority at the notification level?
Different sources have different urgency. An SMS alert (paid channel) is inherently more urgent than a Slack message. Assigning priority at ingest time lets consumers sort and filter meaningfully.
3. How would you add email reading via IMAP?
Run a background thread that polls an IMAP inbox, fetches unseen messages, and calls insert_notification("email", subject, body, 2) for each one. Mark messages as seen after ingestion.
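A rough sketch of that poller using only the standard library is shown below. The function names `parse_message` and `poll_inbox`, the `handle` callback, and the 60-second interval are assumptions; the host and credentials are placeholders you would supply yourself.

```python
import email
import imaplib
import time
from email.header import decode_header, make_header


def parse_message(raw: bytes):
    """Return (subject, body) extracted from a raw RFC 822 message."""
    msg = email.message_from_bytes(raw)
    subject = str(make_header(decode_header(msg.get("Subject", "Email Alert"))))
    if msg.is_multipart():
        # Use the first text/plain part; attachments and HTML are ignored
        part = next((p for p in msg.walk() if p.get_content_type() == "text/plain"), None)
        payload = (part.get_payload(decode=True) if part else None) or b""
    else:
        payload = msg.get_payload(decode=True) or b""
    return subject, payload.decode("utf-8", errors="replace").strip()


def poll_inbox(host, user, password, handle, interval=60):
    """Poll INBOX forever and pass (subject, body) of unseen mail to handle."""
    while True:
        with imaplib.IMAP4_SSL(host) as imap:
            imap.login(user, password)
            imap.select("INBOX")
            _, data = imap.search(None, "UNSEEN")
            for num in data[0].split():
                # Fetching RFC822 also sets the \Seen flag on the message
                _, parts = imap.fetch(num, "(RFC822)")
                handle(*parse_message(parts[0][1]))
        time.sleep(interval)


# Parsing can be tried without a mail server:
sample = b"Subject: Disk full\r\nFrom: monitor@example.com\r\n\r\nDisk usage at 95% on db-01\r\n"
subject, body = parse_message(sample)
print(subject, "|", body)
```

To wire it into the app, you could start it before `app.run()` with something like `threading.Thread(target=poll_inbox, args=(host, user, password, lambda s, b: insert_notification("email", s, b, 2)), daemon=True).start()`. Error handling and reconnect backoff are left out of this sketch.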
4. Challenge: Email sending via SendGrid
Add an outbound notification channel. When a high-priority notification arrives, automatically send an email via the SendGrid API. Include the notification title and body in the email.
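As a possible starting point, the sketch below builds a request for SendGrid's v3 Mail Send endpoint with the standard library. The helper names, the `[ALERT]` subject prefix, and the threshold of 3 for "high priority" are assumptions, and the API key and addresses are placeholders.

```python
import json
import urllib.request

SENDGRID_URL = "https://api.sendgrid.com/v3/mail/send"
HIGH_PRIORITY = 3  # assumed threshold on the tutorial's 0-3 scale


def should_email(priority: int) -> bool:
    return priority >= HIGH_PRIORITY


def build_payload(title, body, to_addr, from_addr):
    """Build the JSON body for a plain-text message."""
    return {
        "personalizations": [{"to": [{"email": to_addr}]}],
        "from": {"email": from_addr},
        "subject": f"[ALERT] {title}",
        # The content value must not be empty, so fall back to the title
        "content": [{"type": "text/plain", "value": body or title}],
    }


def send_alert_email(api_key, title, body, to_addr, from_addr):
    """POST the message to SendGrid and return the HTTP status code."""
    req = urllib.request.Request(
        SENDGRID_URL,
        data=json.dumps(build_payload(title, body, to_addr, from_addr)).encode(),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status  # 202 means the message was accepted for delivery


payload = build_payload("CPU high", "CPU usage above 90% on server-01",
                        "oncall@example.com", "alerts@example.com")
print(json.dumps(payload, indent=2))
```

A natural place to call `send_alert_email` is right after `insert_notification` returns a non-duplicate result, guarded by `should_email(priority)` so that deduplicated alerts do not trigger repeated emails.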
5. Challenge: Alert silencing
Allow users to silence notifications matching certain patterns. Store silence rules in a separate table with regex patterns. Before inserting a notification, check if any silencing rule matches.
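One possible shape for the silencing check is sketched here against an in-memory database. The `silence_rules` table layout, the `is_silenced` helper, and the sample patterns are all assumptions for illustration.

```python
import re
import sqlite3

conn = sqlite3.connect(":memory:")  # demo only; the tutorial uses aggregator.db
conn.execute("""
    CREATE TABLE silence_rules (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        pattern TEXT NOT NULL,
        reason TEXT
    )
""")
conn.executemany(
    "INSERT INTO silence_rules (pattern, reason) VALUES (?, ?)",
    [
        (r"staging-\d+", "staging is noisy"),
        (r"(?i)test alert", "synthetic checks"),
        (r"([unclosed", "broken rule, kept to show the error handling"),
    ],
)
conn.commit()


def is_silenced(conn, title, body):
    """Return True if any stored regex matches the title or body."""
    text = f"{title}\n{body}"
    for (pattern,) in conn.execute("SELECT pattern FROM silence_rules"):
        try:
            if re.search(pattern, text):
                return True
        except re.error:
            continue  # skip rules whose pattern fails to compile
    return False


print(is_silenced(conn, "CPU high", "load spike on staging-03"))
print(is_silenced(conn, "Test Alert", "heartbeat"))
print(is_silenced(conn, "CPU high", "load spike on prod-01"))
```

In `insert_notification`, the check would run before the fingerprint lookup and return early for silenced alerts. Validating patterns with `re.compile` when a rule is created avoids carrying broken rules in the table at all.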
FAQ
Next Steps
- Add WebSocket push so the dashboard updates in real-time without polling
- Integrate Redis as a fast dedup cache instead of SQL queries
- Build a React dashboard on top of /api/notifications with filtering, search, and bulk acknowledge
- Explore Docker deployment so the aggregator runs as a service alongside your other infrastructure
Built by the developers of DodaTech
Doda Browser, DodaZIP & Durga Antivirus Pro