Production-grade WebSocket patterns for Python (FastAPI/Starlette) with connection management, rooms, and message protocols
Production-grade WebSocket patterns for real-time communication in Python applications.
Use this skill when:
A WebSocket connection is fundamentally different from HTTP:
HTTP: Request → Response → Done
WebSocket: Connect → Accept → [Receive Loop] → Disconnect → Cleanup
The receive loop is unavoidable because:
┌─────────────────────────────────────────────────────────────┐
│ WebSocket System │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────────────────────────┐ │
│ │ Client 1 │───▶│ │ │
│ └──────────────┘ │ │ │
│ │ Connection Manager │ │
│ ┌──────────────┐ │ ┌─────────────────────────────┐ │ │
│ │ Client 2 │───▶│ │ connections: Set[WebSocket] │ │ │
│ └──────────────┘ │ │ rooms: Dict[str, Set[WS]] │ │ │
│ │ └─────────────────────────────┘ │ │
│ ┌──────────────┐ │ │ │
│ │ Client N │───▶│ connect() → track │ │
│ └──────────────┘ │ disconnect() → remove │ │
│ │ broadcast() → fan-out │ │
│ │ send_to_room() → targeted │ │
│ └──────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
app/
├── websocket/
│ ├── __init__.py
│ ├── manager.py # ConnectionManager class
│ ├── handlers.py # Message handlers by type
│ ├── models.py # Message schemas (Pydantic)
│ └── router.py # WebSocket endpoint
└── main.py # Mount WebSocket router
Connections must be:
# Connection state lives in ONE place
class ConnectionManager:
def __init__(self):
self.active_connections: set[WebSocket] = set()
async def connect(self, websocket: WebSocket) -> None:
await websocket.accept()
self.active_connections.add(websocket)
def disconnect(self, websocket: WebSocket) -> None:
self.active_connections.discard(websocket)
Business logic must NEVER handle socket tracking directly.
The receive loop:
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True: # THE LOOP
data = await websocket.receive_text()
# validate → dispatch → respond
except WebSocketDisconnect:
pass # Expected, not exceptional
finally:
manager.disconnect(websocket) # Always cleanup
Every message must be:
{"type": "chat", "payload": {...}}type field routes to handlersclass WebSocketMessage(BaseModel):
type: str
payload: dict = {}
A room is a logical routing layer, not a socket feature:
class ConnectionManager:
def __init__(self):
self.connections: set[WebSocket] = set()
self.rooms: dict[str, set[WebSocket]] = defaultdict(set)
def join_room(self, websocket: WebSocket, room_id: str) -> None:
self.rooms[room_id].add(websocket)
def leave_room(self, websocket: WebSocket, room_id: str) -> None:
self.rooms[room_id].discard(websocket)
async def broadcast_to_room(self, room_id: str, message: str) -> None:
for connection in self.rooms[room_id]:
await connection.send_text(message)
Key insights:
Performance is about control, not speed:
| Cost Center | Why It Matters |
|---|---|
| Fan-out O(n) | Broadcast to 1000 clients = 1000 sends |
| Serialization | JSON encoding per message adds up |
| Slow clients | One blocked send() can stall others |
Safety principles:
When implementing WebSocket features:
# Accept connection
await websocket.accept()
# Receive message
data = await websocket.receive_text()
data = await websocket.receive_json()
# Send message
await websocket.send_text(message)
await websocket.send_json(data)
# Close connection
await websocket.close(code=1000)
# Handle disconnection
from starlette.websockets import WebSocketDisconnect
Not:
Yes:
These are intentionally excluded to keep the core pattern clean:
See the references/ directory for:
connection-manager-pattern.md - Centralized connection trackingreceive-loop-pattern.md - Message loop structure and lifetimemessage-protocol-pattern.md - JSON message contractsrooms-pattern.md - Logical routing and channelserror-handling-pattern.md - Defensive failure handling