Redis caching, pub/sub, and data structures. Activate for Redis operations, caching strategies, session management, and message queuing.
Provides comprehensive Redis capabilities for the Golden Armada AI Agent Fleet Platform.
Activate this skill when working with:
```bash
redis-cli -h localhost -p 6379 redis-cli -h localhost -p 6379 -a password
redis-cli ping ```
```bash
SET key "value" SET key "value" EX 3600 # With TTL GET key DEL key EXISTS key TTL key EXPIRE key 3600
HSET user:1 name "John" age "30" HGET user:1 name HGETALL user:1 HDEL user:1 age
LPUSH queue "item1" RPUSH queue "item2" LPOP queue RPOP queue LRANGE queue 0 -1
SADD tags "python" "redis" SMEMBERS tags SISMEMBER tags "python" SREM tags "python"
ZADD leaderboard 100 "player1" 200 "player2" ZRANGE leaderboard 0 -1 WITHSCORES ZRANK leaderboard "player1" ZINCRBY leaderboard 50 "player1" ```
```python import redis from redis import asyncio as aioredis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
async_redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)
r.set('key', 'value', ex=3600) value = r.get('key') r.delete('key')
r.hset('user:1', mapping={'name': 'John', 'age': '30'}) user = r.hgetall('user:1')
r.lpush('queue', 'item1', 'item2') items = r.lrange('queue', 0, -1) item = r.rpop('queue')
async def cache_get(key: str): async with aioredis.from_url("redis://localhost") as redis: return await redis.get(key) ```
```python async def get_agent(agent_id: str) -> dict: # Try cache first cache_key = f"agent:{agent_id}" cached = await redis.get(cache_key) if cached: return json.loads(cached)
# Cache miss - fetch from database
agent = await db.get_agent(agent_id)
if agent:
await redis.set(cache_key, json.dumps(agent), ex=3600)
return agent
async def update_agent(agent_id: str, data: dict) -> dict: # Update database agent = await db.update_agent(agent_id, data)
# Invalidate cache
cache_key = f"agent:{agent_id}"
await redis.delete(cache_key)
return agent
```
```python async def rate_limit(key: str, limit: int, window: int) -> bool: """ Sliding window rate limiter. Returns True if request is allowed. """ current = int(time.time()) window_key = f"rate:{key}:{current // window}"
async with redis.pipeline() as pipe:
pipe.incr(window_key)
pipe.expire(window_key, window * 2)
results = await pipe.execute()
count = results[0]
return count <= limit
if not await rate_limit(f"user:{user_id}", limit=100, window=60): raise HTTPException(status_code=429, detail="Rate limit exceeded") ```
```python import uuid
class DistributedLock: def init(self, redis_client, key: str, timeout: int = 10): self.redis = redis_client self.key = f"lock:{key}" self.timeout = timeout self.token = str(uuid.uuid4())
async def __aenter__(self):
while True:
acquired = await self.redis.set(
self.key, self.token, nx=True, ex=self.timeout
)
if acquired:
return self
await asyncio.sleep(0.1)
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Only release if we own the lock
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
await self.redis.eval(script, 1, self.key, self.token)
async with DistributedLock(redis, "resource:123"): await do_critical_work() ```
```python
async def publish_event(channel: str, message: dict): await redis.publish(channel, json.dumps(message))
async def subscribe_events(): pubsub = redis.pubsub() await pubsub.subscribe("agent:events")
async for message in pubsub.listen():
if message["type"] == "message":
data = json.loads(message["data"])
await handle_event(data)
```
```python from fastapi import Depends, HTTPException from fastapi.security import HTTPBearer import secrets
security = HTTPBearer()
async def create_session(user_id: str) -> str: session_id = secrets.token_urlsafe(32) session_key = f"session:{session_id}"
await redis.hset(session_key, mapping={
"user_id": user_id,
"created_at": datetime.utcnow().isoformat()
})
await redis.expire(session_key, 86400) # 24 hours
return session_id
async def get_session(token: str = Depends(security)) -> dict: session_key = f"session:{token.credentials}" session = await redis.hgetall(session_key)
if not session:
raise HTTPException(status_code=401, detail="Invalid session")
# Refresh TTL
await redis.expire(session_key, 86400)
return session
```
INFO memory