Build a Redis-Backed Task Queue in Python (Step by Step)
This tutorial walks through the key concepts, practical examples, and best practices step by step.
Build an asynchronous task queue in Python using Redis for persistent job storage, with scheduling, automatic retry on failure, and a configurable pool of worker processes.
What You'll Build
You'll create a task queue system where producer processes push jobs onto a Redis list, consumer workers pull jobs off it and execute them, failed jobs are retried up to a configurable limit, and a scheduler supports delayed execution. Think of it as a miniature Celery without the complexity.
Why Build a Task Queue?
Task queues decouple request handling from background work. When a user uploads a file, you don't want the HTTP response to wait for processing. Instead, you enqueue a job and return immediately. Task queues power email sending, image processing, video transcoding, report generation, and log aggregation. At DodaTech, a Redis-backed task queue handles file conversion jobs in DodaZIP so users get instant upload responses while processing happens asynchronously.
Prerequisites
- Python 3.10+ installed
- Redis server running locally (redis-server)
- Redis Python client (pip install redis)
- Basic JSON knowledge for job serialization
Step 1: Project Setup
```bash
mkdir task-queue && cd task-queue
python -m venv venv
source venv/bin/activate
pip install redis
```
Create this structure:
```text
task-queue/
├── producer.py    # Enqueues jobs
├── worker.py      # Processes jobs
├── scheduler.py   # Delayed job scheduling
└── tasks.py       # Task definitions
```
Step 2: Job Representation
Every job is a JSON object with an ID, a task name, arguments, a retry limit, an attempt counter, and a status. Redis lists are our primary data structure, with one list per job status (queue:pending, queue:completed, and queue:failed):
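```python
# tasks.py
import json
import uuid
from datetime import datetime, timezone


def create_job(task_name, args=None, max_retries=3):
    """Serialize a new job as a JSON string ready to push onto a Redis list."""
    return json.dumps({
        "id": str(uuid.uuid4()),    # random UUID, no coordination needed
        "task": task_name,
        "args": list(args or []),   # tuples become JSON arrays
        "max_retries": max_retries,
        "attempts": 0,
        "status": "pending",
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
        "created_at": datetime.now(timezone.utc).isoformat(),
    })
```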
Random (version 4) UUIDs make duplicate job IDs vanishingly unlikely, even across multiple producers, without any coordination between them. The attempts field counts how many times this job has failed, so the worker knows when to give up.
Step 3: The Producer
The producer pushes job JSON strings onto a Redis list called queue:pending. Workers pop from this list:
```python
# producer.py
import json

import redis

from tasks import create_job

r = redis.Redis(decode_responses=True)


def enqueue(task_name, *args, max_retries=3):
    """Push a new job onto the head of queue:pending and return its JSON."""
    job = create_job(task_name, args, max_retries=max_retries)
    r.lpush("queue:pending", job)
    print(f"Enqueued job: {json.loads(job)['id']}")
    return job


if __name__ == "__main__":
    enqueue("send_email", "user@example.com", "Welcome!")
    enqueue("resize_image", "/uploads/photo.jpg", 800, 600)
    enqueue("generate_report", "monthly", "2026-06")
```
r.lpush inserts at the head of the list. Workers use brpop to block and wait for jobs from the tail, giving us FIFO behavior.
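To see why pushing at the head and popping from the tail yields FIFO order, you can model the list in memory. This sketch uses collections.deque as a stand-in for a Redis list. It mimics only the ordering, not blocking, persistence, or sharing between processes.

```python
from collections import deque

# In-memory stand-in for a Redis list: index 0 is the head, -1 is the tail
queue = deque()


def lpush(value):
    queue.appendleft(value)  # LPUSH inserts at the head


def rpop():
    return queue.pop() if queue else None  # (B)RPOP removes from the tail


for job in ["job-1", "job-2", "job-3"]:
    lpush(job)

processed = [rpop() for _ in range(3)]
print(processed)  # ['job-1', 'job-2', 'job-3']: first in, first out
```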
Step 4: The Worker Loop
Workers run an infinite loop: block until a job appears, deserialize it, execute the task, handle success or failure:
```python
# worker.py
import json
import traceback

import redis

r = redis.Redis(decode_responses=True)


def send_email(to, subject):
    print(f"Sending email to {to}: {subject}")


def resize_image(path, width, height):
    print(f"Resizing {path} to {width}x{height}")


def generate_report(report_type, period):
    print(f"Generating {report_type} report for {period}")


# Map task names (as stored in the job JSON) to the functions that run them
TASKS = {
    "send_email": send_email,
    "resize_image": resize_image,
    "generate_report": generate_report,
}


def process_job(job_data):
    job = json.loads(job_data)
    task_name = job["task"]
    args = job["args"]

    if task_name not in TASKS:
        # Park unknown tasks in queue:failed instead of silently dropping them
        print(f"Unknown task: {task_name}")
        job["status"] = "failed"
        r.lpush("queue:failed", json.dumps(job))
        return

    try:
        TASKS[task_name](*args)
        job["status"] = "completed"
        r.lpush("queue:completed", json.dumps(job))
        print(f"Completed job {job['id']}")
    except Exception:
        traceback.print_exc()  # log the error before deciding what to do
        job["attempts"] += 1
        # Allow up to max_retries retries after the first attempt
        if job["attempts"] <= job["max_retries"]:
            r.lpush("queue:pending", json.dumps(job))
            print(f"Retrying job {job['id']} (attempt {job['attempts']})")
        else:
            job["status"] = "failed"
            r.lpush("queue:failed", json.dumps(job))
            print(f"Failed job {job['id']} after {job['attempts']} attempts")


if __name__ == "__main__":
    print("Worker started. Waiting for jobs...")
    while True:
        # brpop returns a (list_name, value) tuple; timeout=0 blocks forever
        _, job_data = r.brpop("queue:pending", timeout=0)
        process_job(job_data)
```
Key design decisions:
- brpop blocks indefinitely until a job is available, so there is no busy-waiting and no wasted CPU
- On failure, we increment the attempt counter and re-enqueue the job if retries remain
- Failed jobs go to a separate queue:failed list for debugging and manual replay
Step 5: The Scheduler for Delayed Jobs
Sometimes you need to run a job "in 5 minutes" rather than immediately. We'll use a Redis sorted set where the score is the target UNIX timestamp:
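```python
# scheduler.py
import time

import redis

from tasks import create_job

r = redis.Redis(decode_responses=True)


def schedule(task_name, delay_seconds, *args):
    """Store a job in a sorted set, scored by the UNIX time it should run."""
    job = create_job(task_name, args)
    run_at = time.time() + delay_seconds
    r.zadd("queue:scheduled", {job: run_at})
    print(f"Scheduled job in {delay_seconds}s")


def process_scheduled():
    """Move every job whose run time has passed onto queue:pending."""
    now = time.time()
    for job_data in r.zrangebyscore("queue:scheduled", 0, now):
        # zrem returns 1 only for the caller that actually removed the member,
        # so with several schedulers running, no due job is pushed twice
        # (a crash between the two calls can still lose it)
        if r.zrem("queue:scheduled", job_data):
            r.lpush("queue:pending", job_data)
            print("Moved scheduled job to pending")


if __name__ == "__main__":
    schedule("send_email", 60, "user@example.com", "Delayed welcome")
    while True:
        process_scheduled()
        time.sleep(1)  # check the sorted set once per second
```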
The scheduler runs in its own process, checking every second for jobs whose time has come. When a job's timestamp passes, it moves from the sorted set to the pending queue.
Step 6: Running the Full System
Terminal 1, start the scheduler:
```bash
python scheduler.py
```
Terminal 2, start a worker:
```bash
python worker.py
```
Terminal 3, enqueue jobs:
```bash
python producer.py
```
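Expected output (producer terminal):
```text
Enqueued job: <uuid-1>
Enqueued job: <uuid-2>
Enqueued job: <uuid-3>
```
Expected output (worker terminal):
```text
Worker started. Waiting for jobs...
Sending email to user@example.com: Welcome!
Completed job <uuid-1>
Resizing /uploads/photo.jpg to 800x600
Completed job <uuid-2>
Generating monthly report for 2026-06
Completed job <uuid-3>
```
About 60 seconds after the scheduler starts, the worker also picks up the delayed job and prints "Sending email to user@example.com: Delayed welcome" followed by a fourth "Completed job" line.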
Architecture
```mermaid
flowchart LR
    P[Producer] -->|lpush| Q[Redis List queue:pending]
    Q -->|brpop| W1[Worker 1]
    Q -->|brpop| W2[Worker 2]
    Q -->|brpop| W3[Worker 3]
    W1 -->|success| C[queue:completed]
    W2 -->|failure| F[queue:failed]
    W2 -->|retry| Q
    S[Scheduler] -->|zrangebyscore| Z[Redis Sorted Set]
    Z -->|lpush| Q
```
Common Errors
1. Redis connection refused
If Redis is not running, you'll see redis.exceptions.ConnectionError. Start Redis with redis-server or check that the default port 6379 is accessible.
2. Job JSON won't deserialize
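If something other than valid JSON lands on the queue (for example, a plain string pushed by hand with redis-cli), json.loads raises json.JSONDecodeError. If the payload is valid JSON but the schema changed between producer and worker versions, you'll see KeyError or TypeError instead. Always version your job schema and handle deserialization errors gracefully.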
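One way to handle these errors gracefully is a small validation step in front of process_job. This is a sketch, not part of the tutorial's code. It checks that the payload is JSON, that it is an object with the fields create_job writes, and that a hypothetical schema_version field is one this worker understands. create_job does not write schema_version, so a missing value is treated as version 1.

```python
import json

# Fields that create_job always writes
REQUIRED_FIELDS = {"id", "task", "args", "max_retries", "attempts"}
SUPPORTED_VERSIONS = {1}  # hypothetical: schema versions this worker handles


def parse_job(raw):
    """Return the job dict, or None if the payload can't be processed safely."""
    try:
        job = json.loads(raw)
    except json.JSONDecodeError:
        return None  # not valid JSON at all
    if not isinstance(job, dict) or not REQUIRED_FIELDS.issubset(job):
        return None  # valid JSON, but not shaped like a job
    # Jobs written before versioning existed are treated as version 1
    if job.get("schema_version", 1) not in SUPPORTED_VERSIONS:
        return None
    return job
```

In the worker, anything for which parse_job returns None could be pushed to queue:failed unchanged so it can be inspected and replayed later.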
3. Blocking pop timeout confusion
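brpop("queue:pending", timeout=0) blocks forever. With a positive timeout, it returns None after that many seconds if no job arrives, so code that indexes or unpacks the result directly raises TypeError. Check for None first. A very short timeout in a tight loop degrades into polling. A timeout of a few seconds costs little, though, and lets the worker wake up periodically, for example to check for a shutdown request.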
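A positive timeout pairs well with graceful shutdown: the worker wakes up every few seconds even when the queue is idle, so it can notice a stop request. This is a minimal sketch. run_worker and install_stop_handlers are hypothetical helpers, the 5-second timeout is an arbitrary choice, and client is assumed to be a redis.Redis(decode_responses=True) instance.

```python
import signal
import threading


def install_stop_handlers():
    """Return an Event that is set when the process receives SIGTERM or SIGINT."""
    stop_event = threading.Event()

    def handler(signum, frame):
        stop_event.set()  # the current job finishes; the loop exits afterwards

    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGINT, handler)
    return stop_event


def run_worker(client, handle_job, stop_event, timeout=5):
    """Pop and handle jobs until stop_event is set; return how many ran."""
    handled = 0
    while not stop_event.is_set():
        # With a positive timeout, brpop returns None if no job arrived in time
        item = client.brpop("queue:pending", timeout=timeout)
        if item is None:
            continue  # idle: loop back and re-check stop_event
        _, job_data = item
        handle_job(job_data)
        handled += 1
    return handled
```

In worker.py, the main block could then call run_worker(r, process_job, install_stop_handlers()). If you configure a socket_timeout on the Redis client, keep it longer than the brpop timeout so the socket read doesn't time out first.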
4. Retry storm when a backend is down
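If every job fails because a database is unreachable, re-enqueueing them immediately creates a retry storm that keeps hammering the struggling backend. Add exponential backoff: wait 2^attempts seconds before the next try by re-enqueueing the job through the scheduler's queue:scheduled sorted set rather than directly onto queue:pending.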
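Here is a sketch of that backoff. It assumes retries go back through the existing queue:scheduled sorted set, so the scheduler releases them when they are due. backoff_delay and schedule_retry are hypothetical names. The 300-second cap is an illustrative assumption that keeps late retries from waiting for hours.

```python
import json
import time


def backoff_delay(attempts, base=2, cap=300):
    """Seconds to wait before the next try: base**attempts, capped at cap."""
    return min(base ** attempts, cap)


def schedule_retry(client, job, now=None):
    """Re-enqueue a failed job via queue:scheduled instead of queue:pending."""
    now = time.time() if now is None else now
    delay = backoff_delay(job["attempts"])
    # Score is the UNIX time at which the scheduler should release the job
    client.zadd("queue:scheduled", {json.dumps(job): now + delay})
    return delay
```

In process_job's except branch, schedule_retry(r, job) would replace the direct lpush onto queue:pending. Adding a little random jitter to the delay further spreads out retries from jobs that failed at the same moment.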
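5. Duplicate job execution
A job can run twice if it is delivered again after it already did its work. For example, a worker finishes the work but crashes before marking it complete, and a recovery process re-queues it. Or a task raises after its side effect has already happened and is retried. Make your tasks idempotent: running them twice should produce the same result as running them once.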
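When a task's side effects can't be made naturally idempotent, a completion marker keyed by job ID is a common supplement. The sketch below assumes a hypothetical done:<job_id> key and a client with redis-py's exists and set methods. It narrows the duplicate window rather than eliminating it: two concurrent deliveries can both pass the check, and a crash between the task and the marker write still causes a rerun.

```python
def run_once(client, job_id, func, *args, ttl=86400):
    """Run func(*args) unless job_id is already marked done; return True if it ran."""
    marker = f"done:{job_id}"  # hypothetical key naming scheme
    if client.exists(marker):
        return False  # an earlier delivery of this job already finished
    func(*args)
    client.set(marker, 1, ex=ttl)  # markers expire after ttl seconds
    return True
```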
Practice Questions
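1. Why do we use Redis lists instead of a Python queue?
Redis lists live in the Redis server, so they survive producer and worker process restarts, are accessible from multiple machines, and support blocking pops for efficient worker coordination. Surviving a restart of Redis itself depends on its RDB or AOF persistence settings. An in-memory Python queue disappears if the producer crashes.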
2. How does brpop differ from lpop?
lpop returns immediately (or None if empty). brpop blocks the connection until an element is available, avoiding CPU-wasteful polling loops. With timeout=0, it blocks indefinitely.
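3. What happens if a worker crashes while processing a job?
With the plain brpop approach, the job is lost. It has already been removed from queue:pending and exists only in the crashed worker's memory. A more robust approach is to move jobs to an "in-progress" list while they run and have a supervisor (for example, one that tracks worker heartbeats) reclaim stuck jobs after a timeout.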
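The in-progress approach can be sketched with BRPOPLPUSH, which pops from one list and pushes onto another in a single atomic step. A job is therefore never held only in a worker's memory. Redis 6.2 deprecated BRPOPLPUSH in favor of BLMOVE, but redis-py still exposes brpoplpush. The queue:processing list and the helper names are assumptions. Deciding which jobs count as stuck (for example, from heartbeat timestamps) is left to the supervisor.

```python
PENDING = "queue:pending"
PROCESSING = "queue:processing"  # hypothetical list of in-flight jobs


def take_job(client, timeout=0):
    """Atomically move one job from the tail of pending to processing."""
    return client.brpoplpush(PENDING, PROCESSING, timeout=timeout)


def finish_job(client, job_data):
    """Remove a finished job from processing (first matching copy only)."""
    client.lrem(PROCESSING, 1, job_data)


def requeue_stuck(client, stuck_jobs):
    """Supervisor step: push jobs judged stuck back onto pending."""
    moved = 0
    for job_data in stuck_jobs:
        # Only re-queue jobs still in processing, so a late finish wins
        if client.lrem(PROCESSING, 1, job_data):
            client.lpush(PENDING, job_data)
            moved += 1
    return moved
```

Because LREM matches by value, finish_job must receive the exact string take_job returned, not a re-serialized copy with updated fields.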
4. Challenge: Implement job priorities
Add a priority field to jobs (low, normal, high). Use separate Redis lists (queue:pending:high, queue:pending:normal, queue:pending:low) and have workers check higher-priority queues first.
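BRPOP accepts several keys and checks them in the order given, returning from the first non-empty list. A single call can therefore implement the priority check. A minimal sketch, with hypothetical helper names:

```python
# Checked in this order, so high always wins when several lists have work
PRIORITY_QUEUES = [
    "queue:pending:high",
    "queue:pending:normal",
    "queue:pending:low",
]


def pop_by_priority(client, timeout=0):
    """Return (queue_name, job_data) from the highest-priority non-empty list."""
    return client.brpop(PRIORITY_QUEUES, timeout=timeout)


def enqueue_with_priority(client, job_data, priority="normal"):
    """Push job_data onto the list for its priority level."""
    if priority not in ("high", "normal", "low"):
        raise ValueError(f"unknown priority: {priority}")
    client.lpush(f"queue:pending:{priority}", job_data)
```

Strict priority like this can starve low-priority jobs while high-priority work keeps arriving. If that matters, consider occasionally checking the lists in a different order.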
5. Challenge: Add job result storage
After a job completes, store its return value in Redis under a key like result:<job_id>. Allow producers to poll for results or register callbacks. This turns the queue into a full request-response system.
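Here is a minimal sketch of result storage. It assumes JSON-serializable return values and a one-hour expiry so old results don't accumulate. store_result and wait_for_result are hypothetical helpers. Polling is the simplest option; callbacks would need extra machinery such as Redis pub/sub.

```python
import json
import time


def store_result(client, job_id, value, ttl=3600):
    """Save a task's return value under result:<job_id>, expiring after ttl seconds."""
    client.set(f"result:{job_id}", json.dumps(value), ex=ttl)


def wait_for_result(client, job_id, timeout=10.0, interval=0.1):
    """Poll for a result; return the decoded value, or None on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        raw = client.get(f"result:{job_id}")
        if raw is not None:
            return json.loads(raw)
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)  # avoid hammering Redis between checks
```

JSON null decodes to None, so in this sketch a task that returns None is indistinguishable from a timeout. Wrap results in an object such as {"value": ...} if that distinction matters.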
Next Steps
- Add Docker containers for the producer, worker, and scheduler components
- Study Celery's architecture to see how production queues handle routing and monitoring
- Combine with the Load Balancer tutorial to distribute work across a pool of worker nodes
- Explore Redis Streams as a more feature-rich alternative to lists for job queues
Built by the developers of DodaTech
Doda Browser, DodaZIP & Durga Antivirus Pro