Comprehensive guide for Redis state management including caching strategies, session management, pub/sub patterns, distributed locks, and data structures
A comprehensive skill for mastering Redis state management patterns in distributed systems. This skill covers caching strategies, session management, pub/sub messaging, distributed locks, data structures, and production-ready patterns using redis-py.
Use this skill when designing or reviewing Redis-backed caching, session management, pub/sub messaging, or distributed-locking functionality.
Redis (Remote Dictionary Server) is an in-memory data structure store used as:
Key Characteristics:
Redis provides multiple data types for different use cases:
Strings: Simple key-value pairs, binary safe
Hashes: Field-value maps (objects)
Lists: Ordered collections (linked lists)
Sets: Unordered unique collections
Sorted Sets: Ordered sets with scores
Streams: Append-only logs with consumer groups
Connection Pools: Redis connections are expensive to create. Always use connection pools:
import redis

# Connection pool (recommended): connections are reused across calls,
# avoiding the TCP/AUTH handshake cost of opening one per operation.
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)
r = redis.Redis(connection_pool=pool)

# Direct connection (avoid in production - every client instance may
# open its own socket)
r = redis.Redis(host='localhost', port=6379, db=0)
Best Practices:
Redis offers two persistence mechanisms:
RDB (Redis Database): Point-in-time snapshots
AOF (Append-Only File): Log of write operations
Hybrid Approach: RDB + AOF for best of both worlds
Redis Serialization Protocol version 3 offers:
import redis
from redis.cache import CacheConfig

# Enable RESP3 with client-side caching: protocol=3 selects RESP3 and
# CacheConfig() turns on local caching of repeatedly-read keys.
r = redis.Redis(host='localhost', port=6379, protocol=3,
                cache_config=CacheConfig())
Pattern: Application checks cache first, loads from database on miss
import redis
import json
from typing import Optional, Dict, Any

r = redis.Redis(decode_responses=True)

def get_user(user_id: int) -> Optional[Dict[str, Any]]:
    """Cache-aside pattern for user data.

    Checks Redis first; on a miss, loads the user from the database and
    caches the JSON-serialized result for one hour. Returns None when
    the user exists in neither place.
    """
    cache_key = f"user:{user_id}"
    # Try cache first
    cached_data = r.get(cache_key)
    if cached_data:
        return json.loads(cached_data)
    # Cache miss - load from database
    user_data = database.get_user(user_id)  # Your DB query
    if user_data:
        # Store in cache with 1 hour TTL
        r.setex(cache_key, 3600, json.dumps(user_data))
    return user_data
Advantages:
Disadvantages:
Pattern: Write to cache and database simultaneously
def update_user(user_id: int, user_data: Dict[str, Any]) -> bool:
    """Write-through pattern: persist to the database, then mirror to cache."""
    cache_key = f"user:{user_id}"
    # The database is the source of truth, so it is written first.
    result = database.update_user(user_id, user_data)
    if not result:
        # DB rejected the write - leave the cache untouched.
        return result
    # Keep the cache in sync with a 1 hour TTL.
    r.setex(cache_key, 3600, json.dumps(user_data))
    return result
Advantages:
Disadvantages:
Pattern: Write to cache immediately, sync to database asynchronously
import redis
import json
from queue import Queue
from threading import Thread

r = redis.Redis(decode_responses=True)
write_queue = Queue()

def async_writer():
    """Background worker to sync cache to database.

    Blocks on the queue; each item is a (user_id, user_data) tuple.
    Failures are only printed and the item is dropped (no retry here).
    """
    while True:
        user_id, user_data = write_queue.get()
        try:
            database.update_user(user_id, user_data)
        except Exception as e:
            # Log error, potentially retry
            print(f"Failed to write user {user_id}: {e}")
        finally:
            # Always mark the item done so queue.join() can complete.
            write_queue.task_done()

# Start background writer (daemon so it won't block interpreter exit)
Thread(target=async_writer, daemon=True).start()

def update_user_fast(user_id: int, user_data: Dict[str, Any]):
    """Write-behind pattern for fast writes.

    The cache is updated synchronously (fast path); the database write
    is queued for the background worker so callers never wait on the DB.
    """
    cache_key = f"user:{user_id}"
    # Write to cache immediately (fast)
    r.setex(cache_key, 3600, json.dumps(user_data))
    # Queue database write (async)
    write_queue.put((user_id, user_data))
Advantages:
Disadvantages:
Time-based Expiration (TTL):
# Set key with expiration (value + TTL in a single command)
r.setex("session:abc123", 1800, session_data)  # 30 minutes

# Or set TTL on existing key
r.expire("user:profile:123", 3600)  # 1 hour

# Check remaining TTL: seconds left, -1 if no TTL, -2 if key missing
ttl = r.ttl("user:profile:123")
Event-based Invalidation:
def update_product(product_id: int, product_data: dict):
    """Invalidate cache on update."""
    # Persist the change first - the database is the source of truth.
    database.update_product(product_id, product_data)
    # Drop every cache entry this update may have made stale.
    stale_keys = (
        f"product:{product_id}",
        f"product_list:category:{product_data['category']}",
        "products:featured",
    )
    for stale_key in stale_keys:
        r.delete(stale_key)
Pattern-based Invalidation:
# Delete all keys matching pattern
def invalidate_user_cache(user_id: int, batch_size: int = 100):
    """Invalidate all cache entries for a user.

    Uses SCAN (cursor-based, non-blocking) rather than KEYS, and deletes
    matches in batches so a large keyspace costs one DELETE per batch
    instead of one round trip per key.

    Args:
        user_id: User whose "user:{id}:*" keys should be removed.
        batch_size: SCAN count hint and maximum keys per DELETE call.
    """
    pattern = f"user:{user_id}:*"
    batch = []
    for key in r.scan_iter(match=pattern, count=batch_size):
        batch.append(key)
        if len(batch) >= batch_size:
            r.delete(*batch)
            batch = []
    # Flush any remainder.
    if batch:
        r.delete(*batch)
Problem: Multiple requests simultaneously miss cache and query database
Solution 1: Probabilistic Early Expiration
import time
import random

def get_with_early_expiration(key: str, ttl: int = 3600, beta: float = 1.0):
    """Prevent stampede with probabilistic early recomputation.

    A random fraction of requests refreshes the value before it actually
    expires, spreading regeneration load instead of having every client
    miss at once. beta > 1 refreshes more eagerly, < 1 less eagerly.
    """
    value = r.get(key)
    now = time.time()
    if value is None:
        # Cache miss - compute, cache, and record when we computed it.
        value = compute_value(key)
        r.setex(key, ttl, value)
        # BUG FIX: the timestamp must be written on the initial miss;
        # previously it never was, so delta was measured from epoch 0
        # and every subsequent call recomputed the value.
        r.set(f"{key}:timestamp", now)
        return value
    # Probabilistically decide whether to refresh before expiry.
    delta = now - float(r.get(f"{key}:timestamp") or 0)
    expiry = ttl * random.random() * beta
    if delta > expiry:
        # Refresh synchronously (a production variant would hand this
        # off to a background task).
        value = compute_value(key)
        r.setex(key, ttl, value)
        r.set(f"{key}:timestamp", now)
    return value
Solution 2: Locking
from contextlib import contextmanager

@contextmanager
def cache_lock(key: str, timeout: int = 10):
    """Acquire a short-lived lock guarding cache recomputation.

    Yields True if this caller owns the lock (and should compute the
    value), False otherwise. The lock auto-expires after `timeout`
    seconds so a crashed owner cannot deadlock other clients.
    """
    import uuid

    lock_key = f"{key}:lock"
    # Random token instead of a timestamp: two clients calling at the
    # same instant must never hold identical identifiers.
    identifier = str(uuid.uuid4())
    # SET NX EX: create only if absent, with automatic expiry.
    if r.set(lock_key, identifier, nx=True, ex=timeout):
        try:
            yield True
        finally:
            # Atomic compare-and-delete via Lua. The previous
            # GET-then-DELETE pair could delete another client's lock if
            # ours expired between the two calls (same fix this guide
            # applies in RedisLock.release).
            release_script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
            """
            r.eval(release_script, 1, lock_key, identifier)
    else:
        yield False
def get_with_lock(key: str):
    """Use lock to prevent stampede.

    On a cache miss only the lock holder recomputes; everyone else
    waits briefly and re-reads, falling back to computing themselves if
    the value still has not appeared.
    """
    value = r.get(key)
    if value is None:
        with cache_lock(key) as acquired:
            if acquired:
                # We got the lock - compute value
                value = compute_value(key)
                r.setex(key, 3600, value)
            else:
                # Someone else is computing - wait and retry
                time.sleep(0.1)
                value = r.get(key) or compute_value(key)
    return value
Basic Session Management:
import redis
import json
import uuid
from datetime import datetime, timedelta
r = redis.Redis(decode_responses=True)
class SessionManager:
    """String-based session store.

    Each session is one JSON blob under "session:{uuid}" with a sliding
    TTL that is refreshed on every read.
    """

    def __init__(self, ttl: int = 1800):
        """Session manager with Redis backend.

        Args:
            ttl: Session timeout in seconds (default 30 minutes)
        """
        self.ttl = ttl

    def create_session(self, user_id: int, data: dict = None) -> str:
        """Create new session and return session ID."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"
        session_data = {
            "user_id": user_id,
            "created_at": datetime.utcnow().isoformat(),
            "data": data or {}
        }
        # SETEX stores the value and its TTL in one command.
        r.setex(session_key, self.ttl, json.dumps(session_data))
        return session_id

    def get_session(self, session_id: str) -> dict:
        """Retrieve session data and refresh TTL.

        Returns None when the session does not exist or has expired.
        """
        session_key = f"session:{session_id}"
        session_data = r.get(session_key)
        if session_data:
            # Refresh TTL on access (sliding expiration)
            r.expire(session_key, self.ttl)
            return json.loads(session_data)
        return None

    def update_session(self, session_id: str, data: dict) -> bool:
        """Update session data.

        Merges `data` into the session's "data" dict; returns False if
        the session no longer exists.
        """
        session_key = f"session:{session_id}"
        session_data = self.get_session(session_id)
        if session_data:
            session_data["data"].update(data)
            r.setex(session_key, self.ttl, json.dumps(session_data))
            return True
        return False

    def delete_session(self, session_id: str) -> bool:
        """Delete session (logout)."""
        session_key = f"session:{session_id}"
        # DEL returns the number of keys removed.
        return r.delete(session_key) > 0
More efficient for session objects:
class HashSessionManager:
    """Session manager using Redis hashes for better performance.

    A hash lets callers read/write a single field without serializing
    the whole session; note that all field values are stored as strings.
    """

    def __init__(self, ttl: int = 1800):
        # Sliding session timeout in seconds.
        self.ttl = ttl

    def create_session(self, user_id: int, **kwargs) -> str:
        """Create session using hash."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"
        # Store as hash for efficient field access
        session_fields = {
            "user_id": str(user_id),
            "created_at": datetime.utcnow().isoformat(),
            **{k: str(v) for k, v in kwargs.items()}
        }
        r.hset(session_key, mapping=session_fields)
        # Hashes have no per-field TTL; expire the whole key instead.
        r.expire(session_key, self.ttl)
        return session_id

    def get_field(self, session_id: str, field: str) -> str:
        """Get single session field efficiently.

        Returns None if the session or field is missing (HGET semantics).
        """
        session_key = f"session:{session_id}"
        value = r.hget(session_key, field)
        if value:
            r.expire(session_key, self.ttl)  # Refresh TTL
        return value

    def set_field(self, session_id: str, field: str, value: str) -> bool:
        """Update single session field.

        Only touches existing sessions so an expired session is never
        resurrected as a stray un-expiring hash.
        """
        session_key = f"session:{session_id}"
        if r.exists(session_key):
            r.hset(session_key, field, value)
            r.expire(session_key, self.ttl)
            return True
        return False

    def get_all(self, session_id: str) -> dict:
        """Get all session fields.

        Returns an empty dict when the session is missing (HGETALL
        semantics); the TTL is only refreshed for live sessions.
        """
        session_key = f"session:{session_id}"
        data = r.hgetall(session_key)
        if data:
            r.expire(session_key, self.ttl)
        return data
def track_user_activity(user_id: int, action: str):
    """Track user activity with automatic expiration.

    Newest entries are at the head of the list (LPUSH); the list is
    capped at 100 entries and the whole key expires 30 days after the
    last write.
    """
    activity_key = f"user:{user_id}:activity"
    timestamp = datetime.utcnow().isoformat()
    # Add activity to list
    r.lpush(activity_key, json.dumps({"action": action, "timestamp": timestamp}))
    # Keep only last 100 activities
    r.ltrim(activity_key, 0, 99)
    # Set expiration (30 days)
    r.expire(activity_key, 2592000)

def get_recent_activity(user_id: int, limit: int = 10) -> list:
    """Get recent user activities, newest first."""
    activity_key = f"user:{user_id}:activity"
    activities = r.lrange(activity_key, 0, limit - 1)
    return [json.loads(a) for a in activities]
Publisher:
import redis

r = redis.Redis(decode_responses=True)

def publish_event(channel: str, message: dict):
    """Publish event to channel.

    Fire-and-forget: PUBLISH delivers only to currently-connected
    subscribers; nothing is stored for absent ones.
    """
    import json
    r.publish(channel, json.dumps(message))

# Example usage
publish_event("notifications", {
    "type": "user_signup",
    "user_id": 12345,
    "timestamp": datetime.utcnow().isoformat()
})
Subscriber:
import redis
import json

def handle_message(message):
    """Process received message.

    `message` is the redis-py pubsub dict; the payload lives in 'data'.
    """
    data = json.loads(message['data'])
    print(f"Received: {data}")

# Initialize pubsub
r = redis.Redis(decode_responses=True)
p = r.pubsub()

# Subscribe to channels
p.subscribe('notifications', 'alerts')

# Listen for messages (blocking generator)
for message in p.listen():
    # Skip subscribe/unsubscribe confirmations - only real payloads.
    if message['type'] == 'message':
        handle_message(message)

# Subscribe to multiple channels with patterns
p = r.pubsub()
p.psubscribe('user:*', 'notification:*')

# Get messages from pattern subscriptions
for message in p.listen():
    # Pattern deliveries have type 'pmessage' and carry the pattern.
    if message['type'] == 'pmessage':
        channel = message['channel']
        pattern = message['pattern']
        data = message['data']
        print(f"Pattern {pattern} matched {channel}: {data}")
import redis
import time

r = redis.Redis(decode_responses=True)
p = r.pubsub()

def message_handler(message):
    """Handle messages in background thread."""
    print(f"Handler received: {message['data']}")

# Subscribe with handler: a channel -> callback mapping means
# run_in_thread() dispatches messages to the callbacks for us.
p.subscribe(**{'notifications': message_handler, 'alerts': message_handler})

# Run in background thread
thread = p.run_in_thread(sleep_time=0.001)

# Publish some messages
r.publish('notifications', 'Hello!')
r.publish('alerts', 'Warning!')

# Give the background thread time to deliver.
time.sleep(1)

# Stop background thread
thread.stop()
import asyncio
import redis.asyncio as redis

async def reader(channel: redis.client.PubSub):
    """Async message reader.

    Loops until a message whose payload is "STOP" arrives.
    """
    while True:
        # timeout=None blocks until a message is available.
        message = await channel.get_message(ignore_subscribe_messages=True, timeout=None)
        if message is not None:
            print(f"Received: {message}")
            # Stop on specific message
            if message["data"].decode() == "STOP":
                break

async def pubsub_example():
    """Async pub/sub example."""
    r = await redis.from_url("redis://localhost")
    async with r.pubsub() as pubsub:
        # Subscribe to channels
        await pubsub.subscribe("channel:1", "channel:2")
        # Create reader task
        reader_task = asyncio.create_task(reader(pubsub))
        # Publish messages
        await r.publish("channel:1", "Hello")
        await r.publish("channel:2", "World")
        await r.publish("channel:1", "STOP")
        # Wait for reader to finish
        await reader_task
    await r.close()

# Run async example
asyncio.run(pubsub_example())
from redis.cluster import RedisCluster, ClusterNode

# Connect to cluster
rc = RedisCluster(startup_nodes=[
    ClusterNode('localhost', 6379),
    ClusterNode('localhost', 6380)
])

# Create sharded pubsub (SSUBSCRIBE routes by hash slot)
p = rc.pubsub()
p.ssubscribe('foo')

# Get message from specific node
message = p.get_sharded_message(target_node=ClusterNode('localhost', 6379))
import redis
import time
import uuid
class RedisLock:
    """Simple distributed lock using Redis.

    The lock is a single key holding a random token. SET NX EX makes
    acquisition atomic with a guaranteed expiry, and a Lua script makes
    release a compare-and-delete so only the owner can unlock.
    """

    def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())

    def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
        """Try to take the lock; returns True on success."""
        deadline = time.time() + (timeout or self.timeout)
        # Keep attempting until the SET NX succeeds or we give up.
        while not self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
            if not blocking:
                return False
            if timeout and time.time() > deadline:
                return False
            # Brief pause so retries don't hammer the server.
            time.sleep(0.01)
        return True

    def release(self) -> bool:
        """Release lock only if we own it."""
        # Atomic check-and-delete: an unconditional DEL could free a
        # lock some other client re-acquired after ours expired.
        unlock_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return self.redis.eval(unlock_script, 1, self.key, self.identifier) == 1

    def __enter__(self):
        """Context manager support."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager cleanup."""
        self.release()
# Usage example: the context manager acquires on entry, releases on exit
r = redis.Redis()
lock = RedisLock(r, "resource:123", timeout=5)

with lock:
    # Critical section - only one process at a time
    print("Processing resource 123")
    process_resource()
import threading

class RenewableLock:
    """Distributed lock with automatic renewal.

    A daemon thread extends the lock's TTL at one third of the timeout
    interval, so a long-running owner keeps the lock while the TTL still
    guarantees expiry if the owning process dies.
    """

    def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
        self.renewal_thread = None
        self.stop_renewal = threading.Event()

    def _renew_lock(self):
        """Background task to renew lock."""
        while not self.stop_renewal.is_set():
            time.sleep(self.timeout / 3)  # Renew at 1/3 of timeout
            # Renew only if we still own the lock (atomic via Lua).
            lua_script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("expire", KEYS[1], ARGV[2])
            else
                return 0
            end
            """
            result = self.redis.eval(lua_script, 1, self.key,
                                     self.identifier, self.timeout)
            if result == 0:
                # We lost the lock - stop renewing.
                self.stop_renewal.set()

    def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
        """Acquire lock and start auto-renewal.

        NOTE: unlike RedisLock.acquire this never blocks/retries; the
        `blocking`/`timeout` parameters exist for interface parity.
        """
        if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
            # Start renewal thread
            self.stop_renewal.clear()
            self.renewal_thread = threading.Thread(target=self._renew_lock, daemon=True)
            self.renewal_thread.start()
            return True
        return False

    def release(self) -> bool:
        """Release lock and stop renewal."""
        self.stop_renewal.set()
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua_script, 1, self.key, self.identifier)
        return result == 1

    def __enter__(self):
        """Context manager support (added for consistency with RedisLock)."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager cleanup."""
        self.release()
class Redlock:
    """Redlock algorithm for distributed locking across multiple Redis instances."""

    def __init__(self, redis_instances: list):
        """
        Args:
            redis_instances: List of Redis client connections
        """
        self.instances = redis_instances
        # A majority of instances must grant the lock.
        self.quorum = len(redis_instances) // 2 + 1

    def acquire(self, resource: str, ttl: int = 10000) -> tuple:
        """
        Acquire lock across multiple Redis instances.

        Args:
            resource: Logical name of the resource to lock.
            ttl: Lock lifetime in milliseconds.

        Returns:
            (success: bool, lock_identifier: str)
        """
        identifier = str(uuid.uuid4())
        start_time = int(time.time() * 1000)
        # Try to acquire lock on all instances; unreachable instances
        # simply don't count toward the quorum.
        acquired = 0
        for instance in self.instances:
            try:
                if instance.set(f"lock:{resource}", identifier,
                                nx=True, px=ttl):
                    acquired += 1
            except Exception:
                pass
        # The lock is only valid for the ttl minus the time spent
        # acquiring it, minus a small clock-drift allowance.
        elapsed = int(time.time() * 1000) - start_time
        validity_time = ttl - elapsed - 100  # drift compensation
        # Check if we got quorum
        if acquired >= self.quorum and validity_time > 0:
            return True, identifier
        # Release locks if we didn't get quorum
        self._release_all(resource, identifier)
        return False, None

    def release(self, resource: str, identifier: str):
        """Release the lock on every instance.

        Public counterpart to acquire() - previously callers had to
        reach into the private _release_all helper to unlock.
        """
        self._release_all(resource, identifier)

    def _release_all(self, resource: str, identifier: str):
        """Release lock on all instances (owner-checked via Lua)."""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        for instance in self.instances:
            try:
                instance.eval(lua_script, 1, f"lock:{resource}", identifier)
            except Exception:
                pass
# User profile storage
def save_user_profile(user_id: int, profile: dict):
    """Save user profile as hash (all fields in one HSET)."""
    key = f"user:profile:{user_id}"
    r.hset(key, mapping=profile)
    r.expire(key, 86400)  # 24 hour TTL

def get_user_profile(user_id: int) -> dict:
    """Get complete user profile (empty dict if missing)."""
    key = f"user:profile:{user_id}"
    return r.hgetall(key)

def update_user_field(user_id: int, field: str, value: str):
    """Update single profile field without rewriting the whole hash."""
    key = f"user:profile:{user_id}"
    r.hset(key, field, value)

# Example usage
save_user_profile(123, {
    "username": "alice",
    "email": "[email protected]",
    "age": "30"
})

# Atomic increment of a numeric hash field
r.hincrby("user:profile:123", "login_count", 1)
# Job queue implementation
def enqueue_job(queue_name: str, job_data: dict):
    """Append a job to the tail of the queue (FIFO via RPUSH/LPOP)."""
    r.rpush(f"queue:{queue_name}", json.dumps(job_data))

def dequeue_job(queue_name: str, timeout: int = 0) -> dict:
    """Pop the next job, or None if the queue is empty.

    With timeout > 0 this blocks (BLPOP) for up to `timeout` seconds;
    otherwise it returns immediately (LPOP).
    """
    key = f"queue:{queue_name}"
    raw = None
    if timeout > 0:
        popped = r.blpop(key, timeout=timeout)
        if popped:
            raw = popped[1]  # BLPOP returns a (key, value) pair
    else:
        raw = r.lpop(key)
    return json.loads(raw) if raw else None
# Activity feed
def add_to_feed(user_id: int, activity: dict):
    """Prepend an activity to the user's feed (newest first)."""
    key = f"feed:{user_id}"
    r.lpush(key, json.dumps(activity))
    # Cap the feed at 100 entries and expire idle feeds after a week.
    r.ltrim(key, 0, 99)  # Keep only latest 100 items
    r.expire(key, 604800)  # 7 days

def get_feed(user_id: int, start: int = 0, end: int = 19) -> list:
    """Fetch a page of the feed as decoded dicts."""
    raw_items = r.lrange(f"feed:{user_id}", start, end)
    return [json.loads(entry) for entry in raw_items]
# Tags and relationships
def add_tags(item_id: int, tags: list):
    """Add tags to item and maintain a reverse index.

    Writes both directions: item -> tags, plus a "tag:{tag}:items" set
    per tag so tag-intersection queries are possible.
    """
    key = f"item:{item_id}:tags"
    r.sadd(key, *tags)
    # Reverse index: required because SINTER does not expand wildcard
    # key patterns such as "item:*:tags".
    for tag in tags:
        r.sadd(f"tag:{tag}:items", item_id)

def get_tags(item_id: int) -> set:
    """Get all tags for item."""
    key = f"item:{item_id}:tags"
    return r.smembers(key)

def find_items_with_all_tags(tags: list) -> set:
    """Find items having all specified tags.

    Intersects the per-tag reverse-index sets. The previous version
    passed wildcard patterns to SINTER, which Redis treats as literal
    key names, so it could never match anything.
    """
    if not tags:
        return set()
    keys = [f"tag:{tag}:items" for tag in tags]
    return r.sinter(*keys)
# Online users tracking
def user_online(user_id: int, heartbeat_ttl: int = 60):
    """Mark user as online and record a heartbeat.

    BUG FIX: the previous version called EXPIRE on a heartbeat key that
    was never created - EXPIRE on a missing key is a no-op, so the
    heartbeat did nothing. SETEX creates the key *and* sets its TTL.
    """
    r.sadd("users:online", user_id)
    r.setex(f"user:{user_id}:heartbeat", heartbeat_ttl, "1")

def user_offline(user_id: int):
    """Mark user as offline."""
    r.srem("users:online", user_id)

def get_online_users() -> set:
    """Get all online users.

    NOTE(review): members of users:online never expire on their own; a
    periodic reaper should SREM users whose heartbeat key has vanished.
    """
    return r.smembers("users:online")

def get_online_count() -> int:
    """Get count of online users."""
    return r.scard("users:online")
# Leaderboard implementation
def update_score(leaderboard: str, user_id: int, score: float):
    """Set/overwrite a member's score in the leaderboard sorted set."""
    r.zadd(f"leaderboard:{leaderboard}", {user_id: score})

def get_leaderboard(leaderboard: str, start: int = 0, end: int = 9) -> list:
    """Get top players (descending order).

    ZREVRANGE walks highest-score-first; withscores yields
    (member, score) pairs.
    """
    return r.zrevrange(f"leaderboard:{leaderboard}", start, end, withscores=True)

def get_user_rank(leaderboard: str, user_id: int) -> int:
    """Get user's rank (0-indexed, best player = 0); -1 when absent."""
    rank = r.zrevrank(f"leaderboard:{leaderboard}", user_id)
    return -1 if rank is None else rank

def get_user_score(leaderboard: str, user_id: int) -> float:
    """Get user's score, defaulting to 0.0 when absent."""
    score = r.zscore(f"leaderboard:{leaderboard}", user_id)
    return 0.0 if score is None else score

def get_score_range(leaderboard: str, min_score: float, max_score: float) -> list:
    """Get users whose scores fall within [min_score, max_score]."""
    return r.zrangebyscore(f"leaderboard:{leaderboard}", min_score, max_score, withscores=True)
# Time-based sorted set (activity stream)
def add_activity(user_id: int, activity: str):
    """Record an activity scored by its unix timestamp."""
    key = f"user:{user_id}:activities"
    now = time.time()
    r.zadd(key, {activity: now})
    # Prune everything older than 24 hours on the same write path.
    r.zremrangebyscore(key, '-inf', now - 86400)

def get_recent_activities(user_id: int, count: int = 10) -> list:
    """Return the newest `count` activities with their timestamps."""
    # Highest scores (= newest timestamps) come first.
    return r.zrevrange(f"user:{user_id}:activities", 0, count - 1, withscores=True)
# Event stream
def add_event(stream_key: str, event_data: dict) -> str:
    """XADD the event; Redis assigns a "timestamp-sequence" ID."""
    return r.xadd(stream_key, event_data)

def read_events(stream_key: str, count: int = 10, start_id: str = '0') -> list:
    """Read up to `count` events after `start_id` ('0' = from the start)."""
    response = r.xread({stream_key: start_id}, count=count)
    # XREAD returns [(stream_name, [(id, data), ...])]; unwrap ours.
    if not response:
        return []
    _, entries = response[0]
    return entries
# Consumer groups
def create_consumer_group(stream_key: str, group_name: str):
    """Create consumer group for stream (idempotent).

    Redis raises BUSYGROUP if the group already exists; that case is
    swallowed so this is safe to call on every startup.
    """
    try:
        r.xgroup_create(name=stream_key, groupname=group_name, id='0')
    except redis.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise

def read_from_group(stream_key: str, group_name: str,
                    consumer_name: str, count: int = 10) -> list:
    """Read events as consumer in group.

    The special ID '>' asks for messages never delivered to any
    consumer in this group, blocking up to 5s for new ones.
    """
    events = r.xreadgroup(
        groupname=group_name,
        consumername=consumer_name,
        streams={stream_key: '>'},
        count=count,
        block=5000  # 5 second timeout
    )
    if events:
        _, event_list = events[0]
        return event_list
    return []

def acknowledge_event(stream_key: str, group_name: str, event_id: str):
    """Acknowledge processed event (removes it from the pending list)."""
    r.xack(stream_key, group_name, event_id)
# Example: Processing events with consumer group
def process_events(stream_key: str, group_name: str, consumer_name: str):
    """Process events from stream.

    Runs forever: reads batches for this consumer, processes each
    event, and XACKs only on success - failures stay in the pending
    list so they can be re-delivered or claimed later.
    """
    create_consumer_group(stream_key, group_name)
    while True:
        events = read_from_group(stream_key, group_name, consumer_name, count=10)
        for event_id, event_data in events:
            try:
                # Process event
                process_event(event_data)
                # Acknowledge successful processing
                acknowledge_event(stream_key, group_name, event_id)
            except Exception as e:
                print(f"Failed to process event {event_id}: {e}")
                # Event remains unacknowledged for retry
# Without pipelining (slow - multiple round trips)
for i in range(1000):
    r.set(f"key:{i}", f"value:{i}")

# With pipelining (fast - single round trip): commands are buffered
# client-side and sent together on execute().
pipe = r.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute()

# Pipelining with reads: execute() returns results in command order.
pipe = r.pipeline()
for i in range(100):
    pipe.get(f"key:{i}")
values = pipe.execute()
# Builder pattern with pipeline
class DataLoader:
    """Fluent builder that batches writes into one pipeline round trip."""

    def __init__(self):
        # All queued commands share this single pipeline.
        self.pipeline = r.pipeline()

    def add_user(self, user_id: int, user_data: dict):
        """Queue a user-hash write; returns self for chaining."""
        self.pipeline.hset(f"user:{user_id}", mapping=user_data)
        return self

    def add_to_set(self, set_name: str, value: str):
        """Queue a set-membership write; returns self for chaining."""
        self.pipeline.sadd(set_name, value)
        return self

    def execute(self):
        """Flush all queued commands; returns their results in order."""
        return self.pipeline.execute()
# Usage: chain queued writes, then send them all in one execute()
loader = DataLoader()
results = (loader
           .add_user(1, {"name": "Alice", "email": "[email protected]"})
           .add_user(2, {"name": "Bob", "email": "[email protected]"})
           .add_to_set("active_users", "1")
           .add_to_set("active_users", "2")
           .execute())
# Optimistic locking with WATCH
def transfer_credits(from_user: int, to_user: int, amount: int) -> bool:
    """Transfer credits between users with optimistic locking.

    WATCH makes MULTI/EXEC abort (WatchError) if either balance changes
    between our read and the transaction; we then retry from scratch.
    Returns False when the sender lacks sufficient credits.
    """
    with r.pipeline() as pipe:
        while True:
            try:
                # Watch the keys we're going to modify
                pipe.watch(f"user:{from_user}:credits", f"user:{to_user}:credits")
                # While WATCHing, the pipeline executes immediately, so
                # these GETs return real values (missing keys -> 0).
                from_credits = int(pipe.get(f"user:{from_user}:credits") or 0)
                to_credits = int(pipe.get(f"user:{to_user}:credits") or 0)
                # Check if transfer is possible
                if from_credits < amount:
                    pipe.unwatch()
                    return False
                # Start transaction; subsequent commands are queued.
                pipe.multi()
                pipe.set(f"user:{from_user}:credits", from_credits - amount)
                pipe.set(f"user:{to_user}:credits", to_credits + amount)
                # Execute transaction
                pipe.execute()
                return True
            except redis.WatchError:
                # Key was modified by another client - retry
                continue
# Lua scripts for atomic operations: the whole script runs atomically
# server-side, so no other command can interleave between GET and SET.
increment_script = """
local current = redis.call('GET', KEYS[1])
if not current then
current = 0
end
local new_val = tonumber(current) + tonumber(ARGV[1])
redis.call('SET', KEYS[1], new_val)
return new_val
"""

# Register and use Lua script (redis-py uses EVALSHA after first call)
increment = r.register_script(increment_script)
new_value = increment(keys=['counter:views'], args=[1])
# Rate limiting with Lua: fixed-window counter. The INCR and the EXPIRE
# on the window's first increment happen atomically inside the script.
rate_limit_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, window)
end
if current > limit then
return 0
else
return 1
end
"""

rate_limiter = r.register_script(rate_limit_script)

def is_allowed(user_id: int, limit: int = 100, window: int = 60) -> bool:
    """Check if user is within rate limit.

    Allows up to `limit` calls per `window`-second fixed window.
    """
    key = f"rate_limit:{user_id}"
    result = rate_limiter(keys=[key], args=[limit, window])
    return result == 1
# Get-or-set pattern with Lua: atomically returns the existing value,
# or installs ARGV[1] with a TTL and returns it.
get_or_set_script = """
local value = redis.call('GET', KEYS[1])
if value then
return value
else
redis.call('SET', KEYS[1], ARGV[1])
redis.call('EXPIRE', KEYS[1], ARGV[2])
return ARGV[1]
end
"""

get_or_set = r.register_script(get_or_set_script)

def get_or_compute(key: str, compute_fn, ttl: int = 3600):
    """Get value from cache or compute and cache it.

    Installs a "__COMPUTING__" placeholder atomically; the caller that
    wins the race computes the real value and overwrites it.
    NOTE(review): a concurrent caller that reads the placeholder before
    the real value lands will also recompute - duplicate work, though
    not incorrect results. Confirm that's acceptable for expensive
    compute_fn.
    """
    value = get_or_set(keys=[key], args=["__COMPUTING__", ttl])
    if value == "__COMPUTING__":
        # We set the placeholder - compute the real value
        computed = compute_fn()
        r.setex(key, ttl, computed)
        return computed
    return value
from redis.sentinel import Sentinel

# Connect to Sentinel - list every sentinel address; any one can answer.
sentinel = Sentinel([
    ('sentinel1', 26379),
    ('sentinel2', 26379),
    ('sentinel3', 26379)
], socket_timeout=0.5)

# Get master connection - resolved dynamically, so it follows failover.
master = sentinel.master_for('mymaster', socket_timeout=0.5)

# Get replica connection (for read-only operations)
replica = sentinel.slave_for('mymaster', socket_timeout=0.5)

# Use master for writes
master.set('key', 'value')

# Use replica for reads (optional, for load distribution)
# NOTE(review): replica reads can be slightly stale - confirm the
# application tolerates asynchronous replication lag.
value = replica.get('key')
import asyncio
import redis.asyncio as redis

async def async_redis_operations():
    """Async Redis operations example."""
    # Create async connection
    r = await redis.from_url("redis://localhost")
    try:
        # Async operations
        await r.set("async_key", "async_value")
        value = await r.get("async_key")
        print(f"Value: {value}")
        # Async pipeline (transaction=True wraps commands in MULTI/EXEC)
        async with r.pipeline(transaction=True) as pipe:
            await pipe.set("key1", "value1")
            await pipe.set("key2", "value2")
            await pipe.get("key1")
            results = await pipe.execute()
            print(f"Pipeline results: {results}")
    finally:
        # Always release the connection, even on error.
        await r.close()

# Run async operations
asyncio.run(async_redis_operations())
import socket  # BUG FIX: required for the TCP keepalive constants below

# Production-ready connection pool
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,            # Max pool size
    socket_timeout=5,              # Socket timeout
    socket_connect_timeout=5,      # Connection timeout
    socket_keepalive=True,         # Keep TCP connection alive
    socket_keepalive_options={
        socket.TCP_KEEPIDLE: 60,   # Idle seconds before first probe
        socket.TCP_KEEPINTVL: 10,  # Seconds between probes
        socket.TCP_KEEPCNT: 3      # Failed probes before dropping
    },
    retry_on_timeout=True,         # Retry on timeout
    health_check_interval=30,      # Health check every 30s
    decode_responses=True          # Auto-decode bytes to strings
)
r = redis.Redis(connection_pool=pool)
import redis
from redis.exceptions import ConnectionError, TimeoutError
import time
class ResilientRedisClient:
    """Redis client with bounded retries and exponential backoff.

    Wraps GET/SET so transient connection/timeout errors are retried
    instead of bubbling up to callers.
    """

    def __init__(self, max_retries: int = 3, backoff: float = 0.1):
        self.redis = redis.Redis(
            host='localhost',
            port=6379,
            socket_timeout=5,
            retry_on_timeout=True
        )
        self.max_retries = max_retries
        # Base delay in seconds; doubles on each attempt.
        self.backoff = backoff

    def get_with_retry(self, key: str, default=None):
        """Get value with exponential backoff retry.

        Returns `default` only when the key is missing or all retries
        fail. (BUG FIX: the previous `value or default` also replaced
        legitimate falsy values such as "" or "0"; now only a missing
        key falls back.)
        """
        for attempt in range(self.max_retries):
            try:
                value = self.redis.get(key)
                return default if value is None else value
            except (ConnectionError, TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    # Log error and return default
                    print(f"Redis error after {self.max_retries} attempts: {e}")
                    return default
                # Exponential backoff: 0.1s, 0.2s, 0.4s, ...
                wait_time = self.backoff * (2 ** attempt)
                time.sleep(wait_time)

    def set_with_retry(self, key: str, value: str, ttl: int = None) -> bool:
        """Set value (optionally with TTL) using the same retry policy."""
        for attempt in range(self.max_retries):
            try:
                if ttl:
                    return self.redis.setex(key, ttl, value)
                return self.redis.set(key, value)
            except (ConnectionError, TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    print(f"Redis error after {self.max_retries} attempts: {e}")
                    return False
                wait_time = self.backoff * (2 ** attempt)
                time.sleep(wait_time)
def get_redis_info(section: str = None) -> dict:
    """Get Redis server information (INFO command, optionally one section)."""
    return r.info(section=section)

def monitor_memory_usage():
    """Monitor Redis memory usage; prints a summary and returns the raw info."""
    info = r.info('memory')
    used_memory = info['used_memory_human']
    peak_memory = info['used_memory_peak_human']
    # Ratio well above 1 means RSS exceeds logical usage (fragmentation);
    # well below 1 typically indicates swapping.
    memory_fragmentation = info['mem_fragmentation_ratio']
    print(f"Used Memory: {used_memory}")
    print(f"Peak Memory: {peak_memory}")
    print(f"Fragmentation Ratio: {memory_fragmentation}")
    return info

def monitor_stats():
    """Monitor Redis statistics; prints a summary and returns the raw info."""
    info = r.info('stats')
    total_connections = info['total_connections_received']
    total_commands = info['total_commands_processed']
    ops_per_sec = info['instantaneous_ops_per_sec']
    print(f"Total Connections: {total_connections}")
    print(f"Total Commands: {total_commands}")
    print(f"Ops/sec: {ops_per_sec}")
    return info

def get_slow_log(count: int = 10):
    """Get the most recent `count` slow-query log entries and print them."""
    slow_log = r.slowlog_get(count)
    for entry in slow_log:
        print(f"Command: {entry['command']}")
        print(f"Duration: {entry['duration']} microseconds")
        print(f"Time: {entry['start_time']}")
        print("---")
    return slow_log
Use consistent, hierarchical naming:
# Good naming patterns