Skip to content

Build a Notification Aggregator in Python (Step-by-Step Guide)

DodaTech Updated 2026-06-21 7 min read

In this tutorial, you'll learn about Build a Notification Aggregator in Python (Step. We cover key concepts, practical examples, and best practices to help you understand and apply this topic effectively.

Build a notification aggregator in Python that ingests alerts from Slack, email (IMAP), SMS (Twilio), and generic Webhooks β€” then deduplicates, prioritizes, and serves them through a single unified API and dashboard.

What You'll Build

You will build a central notification hub β€” the kind that PagerDuty or Opsgenie provides, but self-hosted. It receives alerts from multiple channels, removes duplicates, assigns priority levels, and exposes a REST API for a unified inbox view.

Why Build a Notification Aggregator?

Operations teams drown in alerts from different tools. A single critical outage can trigger email alerts, Slack messages, PagerDuty pages, and SMS β€” all saying the same thing. An aggregator deduplicates these into one incident, reducing noise and preventing alert fatigue. DodaTech's infrastructure team uses a similar system to consolidate alerts from Durga Antivirus Pro's scanning cluster, dropping redundant notifications by 80%.

Prerequisites

  • Python 3.10+ installed
  • A Slack workspace for testing (free tier works)
  • Basic familiarity with Flask

Step 1: Project Setup

mkdir notification-aggregator
cd notification-aggregator
python -m venv venv
source venv/bin/activate
pip install flask requests twilio

Step 2: The Core Inbox Model

Each notification is stored with a fingerprint for deduplication. Two notifications with the same fingerprint within a window are considered duplicates.

# models.py
import json
import hashlib
import sqlite3
from datetime import datetime, timedelta

DB = "aggregator.db"

def init_db():
    conn = sqlite3.connect(DB)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS notifications (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source TEXT NOT NULL,
            title TEXT NOT NULL,
            body TEXT NOT NULL,
            priority INTEGER DEFAULT 0,
            fingerprint TEXT,
            created_at TEXT DEFAULT (datetime('now')),
            acknowledged INTEGER DEFAULT 0
        );
        CREATE INDEX IF NOT EXISTS idx_fingerprint
            ON notifications(fingerprint);
    """)
    conn.commit()
    conn.close()

def make_fingerprint(source: str, title: str, body: str) -> str:
    raw = f"{source}:{title}:{body}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

def insert_notification(source: str, title: str, body: str, priority: int = 0) -> dict:
    conn = sqlite3.connect(DB)
    fp = make_fingerprint(source, title, body)

    recent = conn.execute(
        "SELECT id FROM notifications WHERE fingerprint = ? AND created_at > ?",
        (fp, (datetime.utcnow() - timedelta(hours=1)).isoformat())
    ).fetchone()

    if recent:
        conn.close()
        return {"duplicate": True, "existing_id": recent[0]}

    conn.execute(
        "INSERT INTO notifications (source, title, body, priority, fingerprint) VALUES (?, ?, ?, ?, ?)",
        (source, title, body, priority, fp)
    )
    conn.commit()
    notif_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
    conn.close()
    return {"duplicate": False, "id": notif_id}

Expected output: Insert the same notification twice within one hour. The second call returns {"duplicate": true, "existing_id": 1}.

Step 3: Slack Ingest

Slack sends notifications via a slash command or an outgoing webhook. We expose an endpoint that Slack calls with the alert details.

# slack_ingest.py
from flask import Blueprint, request, jsonify
from models import insert_notification

slack = Blueprint("slack", __name__)

@slack.route("/ingest/slack", methods=["POST"])
def ingest_slack():
    data = request.form if request.form else request.get_json(silent=True) or {}
    title = data.get("text", "").split("\n")[0][:100]
    body = data.get("text", "")
    result = insert_notification("slack", title, body, priority=1)
    return jsonify(result)

@slack.route("/ingest/webhook", methods=["POST"])
def ingest_webhook():
    data = request.get_json(silent=True) or {}
    title = data.get("title", "Webhook Alert")
    body = data.get("body", json.dumps(data))
    priority = data.get("priority", 0)
    result = insert_notification("webhook", title, body, priority)
    return jsonify(result)

Expected output: curl -X POST -d "text=CPU usage above 90% on server-01" http://localhost:5000/ingest/slack returns {"duplicate": false, "id": 1}.

Step 4: SMS Ingest via Twilio

Twilio forwards incoming SMS messages to a webhook URL. We parse the sender and message body and create a notification.

# sms_ingest.py
from flask import Blueprint, request, jsonify
from models import insert_notification

sms = Blueprint("sms", __name__)

@sms.route("/ingest/sms", methods=["POST"])
def ingest_sms():
    sender = request.form.get("From", "unknown")
    body = request.form.get("Body", "")
    title = f"SMS from {sender}"
    result = insert_notification("sms", title, body, priority=3)
    return jsonify(result)

@sms.route("/ingest/email", methods=["POST"])
def ingest_email():
    data = request.get_json(silent=True) or {}
    subject = data.get("subject", "Email Alert")
    body = data.get("body", "")
    result = insert_notification("email", subject, body, priority=2)
    return jsonify(result)

Step 5: Unified Inbox API

The API serves all notifications in one place with filtering, sorting, and acknowledgment.

# api.py
from flask import Blueprint, request, jsonify
import sqlite3

from models import DB

api = Blueprint("api", __name__)

@api.route("/api/notifications")
def list_notifications():
    conn = sqlite3.connect(DB)
    source = request.args.get("source")
    limit = min(int(request.args.get("limit", 50)), 200)
    offset = int(request.args.get("offset", 0))

    query = "SELECT * FROM notifications WHERE 1=1"
    params = []
    if source:
        query += " AND source = ?"
        params.append(source)

    query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params.extend([limit, offset])

    rows = conn.execute(query, params).fetchall()
    conn.close()

    return jsonify([
        {
            "id": r[0], "source": r[1], "title": r[2],
            "body": r[3], "priority": r[4], "created_at": r[6],
            "acknowledged": bool(r[7]),
        }
        for r in rows
    ])

@api.route("/api/notifications/<int:nid>/ack", methods=["POST"])
def acknowledge(nid):
    conn = sqlite3.connect(DB)
    conn.execute("UPDATE notifications SET acknowledged = 1 WHERE id = ?", (nid,))
    conn.commit()
    conn.close()
    return jsonify({"success": True})

@api.route("/api/stats")
def stats():
    conn = sqlite3.connect(DB)
    total = conn.execute("SELECT COUNT(*) FROM notifications").fetchone()[0]
    unacked = conn.execute("SELECT COUNT(*) FROM notifications WHERE acknowledged = 0").fetchone()[0]
    by_source = conn.execute(
        "SELECT source, COUNT(*) FROM notifications GROUP BY source"
    ).fetchall()
    conn.close()
    return jsonify({
        "total": total,
        "unacknowledged": unacked,
        "by_source": {s: c for s, c in by_source}
    })

Step 6: Application Assembly

# app.py
from flask import Flask
from models import init_db
from slack_ingest import slack
from sms_ingest import sms
from api import api

app = Flask(__name__)
app.register_blueprint(slack)
app.register_blueprint(sms)
app.register_blueprint(api)

if __name__ == "__main__":
    init_db()
    app.run(port=5000, debug=True)

Expected output: Start with python app.py. Visit http://localhost:5000/api/stats β€” returns {"total": 0, "unacknowledged": 0, "by_source": {}}.

Architecture

flowchart TB
    subgraph "Sources"
        SL[Slack Webhook]
        EM[Email IMAP]
        TW[Twilio SMS]
        WH[Generic Webhook]
    end
    subgraph "Aggregator"
        INGEST[Ingest Layer
Flask Blueprints] DEDUP[Deduplication
SHA256 fingerprint
1-hour window] SQL[(SQLite)] API[REST API
/api/notifications] end subgraph "Consumers" DASH[Dashboard] MOB[Mobile App] CLI[CLI Tool] end SL --> INGEST EM --> INGEST TW --> INGEST WH --> INGEST INGEST --> DEDUP DEDUP --> SQL API --> SQL DASH --> API MOB --> API CLI --> API

Common Errors

1. Twilio not calling the webhook Twilio needs a publicly accessible URL. Use ngrok during development: ngrok http 5000 gives you a public URL to configure in the Twilio console.

2. Deduplication not working The fingerprint includes source, title, and body. If Slack prepends a timestamp to the message, every copy gets a different fingerprint. Normalize the body by stripping timestamps before fingerprinting.

3. Priority not reflecting urgency Our priority scale is ad-hoc (0=low, 3=critical). Map each source's own severity to this scaleβ€”for example, Twilio SMS is always high priority because someone chose to pay for a text message.

4. Database locked under load SQLite handles one writer at a time. Under heavy load, switch to PostgreSQL. For light to moderate traffic, SQLite with WAL mode (PRAGMA journal_mode=WAL) is sufficient.

5. Duplicate detection across restart Fingerprint matching uses a time window. If the server restarts, the window slides but duplicates within that window are still caught β€” no persistent state is lost.

Practice Questions

1. How does the aggregator detect duplicate notifications? It computes a SHA256 fingerprint from source, title, and body. If the same fingerprint appears within the dedup window (1 hour), the second notification is marked as a duplicate and not stored.

2. Why store priority at the notification level? Different sources have different urgency. An SMS alert (paid channel) is inherently more urgent than a Slack message. Assigning priority at ingest time lets consumers sort and filter meaningfully.

3. How would you add email reading via IMAP? Run a background thread that polls an IMAP inbox, fetches unseen messages, and calls insert_notification("email", subject, body, 2) for each one. Mark messages as seen after ingestion.

4. Challenge: Email sending via SendGrid Add an outbound notification channel. When a high-priority notification arrives, automatically send an email via the SendGrid API. Include the notification title and body in the email.

5. Challenge: Alert silencing Allow users to silence notifications matching certain patterns. Store silence rules in a separate table with regex patterns. Before inserting a notification, check if any silencing rule matches.

FAQ

What is the deduplication window and why 1 hour?

The fingerprint window prevents the same alert from creating multiple incidents within a short period. One hour is a reasonable default for ops alerts β€” if the same issue persists for longer, new notifications are likely legitimate re-occurrences.

Can I add more sources?

Yes. Each source is a Flask blueprint with a single route. Create a new file (e.g., <a href="/devops/incident-response/">PagerDuty</a>_ingest.py) that parses the incoming payload and calls insert_notification. Register the blueprint in app.py.

Does this replace PagerDuty?

For basic alert aggregation, yes. PagerDuty adds on-call scheduling, escalation policies, and phone call alerts. This is a lightweight alternative for teams that want control over their data without per-seat pricing.

Next Steps

  • Add WebSocket push so the dashboard updates in real-time without polling
  • Integrate Redis as a fast dedup cache instead of SQL queries
  • Build a React dashboard at /api/notifications with filtering, search, and bulk acknowledge
  • Explore Docker deployment so the aggregator runs as a service alongside your other infrastructure

Built by the developers of DodaTech

Doda Browser, DodaZIP & Durga Antivirus Pro