Build a Redis-Backed Task Queue in Python (Step by Step)

DodaTech Updated 2026-06-21 7 min read

In this tutorial, you'll learn how to build a Redis-backed task queue in Python. We cover the key concepts, practical examples, and best practices you need to understand and apply it.

Build an asynchronous task queue in Python using Redis for persistent job storage, with scheduling, automatic retry on failure, and a configurable pool of worker processes.

What You'll Build

You'll create a task queue system in which producer processes push jobs onto Redis lists, consumer workers pull jobs and execute them, failed jobs are retried up to a configurable limit, and a scheduler supports delayed execution. Think of it as a miniature Celery without the complexity.

Why Build a Task Queue?

Task queues decouple request handling from background work. When a user uploads a file, you don't want the HTTP response to wait for processing. Instead, you enqueue a job and return immediately. Task queues power email sending, image processing, video transcoding, report generation, and log aggregation. At DodaTech, a Redis-backed task queue handles file conversion jobs in DodaZIP so users get instant upload responses while processing happens asynchronously.

Prerequisites

  • Python 3.10+ installed
  • Redis server running locally (redis-server)
  • Redis Python client (pip install redis)
  • Basic JSON knowledge for job serialization

Step 1: Project Setup

mkdir task-queue && cd task-queue
python -m venv venv
source venv/bin/activate
pip install redis

Create this structure:

task-queue/
├── producer.py    # Enqueues jobs
├── worker.py      # Processes jobs
├── scheduler.py   # Delayed job scheduling
└── tasks.py       # Task definitions

Step 2: Job Representation

Every job is a JSON object with an ID, task name, arguments, retry counter, and status. We'll use Redis lists as our primary data structure — one list per job status:

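# tasks.py
import json
import uuid
from datetime import datetime, timezone

def create_job(task_name, args=None, max_retries=3):
    """Return a new job serialized as a JSON string."""
    return json.dumps({
        "id": str(uuid.uuid4()),
        "task": task_name,
        "args": list(args) if args else [],  # positional arguments for the task
        "max_retries": max_retries,
        "attempts": 0,                       # incremented on each failure
        "status": "pending",
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
        "created_at": datetime.now(timezone.utc).isoformat(),
    })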

Using UUID ensures globally unique job IDs even across multiple producers. The attempts field tracks how many times this job has been tried, so we know when to give up.
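Because workers receive jobs as raw strings, it can help to validate them before running anything. The following is a minimal sketch, not part of the tutorial's files: parse_job and REQUIRED_FIELDS are hypothetical names, and the field list simply mirrors what create_job writes.

```python
import json

# Fields the worker relies on; mirrors the keys written by create_job.
REQUIRED_FIELDS = {"id", "task", "args", "max_retries", "attempts", "status"}

def parse_job(raw):
    """Decode a job string; return the job dict, or None if it is unusable."""
    try:
        job = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return None  # not valid JSON (or not a string at all)
    if not isinstance(job, dict):
        return None  # valid JSON, but not an object
    if not REQUIRED_FIELDS.issubset(job):
        return None  # an object, but missing fields the worker needs
    return job
```

A worker could call parse_job first and move anything that comes back as None to a separate list for inspection instead of crashing.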

Step 3: The Producer

The producer pushes job JSON strings onto a Redis list called queue:pending. Workers pop from this list:

# producer.py
import json

import redis
from tasks import create_job

r = redis.Redis(decode_responses=True)  # defaults to localhost:6379

def enqueue(task_name, *args):
    """Serialize a job and push it onto the head of the pending list."""
    job = create_job(task_name, args)
    r.lpush("queue:pending", job)
    print(f"Enqueued job: {json.loads(job)['id']}")
    return job

if __name__ == "__main__":
    enqueue("send_email", "user@example.com", "Welcome!")
    enqueue("resize_image", "/uploads/photo.jpg", 800, 600)
    enqueue("generate_report", "monthly", "2026-06")

r.lpush inserts at the head of the list. Workers use brpop to block and wait for jobs from the tail, giving us FIFO behavior.
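To see why pushing at the head and popping from the tail yields FIFO order, here is a small in-memory illustration. It uses a Python deque as a stand-in for a Redis list; the lpush and rpop helpers are local imitations for this sketch, not Redis calls.

```python
from collections import deque

queue = deque()  # stand-in for the Redis list; left side is the head

def lpush(item):
    queue.appendleft(item)  # like LPUSH: insert at the head

def rpop():
    return queue.pop() if queue else None  # like RPOP: remove from the tail

for name in ["job-1", "job-2", "job-3"]:
    lpush(name)

order = [rpop(), rpop(), rpop()]
print(order)  # ['job-1', 'job-2', 'job-3']
```

The first job pushed is the first job popped, which is the ordering the producer and worker rely on.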

Step 4: The Worker Loop

Workers run an infinite loop: block until a job appears, deserialize it, execute the task, handle success or failure:

# worker.py
import redis
import json
import sys
import traceback

r = redis.Redis(decode_responses=True)

def send_email(to, subject):
    print(f"Sending email to {to}: {subject}")

def resize_image(path, width, height):
    print(f"Resizing {path} to {width}x{height}")

def generate_report(report_type, period):
    print(f"Generating {report_type} report for {period}")

TASKS = {
    "send_email": send_email,
    "resize_image": resize_image,
    "generate_report": generate_report,
}

def process_job(job_data):
    job = json.loads(job_data)
    task_name = job["task"]
    args = job["args"]

    if task_name not in TASKS:
        # Retrying cannot fix an unknown task, so park it for inspection
        job["status"] = "failed"
        r.lpush("queue:failed", json.dumps(job))
        print(f"Unknown task: {task_name}")
        return

    try:
        TASKS[task_name](*args)
        job["status"] = "completed"
        r.lpush("queue:completed", json.dumps(job))
        print(f"Completed job {job['id']}")
    except Exception:
        traceback.print_exc()  # keep the stack trace visible for debugging
        job["attempts"] += 1
        if job["attempts"] < job["max_retries"]:
            r.lpush("queue:pending", json.dumps(job))
            print(f"Retrying job {job['id']} (attempt {job['attempts']})")
        else:
            job["status"] = "failed"
            r.lpush("queue:failed", json.dumps(job))
            print(f"Failed job {job['id']} after {job['attempts']} attempts")

if __name__ == "__main__":
    print("Worker started. Waiting for jobs...")
    while True:
        # brpop returns a (queue_name, value) tuple; timeout=0 blocks forever
        _, job_data = r.brpop("queue:pending", timeout=0)
        process_job(job_data)

Key design decisions:

  • brpop blocks indefinitely until a job is available — no busy-waiting, no CPU waste
  • On failure, we increment the attempt counter and re-enqueue if retries remain
  • Failed jobs go to a separate queue:failed list for debugging and manual replay
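One trade-off of blocking forever is that the loop never gets a chance to check whether it should shut down. A possible variation, sketched below under assumptions, uses a finite timeout and a stop flag. run_worker, stop_event, and the 5-second default are hypothetical; client is expected to behave like a redis.Redis instance, whose brpop returns None when the timeout expires.

```python
import threading

def run_worker(client, handle, stop_event, queue="queue:pending", timeout=5):
    """Pop and handle jobs until stop_event is set; return the number handled."""
    handled = 0
    while not stop_event.is_set():
        item = client.brpop(queue, timeout=timeout)
        if item is None:
            continue  # timed out with no job; loop back and re-check the flag
        _queue_name, job_data = item
        handle(job_data)
        handled += 1
    return handled
```

In worker.py this might be called as run_worker(r, process_job, stop_event), with stop_event being a threading.Event that a signal handler sets. A timeout of a few seconds still avoids busy-waiting, since the wait happens inside Redis.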

Step 5: The Scheduler for Delayed Jobs

Sometimes you need to run a job "in 5 minutes" rather than immediately. We'll use a Redis sorted set where the score is the target UNIX timestamp:

# scheduler.py
import redis
import json
import time
from tasks import create_job

r = redis.Redis(decode_responses=True)

def schedule(task_name, delay_seconds, *args):
    job = create_job(task_name, args)
    run_at = time.time() + delay_seconds
    r.zadd("queue:scheduled", {job: run_at})
    print(f"Scheduled job in {delay_seconds}s")

def process_scheduled():
    now = time.time()
    for job_data in r.zrangebyscore("queue:scheduled", 0, now):
        r.lpush("queue:pending", job_data)
        r.zrem("queue:scheduled", job_data)
        print("Moved scheduled job to pending")

if __name__ == "__main__":
    schedule("send_email", 60, "user@example.com", "Delayed welcome")
    while True:
        process_scheduled()
        time.sleep(1)

The scheduler runs in its own process, checking every second for jobs whose time has come. When a job's timestamp passes, it moves from the sorted set to the pending queue.
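If more than one scheduler process ever runs, two of them could read the same due job and both enqueue it. One way to guard against that, sketched here as an assumption rather than the tutorial's method, is to let the return value of zrem decide who owns the job. move_due_jobs is a hypothetical helper, and client is expected to behave like a redis.Redis instance.

```python
import time

def move_due_jobs(client, now=None,
                  scheduled="queue:scheduled", pending="queue:pending"):
    """Move jobs whose run-at time has passed; return how many this caller moved."""
    now = time.time() if now is None else now
    moved = 0
    for job_data in client.zrangebyscore(scheduled, 0, now):
        # ZREM reports how many members it removed, so only the caller
        # that actually removed the job goes on to enqueue it.
        if client.zrem(scheduled, job_data) == 1:
            client.lpush(pending, job_data)
            moved += 1
    return moved
```

The price of this ordering is that a crash between zrem and lpush drops the job. Making both steps atomic would require a transaction or a server-side script, which is beyond this sketch.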

Step 6: Running the Full System

Terminal 1 — start the scheduler:

python scheduler.py

Terminal 2 — start a worker:

python worker.py

Terminal 3 — enqueue jobs:

python producer.py

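Expected output (worker terminal; the "Enqueued job" lines are printed in the producer terminal):

Worker started. Waiting for jobs...
Sending email to user@example.com: Welcome!
Completed job <uuid-1>
Resizing /uploads/photo.jpg to 800x600
Completed job <uuid-2>
Generating monthly report for 2026-06
Completed job <uuid-3>

If the scheduler from Terminal 1 is still running, the worker also processes the delayed email it scheduled, about 60 seconds after the scheduler started.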

Architecture

flowchart LR
    P[Producer] -->|lpush| Q[Redis List queue:pending]
    Q -->|brpop| W1[Worker 1]
    Q -->|brpop| W2[Worker 2]
    Q -->|brpop| W3[Worker 3]
    W1 -->|success| C[queue:completed]
    W2 -->|failure| F[queue:failed]
    W2 -->|retry| Q
    S[Scheduler] -->|zrangebyscore| Z[Redis Sorted Set]
    Z -->|lpush| Q

Common Errors

1. Redis connection refused. If Redis is not running, you'll see redis.exceptions.ConnectionError. Start Redis with redis-server, or check that the default port 6379 is reachable.

2. Job JSON won't deserialize. json.loads raises json.JSONDecodeError when the payload is not valid JSON. If the JSON is valid but the worker expects a different schema, you'll get a KeyError on the missing field instead. Version your job schema and handle both cases gracefully.

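3. Blocking pop timeout confusion. brpop("queue:pending", timeout=0) blocks forever. If you pass a positive integer, it returns None when that many seconds pass without a job, so code that indexes or unpacks the result must check for None first. A very short timeout in a tight loop wastes CPU and network round trips; use 0 for true blocking.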

4. Retry storm when a backend is down. If all jobs fail because a database is unreachable, re-enqueuing them immediately creates a retry storm. Add exponential backoff: wait 2^attempt seconds before re-enqueuing, for example by routing the retry through the scheduler's sorted set.

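5. Duplicate job execution. With the worker above, brpop removes the job before it runs, so a crash mid-job loses the job rather than repeating it. Once you add recovery (for example, reclaiming jobs from an in-progress list), a worker that crashes after finishing the work but before marking the job complete will cause the job to run twice. Make your tasks idempotent: running them twice should produce the same result as running them once.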
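The exponential backoff suggested in item 4 can be sketched as two small pure functions. This is an illustration under assumptions: backoff_delay, plan_retry, and the 300-second cap are hypothetical, and the retry rule (attempts < max_retries) is the same one the worker uses.

```python
def backoff_delay(attempt, base=2, cap=300):
    """Seconds to wait before retrying after failure number `attempt`, capped."""
    return min(cap, base ** attempt)

def plan_retry(job, now):
    """Update a job that just failed and decide what happens next.

    Returns ("retry", run_at) if retries remain, otherwise ("failed", None).
    """
    job["attempts"] += 1
    if job["attempts"] < job["max_retries"]:
        return "retry", now + backoff_delay(job["attempts"])
    job["status"] = "failed"
    return "failed", None
```

On "retry", a worker could add the job to queue:scheduled with run_at as its score instead of pushing it straight back onto queue:pending, so the existing scheduler releases it once the delay has passed.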

Practice Questions

1. Why do we use Redis lists instead of a Python queue? Redis lists persist across process restarts, are accessible from multiple machines, and support blocking pop for efficient worker coordination. An in-memory Python queue disappears if the producer crashes.

2. How does brpop differ from lpop? lpop removes from the head of the list and returns immediately (None if the list is empty). brpop removes from the tail and blocks the connection until an element is available, avoiding CPU-wasteful polling loops. With timeout=0, it blocks indefinitely.

3. What happens if a worker crashes while processing a job? The job is lost unless you implement "heartbeat" monitoring. A more robust approach is to move jobs to "in-progress" temporarily and have a supervisor reclaim stuck jobs after a timeout.

4. Challenge: Implement job priorities Add a priority field to jobs (low, normal, high). Use separate Redis lists (queue:pending:high, queue:pending:normal, queue:pending:low) and have workers check higher-priority queues first.

5. Challenge: Add job result storage After a job completes, store its return value in Redis under a key like result:<job_id>. Allow producers to poll for results or register callbacks. This turns the queue into a full request-response system.
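For question 3, the "in-progress" idea can be sketched with two helpers. This is one possible shape, not a complete solution: claim_job, acknowledge, and the queue:processing list name are hypothetical, and client is expected to behave like a redis.Redis instance. It relies on redis-py's brpoplpush, which pops from the tail of one list and pushes onto the head of another in a single atomic step.

```python
def claim_job(client, pending="queue:pending",
              processing="queue:processing", timeout=0):
    """Atomically move one job from pending to processing and return it.

    Returns None if the timeout expires before a job arrives.
    """
    return client.brpoplpush(pending, processing, timeout=timeout)

def acknowledge(client, job_data, processing="queue:processing"):
    """Remove a finished job from the processing list; return the count removed."""
    return client.lrem(processing, 1, job_data)
```

A worker would call claim_job, run the task, and then acknowledge. Anything that stays in queue:processing for too long belongs to a worker that probably crashed, and a supervisor can push it back to pending. Newer Redis versions provide BLMOVE as the replacement for BRPOPLPUSH, so a production version may prefer that command.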

FAQ

How is this different from Celery?

Celery is a production-grade distributed task queue with built-in scheduling, result backends, multiple brokers (RabbitMQ, Redis, SQS), and monitoring. Our implementation is educational — it teaches the core concepts in about 100 lines of code.

Can I run multiple workers on different machines?

Yes. As long as all workers and producers connect to the same Redis instance, they can be distributed across any network. Redis handles the coordination through its shared data structures.

What happens to unprocessed jobs if Redis restarts?

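Redis can persist data through periodic RDB snapshots or an append-only file (AOF). The stock configuration enables only snapshots, so a crash can lose every job written since the last snapshot, which may be several minutes of work. Enabling AOF with appendfsync everysec typically narrows that window to about one second. Configure Redis persistence based on your durability requirements.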

Next Steps

  • Add Docker containers for the producer, worker, and scheduler components
  • Study Celery's architecture to see how production queues handle routing and monitoring
  • Combine with the Load Balancer tutorial to distribute work across a pool of worker nodes
  • Explore Redis Streams as a more feature-rich alternative to lists for job queues
