FastAPI Async Redis Fix
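This tutorial shows how to stop synchronous Redis calls from blocking the event loop in async FastAPI routes. It then covers connection pooling, dependency injection, caching with a TTL, and pub/sub using the asyncio client that ships with redis-py.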
The Problem
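Calling the synchronous redis-py client inside an async FastAPI route blocks the event loop for the full network round trip. While one request waits on Redis, no other request on that worker can make progress, so caching, rate limiting, and session lookups become performance bottlenecks.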
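The effect can be reproduced without Redis at all. The sketch below is only an illustration: time.sleep() stands in for a synchronous Redis call and asyncio.sleep() for an awaited one, and the function names are made up for the demo.

```python
import asyncio
import time


async def blocking_lookup(delay: float) -> None:
    # Stand-in for a synchronous Redis call: time.sleep() holds the event loop.
    time.sleep(delay)


async def non_blocking_lookup(delay: float) -> None:
    # Stand-in for an awaited async Redis call: the loop is free while waiting.
    await asyncio.sleep(delay)


async def timed(handler, requests: int = 3, delay: float = 0.2) -> float:
    # Run several "requests" concurrently and return the wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(handler(delay) for _ in range(requests)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_lookup))
non_blocking_elapsed = asyncio.run(timed(non_blocking_lookup))
print(f"blocking: {blocking_elapsed:.2f}s, non-blocking: {non_blocking_elapsed:.2f}s")
```

With three concurrent requests, the blocking version takes about the sum of the delays, while the non-blocking version takes about as long as a single delay.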
Quick Fix
Wrong: sync Redis in async route

    import json

    import redis
    from fastapi import FastAPI

    app = FastAPI()
    r = redis.Redis(host='localhost', port=6379)

    @app.get("/items/{item_id}")
    async def get_item(item_id: int):
        # Blocks the event loop!
        cached = r.get(f"item:{item_id}")
        if cached:
            return json.loads(cached)
        ...

Output: Every Redis call blocks the event loop until the server replies. Under load, this defeats the purpose of async.
Correct: async Redis with redis-py

    import json

    from fastapi import FastAPI
    from redis import asyncio as aioredis

    app = FastAPI()
    redis_client = aioredis.Redis(host='localhost', port=6379, decode_responses=True)

    @app.on_event("startup")
    async def startup():
        # Fail fast if Redis is unreachable.
        await redis_client.ping()

    @app.on_event("shutdown")
    async def shutdown():
        # redis-py 5.0.1+ prefers aclose(); close() is the older name.
        await redis_client.close()

    @app.get("/items/{item_id}")
    async def get_item(item_id: int):
        cached = await redis_client.get(f"item:{item_id}")
        if cached:
            return json.loads(cached)
        ...

Output: Redis commands are non-blocking. The event loop processes other requests while each command waits on I/O.
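One pitfall when switching to the async client is dropping an await. The sketch below uses a made-up StubRedis class (not part of redis-py) so it runs without a server. It shows why a missing await makes every lookup look like a cache hit.

```python
import asyncio


class StubRedis:
    """In-memory stand-in exposing an async get(), like an async Redis client."""

    async def get(self, key: str):
        return None  # simulate a cache miss


async def main() -> tuple[bool, bool]:
    client = StubRedis()

    # Missing await: this is a coroutine object, which is always truthy.
    pending = client.get("item:1")
    looks_like_hit = bool(pending)
    pending.close()  # avoid the "coroutine was never awaited" warning

    # With await: the real result (None here), so the miss is detected.
    value = await client.get("item:1")
    real_hit = value is not None
    return looks_like_hit, real_hit


looks_like_hit, real_hit = asyncio.run(main())
print(looks_like_hit, real_hit)  # True False
```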
Connection pool configuration
    from redis import asyncio as aioredis

    redis_client = aioredis.from_url(
        "redis://localhost:6379/0",
        max_connections=20,
        decode_responses=True,
        socket_connect_timeout=5,
        socket_timeout=5,
        retry_on_timeout=True,
    )
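In a deployed app these values usually come from configuration rather than literals. The following is a minimal sketch of one way to do that. The environment variable names REDIS_URL and REDIS_MAX_CONNECTIONS are hypothetical, and the defaults simply mirror the values above.

```python
import os


def redis_settings_from_env() -> tuple[str, dict]:
    # REDIS_URL and REDIS_MAX_CONNECTIONS are hypothetical variable names.
    url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
    options = {
        "max_connections": int(os.environ.get("REDIS_MAX_CONNECTIONS", "20")),
        "decode_responses": True,
        "socket_connect_timeout": 5,
        "socket_timeout": 5,
        "retry_on_timeout": True,
    }
    return url, options


url, options = redis_settings_from_env()
# Intended use (requires redis-py): aioredis.from_url(url, **options)
print(url, options["max_connections"])
```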
Redis as a FastAPI dependency
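    from fastapi import Depends, FastAPI
    from redis import asyncio as aioredis

    app = FastAPI()

    async def get_redis():
        # Opens a client for this request and closes it when the request ends.
        async with aioredis.from_url("redis://localhost:6379/0") as conn:
            yield conn

    @app.get("/cache/{key}")
    async def get_cache(key: str, redis: aioredis.Redis = Depends(get_redis)):
        value = await redis.get(key)
        # Compare with None so an empty string still counts as a hit.
        if value is not None:
            return {"key": key, "value": value, "source": "cache"}
        return {"key": key, "value": None, "source": "miss"}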
Caching with TTL
    import json

    @app.get("/expensive-data")
    async def expensive_data(redis: aioredis.Redis = Depends(get_redis)):
        cache_key = "expensive:data"
        cached = await redis.get(cache_key)
        if cached:
            return json.loads(cached)
        # Expensive computation (compute_expensive_data is defined elsewhere)
        data = await compute_expensive_data()
        # Cache for 5 minutes
        await redis.setex(cache_key, 300, json.dumps(data))
        return data
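The read, compute, and store steps above can be factored into a reusable helper. The sketch below is one way to do it. The names get_or_compute and InMemoryRedis are hypothetical, and InMemoryRedis is only a stand-in with the same async get/setex calls so the example runs without a server.

```python
import asyncio
import json
import time
from typing import Any, Awaitable, Callable


class InMemoryRedis:
    """Stand-in with async get/setex; not a real Redis client."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[str, float]] = {}

    async def get(self, key: str):
        entry = self._data.get(key)
        if entry is None or entry[1] <= time.monotonic():
            return None  # missing or expired
        return entry[0]

    async def setex(self, key: str, ttl_seconds: int, value: str) -> None:
        self._data[key] = (value, time.monotonic() + ttl_seconds)


async def get_or_compute(redis, key: str, ttl_seconds: int,
                         compute: Callable[[], Awaitable[Any]]) -> Any:
    # Return the cached JSON value, or compute, store with a TTL, and return it.
    cached = await redis.get(key)
    if cached is not None:
        return json.loads(cached)
    value = await compute()
    await redis.setex(key, ttl_seconds, json.dumps(value))
    return value


calls = 0


async def compute_expensive_data() -> dict:
    global calls
    calls += 1  # count how often the expensive path runs
    return {"answer": 42}


async def main() -> list:
    redis = InMemoryRedis()
    first = await get_or_compute(redis, "expensive:data", 300, compute_expensive_data)
    second = await get_or_compute(redis, "expensive:data", 300, compute_expensive_data)
    return [first, second]


results = asyncio.run(main())
print(results, calls)  # the second call is served from the cache, so calls == 1
```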
Pub/Sub with async Redis
    from fastapi import WebSocket, WebSocketDisconnect

    @app.websocket("/ws/notifications")
    async def websocket_notifications(websocket: WebSocket):
        await websocket.accept()
        pubsub = redis_client.pubsub()
        await pubsub.subscribe("notifications")
        try:
            async for message in pubsub.listen():
                # Skip subscribe/unsubscribe confirmations.
                if message["type"] == "message":
                    await websocket.send_text(message["data"])
        except WebSocketDisconnect:
            pass
        finally:
            # Always release the subscription, however the loop ends.
            await pubsub.unsubscribe("notifications")
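The type check in the loop matters because listen() also yields subscription confirmations, whose data field is a counter rather than a payload. The sketch below isolates that filtering in a hypothetical helper named payloads. fake_listen only imitates the shape of the dictionaries, and its values are made up.

```python
import asyncio
from typing import AsyncIterator


async def payloads(messages: AsyncIterator[dict]) -> AsyncIterator[str]:
    # Yield only published payloads, decoded to str when they arrive as bytes.
    async for message in messages:
        if message["type"] != "message":
            continue
        data = message["data"]
        yield data.decode() if isinstance(data, bytes) else data


async def fake_listen() -> AsyncIterator[dict]:
    # Shapes mirror what a pub/sub listener yields; the values are made up.
    yield {"type": "subscribe", "pattern": None, "channel": "notifications", "data": 1}
    yield {"type": "message", "pattern": None, "channel": "notifications", "data": b"hello"}
    yield {"type": "message", "pattern": None, "channel": "notifications", "data": "world"}


async def main() -> list[str]:
    return [text async for text in payloads(fake_listen())]


received = asyncio.run(main())
print(received)  # ['hello', 'world']
```

The bytes branch covers clients created without decode_responses=True, where payloads arrive as bytes and cannot be passed to send_text() directly.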
Prevention
- Use redis.asyncio (redis-py 4.2+) for async Redis operations.
- Manage the Redis connection lifecycle with app startup/shutdown events.
- Use connection pooling with reasonable limits.
Common Mistakes with async Redis
- Calling the synchronous redis.Redis client from an async route, which blocks the event loop
- Forgetting await on a client call, so the code receives a coroutine object instead of the value
- Skipping cleanup on shutdown or on WebSocket disconnect, which leaks connections and subscriptions
These mistakes are easy to introduce when converting existing FastAPI code to async.
Practice Exercise
Take a route that reads from Redis with the synchronous client, convert it to redis.asyncio, and add a five-minute TTL cache. Then test it with edge cases like a cache miss, an expired key, and an unreachable Redis server.
This exercise reinforces the concepts covered in this guide. Try implementing it before checking online solutions.