Enterprise Django 6 caching — Redis strategies, cache invalidation, per-object and per-query caching, GraphQL response caching, and cache stampede prevention.
Define and enforce caching strategies for enterprise Django 6 systems using Redis, covering per-object caching, query result caching, GraphQL response caching, cache invalidation discipline, and cache stampede prevention.
# Redis-backed default cache for all application instances.
# NOTE: the Redis logical database is selected once, via the '/2' path in the
# LOCATION URL. The original config also passed OPTIONS={'db': 2}, which
# duplicated that choice — two places to update is a drift hazard, so the
# redundant OPTIONS entry is removed.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        # REDIS_CACHE_URL should point at the cache-dedicated db (here db 2).
        'LOCATION': env('REDIS_CACHE_URL', default='redis://localhost:6379/2'),
        # Namespaces every key so multiple apps can share one Redis instance.
        'KEY_PREFIX': 'cms',
        'TIMEOUT': 300,  # Default 5-minute TTL
    }
}
Key naming convention: `{app}:{entity}:{identifier}[:{variant}]`
# Examples
"notifications:unread_count:user:42"
"grades:semester_gpa:student:123:sem:5"
"system:config:feature_flags"
cache.set('notifications:unread_count:user:42', count, timeout=60)
def mark_as_read(notification_id, user):
    """Persist the read flag, then evict every cache entry it invalidates."""
    # ... update DB
    # Both the cached notification list and the unread badge are now stale;
    # drop them so the next read repopulates from the database.
    cache.delete(f'notifications:list:user:{user.id}')
    cache.delete(f'notifications:unread_count:user:{user.id}')
# Deploy-time version prefix: bumping CACHE_VERSION (e.g. 'v2' -> 'v3') makes
# every previously written key unreachable at once — a cheap bulk invalidation
# when a release changes the shape of cached values.
CACHE_VERSION = 'v2'
key = f"cms:{CACHE_VERSION}:notifications:unread_count:user:{user_id}"
Use `cache.get_or_set()` for cheap fallbacks, or the lock-based pattern below for cache stampede prevention on expensive computations:
# Simple approach: get_or_set reads the key and, on a miss, computes and
# stores the value in one call — good enough for cheap fallbacks.
count = cache.get_or_set(
    f'notifications:unread_count:user:{user_id}',
    # FIX: the fallback must use the same identifier the key is built from.
    # The original closed over an undefined `user` while the key used
    # `user_id`, so the stored count could describe a different user than
    # the key claims.
    lambda: notification_service.get_unread_count(user_id),
    timeout=60
)
# Lock-based approach for expensive computations
from django.core.cache import cache
import hashlib
def cached_expensive_query(user_id):
    """Return the expensive per-user summary, serving from cache when possible.

    Uses a short-lived Redis lock (via the atomic ``cache.add``) so that on a
    cache miss only one process computes the summary while others wait for the
    cached result — the classic cache-stampede guard.

    FIX vs the original: the non-lock-holder used to sleep exactly once
    (0.5s) and then recompute unconditionally, which re-created the stampede
    whenever the computation took longer than half a second. It now polls the
    cache for a bounded window and only recomputes as a last resort.
    """
    import time

    key = f'reports:summary:user:{user_id}'
    lock_key = f'{key}:lock'

    result = cache.get(key)
    if result is not None:
        return result

    # cache.add is atomic: exactly one process wins the lock. The 30s TTL
    # guarantees the lock self-releases if the holder crashes.
    if cache.add(lock_key, '1', timeout=30):
        try:
            result = compute_expensive_summary(user_id)
            cache.set(key, result, timeout=300)
            return result
        finally:
            cache.delete(lock_key)

    # Another process is computing — poll briefly for its result instead of
    # immediately duplicating the expensive work.
    deadline = time.monotonic() + 5.0
    while time.monotonic() < deadline:
        time.sleep(0.5)
        result = cache.get(key)
        if result is not None:
            return result

    # Lock holder is slow or died without populating the key; fall back to
    # computing locally so the caller still gets an answer.
    return compute_expensive_summary(user_id)
# Instrument every lookup so hit/miss ratios per key family are observable.
result = cache.get(key)
if result is None:
    metrics.increment('cache.miss', tags=['key_type:unread_count'])
else:
    metrics.increment('cache.hit', tags=['key_type:unread_count'])
Handle `cache.get()` returning `None` gracefully — always have a fallback to the database. Avoid `cache.delete_pattern()` in request hot paths — it scans all keys and blocks Redis. Cache individual model instances or computed values:
def get_student_gpa(student_id: int) -> float:
    """Return the student's mean semester GPA, cached for 10 minutes.

    A student with no semester records yields 0.0 (the aggregate's None is
    coerced), and that 0.0 is cached like any other value.
    """
    cache_key = f'grades:gpa:student:{student_id}'

    cached = cache.get(cache_key)
    if cached is not None:
        return cached

    aggregate = SemesterGPA.objects.filter(student_id=student_id).aggregate(
        avg=Avg('gpa')
    )
    gpa = aggregate['avg'] or 0.0
    cache.set(cache_key, gpa, timeout=600)  # 10 minutes
    return gpa
Cache query results for specific parameter combinations:
def get_notifications(user_id: int, category: str | None = None, limit: int = 20) -> list:
    """Return the user's notifications for one (category, limit) combination.

    The cache key encodes every parameter the result depends on, so each
    parameter combination is cached independently. TTL is short (30s) because
    notification lists change frequently.
    """
    key = f'notifications:list:user:{user_id}:cat:{category}:limit:{limit}'
    result = cache.get(key)
    if result is None:
        # FIX: the original passed a literal `...` placeholder, dropping the
        # very parameters the cache key varies on. Forward them explicitly so
        # the cached list actually matches its key.
        result = list(notification_service.get_user_notifications(
            user_id, category=category, limit=limit
        ))
        cache.set(key, result, timeout=30)  # Short TTL for frequently changing data
    return result
Cache expensive sub-computations within a larger operation:
def get_dashboard_data(user_id: int) -> dict:
    """Assemble the dashboard from independently cached fragments.

    Each fragment carries its own TTL: the unread badge tolerates only 60s
    of staleness, while GPA changes rarely and may lag up to 10 minutes.
    """
    unread_key = f'notifications:unread_count:user:{user_id}'
    gpa_key = f'grades:gpa:student:{user_id}'

    def _unread_count():
        # Only evaluated on a cache miss.
        return Notification.objects.filter(recipient_id=user_id, is_read=False).count()

    unread = cache.get_or_set(unread_key, _unread_count, timeout=60)
    gpa = cache.get_or_set(gpa_key, lambda: compute_gpa(user_id), timeout=600)

    return {"unread_notifications": unread, "gpa": gpa}
Delete specific keys when related data changes:
def on_notification_created(user_id: int):
    """Invalidate only the caches a new notification makes wrong right away."""
    # The unread badge must reflect the new notification immediately.
    cache.delete(f'notifications:unread_count:user:{user_id}')
    # The list cache is deliberately left to expire on its own: its 30s TTL
    # already bounds staleness to an acceptable window.
For bulk invalidation, use versioned keys instead of pattern deletion:
# Instead of deleting all user notification cache keys:
# cache.delete_pattern(f'notifications:*:user:{user_id}') # AVOID in hot paths
# Use a version counter:
def invalidate_user_notification_cache(user_id: int):
    """Bump the per-user cache version so every versioned key goes stale at once.

    Old keys are never deleted — readers simply build keys with the new
    version and the stale entries age out via their TTLs.
    """
    version_key = f'notifications:version:user:{user_id}'
    # FIX: Django's cache.incr() raises ValueError when the key does not
    # exist, so the first invalidation for a user would crash. Seed the
    # counter with the atomic cache.add() (a no-op if the key already
    # exists) before incrementing. timeout=None keeps the counter from
    # expiring out from under live versioned keys.
    cache.add(version_key, 0, timeout=None)
    cache.incr(version_key)  # Increment version — old keys become stale
def get_cache_key(user_id: int, suffix: str) -> str:
    """Build a versioned notification cache key for this user.

    The current per-user version is embedded in the key, so bumping the
    version counter instantly orphans every previously written key.
    A user with no version counter yet defaults to version 0.
    """
    version_key = f'notifications:version:user:{user_id}'
    current = cache.get(version_key, 0)
    return f'notifications:{current}:{suffix}:user:{user_id}'
Set appropriate `Cache-Control` headers for HTTP-level caching. Enable `requirepass` on Redis in production. Use `pipeline()` to batch multiple cache operations into a single round trip. Configure `maxmemory` with the `allkeys-lru` eviction policy. Refuse to generate code that violates the trade-off rules below:
| Trade-off | Decision Rule |
|---|---|
| Short TTL vs Long TTL | Short (30-60s) for frequently changing data. Long (5-30m) for slowly changing reference data. |
| Active invalidation vs TTL expiry | Active invalidation for write-heavy data. TTL expiry for read-heavy, staleness-tolerant data. |
| Redis vs Local memory cache | Redis for multi-instance. Local memory only for per-process, request-scoped caching. |
| Cache everything vs Cache selectively | Cache only hot paths with measured performance impact. Don't cache speculatively. |