Use when connecting an LLM agent to a full-stack application, external API, or third-party platform. Covers four integration patterns (REST, WebSocket/SSE, Webhook, Message Queue), interface design, reliability, security, and observability. Framework-agnostic — guides you to the right pattern for your situation, then gives concrete implementation direction for your chosen stack.
LLM agents are slow (10-60 seconds per run), stateful, and non-deterministic. Standard web API patterns assume fast, stateless, deterministic handlers. Naively connecting the two produces: blocked event loops, silent failures, duplicate runs, leaked state across users, and no visibility into what went wrong.
The discipline: Choose the right integration pattern for your latency and delivery requirements before writing any code. Most integration bugs come from using the wrong pattern, not from implementing the right one badly.
Answer these two questions:
Q1: Who initiates the request?
Q2: How long does the agent take, and does the user need live progress?
| Pattern | Best For | Avoid When |
|---|---|---|
| REST (sync) | Fast agents (<3s), simple request/response | Agent takes >3s — will time out or block |
| REST (async + polling) | Medium agents (3-30s), client can poll | Real-time progress needed |
| SSE | Long agents, one-way progress stream to browser | Bidirectional communication needed |
| WebSocket | Long agents, bidirectional (user can cancel/redirect) | Simple one-way progress is enough |
| Webhook | Third-party platform pushes events to your server | You initiate the request |
| Message Queue | Background jobs, high volume, retryable work | Low latency required |
Use this only if the agent reliably completes in under 3 seconds. Otherwise use async + polling.
POST /agent/run
Body: { query: string, session_id: string }
Response: { answer: string, citations: [...], run_id: string }
Stack recommendations:
```python
# FastAPI — run the blocking agent in a thread pool so the event loop stays free
@app.post("/agent/run")
async def run_agent(request: AgentRequest):
    result = await asyncio.to_thread(run_pipeline, request.query)
    return result
```
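The thread-pool handoff can be exercised outside FastAPI. A minimal sketch — `run_pipeline` here is a hypothetical blocking stub standing in for your real agent call:

```python
import asyncio
import time

def run_pipeline(query: str) -> dict:
    # Hypothetical blocking agent call; in production this is the 10-60s run
    time.sleep(0.05)
    return {"answer": f"echo: {query}", "citations": [], "run_id": "r1"}

async def run_agent(query: str) -> dict:
    # Offload to a worker thread so the event loop can keep serving requests
    return await asyncio.to_thread(run_pipeline, query)
```

Calling `run_pipeline` directly inside an `async def` would block the whole event loop for the duration of the run; `asyncio.to_thread` is what keeps other requests flowing.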
For agents that take 5-30 seconds. Client submits job, polls for status.
POST /agent/run → { job_id: string } (immediate)
GET /agent/run/{job_id} → { status: queued|running|done|failed, result?: ... }
Implementation:
```python
import asyncio
import uuid

jobs = {}  # in-memory for illustration; use Redis (or similar) in production

async def run_and_store(job_id: str, query: str):
    # Run the blocking pipeline off the event loop and record the outcome,
    # so failures surface in the poll response instead of disappearing
    jobs[job_id]["status"] = "running"
    try:
        result = await asyncio.to_thread(run_pipeline, query)
        jobs[job_id] = {"status": "done", "result": result}
    except Exception as exc:
        jobs[job_id] = {"status": "failed", "error": str(exc)}

@app.post("/agent/run")
async def submit(request: AgentRequest):
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "queued"}
    asyncio.create_task(run_and_store(job_id, request.query))
    return {"job_id": job_id}

@app.get("/agent/run/{job_id}")
async def poll(job_id: str):
    return jobs.get(job_id, {"status": "not_found"})
```
Polling interval: Tell the client to poll every 2-3 seconds, then back off exponentially after the first 30 seconds.
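The client side of that schedule can be sketched as follows — a hedged example, where `fetch_status` is a hypothetical callable wrapping `GET /agent/run/{job_id}`:

```python
import time

def next_poll_interval(elapsed: float, base: float = 2.0, cap: float = 30.0) -> float:
    # Poll every `base` seconds for the first 30s, then back off exponentially,
    # doubling per additional 30s window, capped at `cap` seconds
    if elapsed < 30:
        return base
    doublings = int((elapsed - 30) // 30) + 1
    return min(base * (2 ** doublings), cap)

def poll_until_done(fetch_status, timeout: float = 300.0, base: float = 2.0) -> dict:
    # fetch_status() is a hypothetical stand-in returning the poll payload,
    # e.g. {"status": "running"} or {"status": "done", "result": ...}
    start = time.monotonic()
    while True:
        job = fetch_status()
        if job["status"] in ("done", "failed", "not_found"):
            return job
        elapsed = time.monotonic() - start
        if elapsed > timeout:
            return {"status": "timeout"}
        time.sleep(next_poll_interval(elapsed, base=base))
```

The overall `timeout` matters: without it, a job lost to a server restart (in-memory `jobs` dict) would leave the client polling forever.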
For agents where you want to stream progress to a browser. One-way: server → client.
GET /agent/stream?query=...
Content-Type: text/event-stream
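A minimal sketch of the server side. Only the SSE framing (`data: <payload>\n\n` per event) is concrete here; `agent_events` is a hypothetical progress generator, and the FastAPI `StreamingResponse` wiring is shown in a comment:

```python
import asyncio
import json

def sse_frame(data: dict) -> str:
    # One Server-Sent Events frame: a "data:" line followed by a blank line
    return f"data: {json.dumps(data)}\n\n"

async def agent_events(query: str):
    # Hypothetical progress events from the agent; replace with your pipeline
    for step in ("retrieving", "generating", "done"):
        await asyncio.sleep(0)  # stand-in for real work between events
        yield sse_frame({"step": step, "query": query})

# In FastAPI, this generator is served as:
#   return StreamingResponse(agent_events(query), media_type="text/event-stream")

async def collect(query: str) -> list[str]:
    # Helper for exercising the stream outside an HTTP server
    return [frame async for frame in agent_events(query)]
```

On the browser side, `EventSource` handles reconnection and frame parsing for free, which is a large part of why SSE is the default choice for one-way progress.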