Use when designing reliable AI agent systems — retry strategies, circuit breakers, fallbacks, graceful degradation, timeout management, and handling compound failures in multi-step agent workflows.
Agent systems fail differently than single API calls. Failures compound multiplicatively across steps, a stuck loop can run indefinitely, and a single slow dependency can freeze the entire pipeline. Reliability must be designed in — it cannot be bolted on after the fact.
Agent reliability compounds multiplicatively — unlike single LLM calls:
5 steps, each 99% reliable → 0.99^5 = 95% system reliability
10 steps, each 99% reliable → 0.99^10 = 90% system reliability
20 steps, each 99% reliable → 0.99^20 = 82% system reliability
Implication: Each component must be extremely reliable individually, or agent system reliability collapses. Target 99.9% per step for a 10-step workflow to achieve 99% system reliability. This is why agent reliability engineering is harder than regular API reliability.
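The arithmetic above is easy to encode; a tiny helper (function names are illustrative, not from any library) turns a system-level target into a per-step budget:

```python
def system_reliability(step_reliability: float, num_steps: int) -> float:
    """System success rate when each step succeeds independently."""
    return step_reliability ** num_steps

def required_step_reliability(target_system: float, num_steps: int) -> float:
    """Per-step reliability needed to hit a system-level target."""
    return target_system ** (1 / num_steps)
```

For example, a 99% system target over 10 steps requires roughly 99.9% per step, matching the figure above.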
```python
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# RateLimitError, APIConnectionError, LLMResponse, and llm are placeholders
# for your provider SDK's exception types, response type, and client.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type((RateLimitError, TimeoutError, APIConnectionError)),
    reraise=True,
)
async def call_llm_with_retry(prompt: str, model: str) -> LLMResponse:
    return await llm.call(prompt, model=model, timeout=30)
```
```python
# Different retry strategies by error type
RETRY_STRATEGIES = {
    "rate_limit":   {"max_attempts": 5, "base_delay": 2,   "max_delay": 60},
    "timeout":      {"max_attempts": 2, "base_delay": 0,   "max_delay": 0},  # fast fail
    "server_error": {"max_attempts": 3, "base_delay": 1,   "max_delay": 10},
    "connection":   {"max_attempts": 3, "base_delay": 0.5, "max_delay": 5},
}
```
Never retry: Validation errors (400), authorization errors (401/403) — retrying wastes time and money. Always retry with backoff: Rate limits (429), timeouts, transient server errors (500/503).
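One way to wire the strategy table into an actual delay calculation, sketched with full jitter (the helper name is ours, not a library API):

```python
import random

def backoff_delay(strategy: dict, attempt: int) -> float:
    """Delay before retry number `attempt` (1-based), using one entry
    from RETRY_STRATEGIES: exponential growth, capped, then jittered."""
    delay = min(strategy["base_delay"] * 2 ** (attempt - 1), strategy["max_delay"])
    return delay * random.uniform(0.5, 1.0)  # jitter avoids synchronized retry storms
```

The jitter matters in agent fleets: without it, many agents hitting the same rate limit retry in lockstep and re-trigger it.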
Prevent cascading failures by temporarily stopping calls to a failing service:
```python
import time

class AgentCircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60, success_threshold=2):
        self.state = "closed"  # closed=normal, open=failing, half_open=testing
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None
        self.success_count = 0
        self.success_threshold = success_threshold

    def call(self, fn, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half_open"  # allow trial requests
                self.success_count = 0
            else:
                raise CircuitOpenError("Service temporarily unavailable")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        if self.state == "half_open":
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = "closed"
                self.failure_count = 0
        else:
            self.failure_count = 0  # a success while closed resets the failure streak

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        # A failure during the half-open trial reopens immediately
        if self.state == "half_open" or self.failure_count >= self.failure_threshold:
            self.state = "open"
            alert("Circuit breaker opened")
```
Production thresholds: Open after 5 failures in 60 seconds. Attempt recovery after 60 seconds. Close after 2 consecutive successes. Expose circuit state in your observability dashboard — patterns in circuit events reveal infrastructure problems.
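The "5 failures in 60 seconds" part of that threshold needs a sliding window rather than the plain lifetime counter shown above; a minimal sketch (class name and defaults are our assumptions, not a library API):

```python
import time
from collections import deque

class FailureWindow:
    """Counts failures inside a sliding time window, e.g. open the
    breaker after 5 failures within 60 seconds."""
    def __init__(self, max_failures: int = 5, window_seconds: float = 60.0):
        self.max_failures = max_failures
        self.window_seconds = window_seconds
        self.timestamps = deque()

    def record_failure(self, now=None) -> bool:
        """Record one failure; return True if the breaker should open."""
        now = time.time() if now is None else now
        self.timestamps.append(now)
        # Evict failures that have aged out of the window
        while self.timestamps and now - self.timestamps[0] > self.window_seconds:
            self.timestamps.popleft()
        return len(self.timestamps) >= self.max_failures
```

With this, five failures spread over ten minutes never trip the breaker, but five in a burst do.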
Design every agent action with a fallback:
```python
class AgentWithFallbacks:
    def search_web(self, query: str) -> SearchResult:
        try:
            return primary_search_api.search(query)
        except SearchAPIError:
            try:
                return fallback_search_api.search(query)  # secondary provider
            except Exception:
                return SearchResult(
                    source="cache",
                    results=self.search_cache.get(query, []),
                    degraded=True,  # flag that this is stale/incomplete
                )

    def generate_response(self, prompt: str) -> str:
        for model in ["claude-opus-4-5", "claude-sonnet-4-6", "claude-haiku-4-5"]:
            try:
                return llm.call(prompt, model=model)
            except ModelUnavailableError:
                continue  # try next model
        # All models failed: return cached or template response
        return self.get_cached_response(prompt) or "Service temporarily unavailable."
```
Degradation tiers, as the code above illustrates: full result from the primary provider, then a same-quality result from a secondary provider, then a stale cached result explicitly flagged as degraded. Always set degraded=True on fallback responses; callers need to know the response quality is reduced.
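A sketch of what honoring that flag looks like on the caller side (SearchResult mirrors the shape used in the fallback code; render is a hypothetical caller):

```python
from dataclasses import dataclass

@dataclass
class SearchResult:  # assumed shape, matching the fallback example above
    source: str
    results: list
    degraded: bool = False

def render(result: SearchResult) -> str:
    """Callers branch on `degraded` instead of treating every result as fresh."""
    note = " (cached results, may be stale)" if result.degraded else ""
    return f"[{result.source}]{note} {len(result.results)} result(s)"
```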
```python
import asyncio

class AgentTimeoutManager:
    TIMEOUTS = {
        "llm_call": 30,          # 30 seconds for LLM response
        "tool_execution": 10,    # 10 seconds for tool calls
        "web_search": 5,         # 5 seconds for web search
        "agent_task": 120,       # 2 minutes for full task
        "human_approval": 3600,  # 1 hour for human input
    }

    async def with_timeout(self, operation_type: str, coro):
        timeout = self.TIMEOUTS.get(operation_type, 30)
        try:
            return await asyncio.wait_for(coro, timeout=timeout)
        except asyncio.TimeoutError:
            raise AgentTimeoutError(
                f"{operation_type} exceeded {timeout}s timeout",
                operation=operation_type,
                timeout=timeout,
            )
```
Never use infinite timeouts. Always set explicit timeouts on every external call. A single hanging tool call can freeze an entire agent pipeline indefinitely.
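Per-call timeouts alone do not enforce the full-task budget: twenty calls that each finish just under their own limit can still blow past the 2-minute agent_task ceiling. One approach (helper name assumed) tracks an absolute deadline and gives each call only the time remaining:

```python
import asyncio
import time

async def with_deadline(coro, deadline: float):
    """Await coro with whatever time remains before an absolute
    monotonic deadline, so per-call waits never exceed the task budget."""
    remaining = deadline - time.monotonic()
    if remaining <= 0:
        raise asyncio.TimeoutError("task budget exhausted")
    return await asyncio.wait_for(coro, timeout=remaining)
```

The deadline is set once at task start, e.g. `time.monotonic() + TIMEOUTS["agent_task"]`, and threaded through every call; each individual call can still carry its own tighter per-operation timeout.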
Agents can get stuck in infinite loops — detect and break:
```python
from collections import Counter

class LoopSafeAgent:
    MAX_STEPS = 20
    MAX_LOOPS = 3  # same action repeated

    def run(self, task: str) -> AgentResult:
        steps = []
        action_history = Counter()
        for step_num in range(self.MAX_STEPS):
            action = self.decide_next_action(task, steps)
            # Detect repeated actions
            action_key = f"{action.tool}:{hash(str(action.params))}"
            action_history[action_key] += 1
            if action_history[action_key] > self.MAX_LOOPS:
                return AgentResult(
                    status="stuck",
                    message=f"Agent stuck in loop: {action.tool} repeated more than {self.MAX_LOOPS} times",
                    steps_taken=steps,
                    partial_result=self.extract_partial_result(steps),
                )
            result = self.execute(action)
            steps.append(Step(action, result))
            if self.is_complete(result):
                return AgentResult(status="success", steps_taken=steps, result=result)
        # Max steps reached: return partial result, not a crash
        return AgentResult(
            status="max_steps_reached",
            steps_taken=steps,
            partial_result=self.extract_partial_result(steps),
        )
```
Always return a partial result when hitting step limits. Hard crashes leave users with no information about what was accomplished before the limit was reached.
When an agent cannot complete fully, extract what was done:
```python
def extract_partial_result(steps: list[Step]) -> PartialResult:
    """Return useful output even from an incomplete agent run."""
    completed_steps = [s for s in steps if s.status == "success"]
    findings = [s.result for s in completed_steps if s.result is not None]
    return PartialResult(
        completed_count=len(completed_steps),
        total_attempted=len(steps),
        findings=findings,
        last_successful_step=completed_steps[-1].name if completed_steps else None,
        resumable=True,  # can be retried with context from this partial result
        summary=f"Completed {len(completed_steps)}/{len(steps)} steps before stopping.",
    )
```
- agentic-ai-patterns: integrating reliability patterns into agent loop design (where to place retries, circuit breakers, and checkpoints in the observe-think-act cycle)
- llm-observability: monitoring retry rates, circuit breaker state transitions, timeout frequency, and step limit hits in production
- agentic-security: reliability mechanisms can be exploited (e.g., deliberately flooding requests to trigger circuit breakers as a denial-of-service)

@ai-engineer uses this skill when building production agent systems; @sre uses this for reliability reviews of existing agent infrastructure.