"Transient failures are a fact of life in distributed systems. The question is not whether to retry, but how to retry in a way that improves reliability without creating new problems."
A Distributed Systems Engineer
Why Retry Matters for AI Systems
AI inference involves multiple failure points: API unavailability, rate limiting, network timeouts, and model-level errors. Without retry logic, transient failures become permanent failures for users. With retry logic, you can recover from most transient issues automatically.
However, retry logic introduces considerations specific to AI systems. Retrying an LLM call with the same prompt may produce a different response, potentially better or worse than the original. Unlike retries of idempotent operations, AI generation retries do not guarantee the same result.
The Non-Idempotent Retry Problem
Traditional retries assume the same input produces the same output. AI retries do not guarantee this. A retry might fix a timeout but produce a different response than the original request. This means retry logic for AI must consider result consistency, not just error recovery.
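One way to make result consistency observable is to fingerprint each attempt's output and flag when a retry returned materially different content. A minimal sketch (the normalization scheme here is an illustrative assumption, not a standard):

```python
import hashlib

def response_fingerprint(text: str) -> str:
    """Stable fingerprint of a response, used only to detect divergence."""
    # Normalize whitespace and case so trivial formatting differences
    # between attempts don't count as divergence
    normalized = " ".join(text.split()).lower()
    return hashlib.sha256(normalized.encode()).hexdigest()

def retries_diverged(responses: list[str]) -> bool:
    """True if successive attempts produced materially different content."""
    fingerprints = {response_fingerprint(r) for r in responses}
    return len(fingerprints) > 1
```

Logging such divergences gives you data on how often retries silently change what users see, which informs whether you need the idempotency machinery described later in this section.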
Retry Strategies
Exponential Backoff
The most common retry strategy: wait increasingly longer between retries. This avoids overwhelming a failing service while eventually giving it time to recover.
import asyncio
import logging
import random
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)

# RateLimitError and ServiceUnavailableError stand in for the retryable
# exceptions raised by your provider SDK (e.g. openai.RateLimitError).

async def retry_with_exponential_backoff(
    operation: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
) -> Any:
    """
    Retry an operation with exponential backoff.

    Args:
        operation: The async operation to retry
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap in seconds
        jitter: Add randomness to prevent thundering herd
    """
    last_exception = None
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except (RateLimitError, ServiceUnavailableError) as e:
            last_exception = e
            if attempt == max_retries:
                break
            # Calculate delay with exponential increase
            delay = min(base_delay * (2 ** attempt), max_delay)
            # Add jitter to prevent thundering herd
            if jitter:
                delay = delay * (0.5 + random.random())
            logger.warning(
                f"{type(e).__name__}. Retrying in {delay:.2f}s "
                f"(attempt {attempt + 1}/{max_retries})"
            )
            await asyncio.sleep(delay)
    raise last_exception
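Before jitter is applied, the delay schedule is deterministic: base_delay doubled on each attempt, capped at max_delay. A small helper makes the schedule easy to inspect:

```python
def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Deterministic exponential backoff delay (jitter excluded) for one attempt."""
    return min(base_delay * (2 ** attempt), max_delay)

# With the defaults, attempts 0..6 wait 1, 2, 4, 8, 16, 32, 60 seconds;
# jitter then scales each value by a random factor in [0.5, 1.5).
schedule = [backoff_delay(a) for a in range(7)]
```

Seeing the schedule explicitly helps when tuning base_delay and max_delay against a provider's documented rate-limit windows.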
Token Bucket Rate Limiting
Before retrying, respect rate limits by tracking tokens consumed:
import asyncio
import time
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    """Token bucket for rate limiting AI API calls"""
    capacity: int        # Maximum tokens
    refill_rate: float   # Tokens per second
    tokens: float = field(init=False)
    last_refill: float = field(init=False)

    def __post_init__(self):
        self.tokens = float(self.capacity)
        self.last_refill = time.time()

    async def acquire(self, tokens_needed: int) -> float:
        """Acquire tokens, waiting if necessary. Returns total wait time."""
        total_wait = 0.0
        while True:
            self._refill()
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return total_wait
            # Calculate wait time until enough tokens have accumulated
            deficit = tokens_needed - self.tokens
            wait_time = deficit / self.refill_rate
            total_wait += wait_time
            await asyncio.sleep(wait_time)

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + (elapsed * self.refill_rate),
        )
        self.last_refill = now
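To choose capacity and refill_rate, convert the provider's advertised quota into these units. A sketch, assuming a hypothetical 90,000 tokens-per-minute limit; the burst window is a tuning knob, not part of any provider contract:

```python
def bucket_params(tokens_per_minute: int, burst_seconds: float = 10.0) -> tuple[int, float]:
    """Translate a tokens-per-minute quota into (capacity, refill_rate).

    refill_rate matches the sustained quota; capacity bounds how large
    a burst we allow before callers start waiting.
    """
    refill_rate = tokens_per_minute / 60.0        # tokens per second
    capacity = int(refill_rate * burst_seconds)   # allow short bursts
    return capacity, refill_rate

# e.g. capacity, rate = bucket_params(90_000)
#      bucket = TokenBucket(capacity=capacity, refill_rate=rate)
```

A smaller burst window smooths traffic at the cost of more waiting; a larger one absorbs spikes but risks brushing the provider's own limiter.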
Idempotent Retry Design
For operations that should produce consistent results, use idempotency keys:
# Assumes `cache` (an async key-value store) and `openai` (an AsyncOpenAI
# client instance) are initialized elsewhere in the module.

async def idempotent_generate(
    prompt: str,
    operation_id: str,
    max_retries: int = 3,
) -> str:
    """
    Generate with idempotency guarantees.
    If the same operation_id is used, returns cached result.
    """
    # Check cache first
    cached = await cache.get(f"idempotent:{operation_id}")
    if cached:
        logger.info(f"Returning cached result for {operation_id}")
        return cached

    async def generate():
        response = await openai.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            # Idempotency key for API-level deduplication
            extra_headers={"OpenAI-Idempotency-Token": operation_id},
        )
        return response.choices[0].message.content

    result = await retry_with_exponential_backoff(generate, max_retries=max_retries)
    # Cache successful result
    await cache.set(f"idempotent:{operation_id}", result, ttl=3600)
    return result
When to Use Idempotent Retries
Idempotent retries are essential for operations that modify state (placing orders, sending messages). For pure generation tasks (summarization, translation), idempotency may not be needed if the downstream consumer tolerates different outputs.
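A practical question is where operation_id comes from. One common approach is to derive it deterministically from the logical request, so a client-side retry of the same operation reuses the same key while a genuinely new request gets a new one. A sketch (the field choices are illustrative):

```python
import hashlib
import json

def make_operation_id(user_id: str, action: str, payload: dict) -> str:
    """Derive a stable idempotency key from the logical operation.

    Canonical JSON (sorted keys, fixed separators) ensures two retries
    of the same request serialize identically and thus share a key.
    """
    canonical = json.dumps(
        {"user": user_id, "action": action, "payload": payload},
        sort_keys=True, separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode()).hexdigest()[:32]
```

For state-changing operations, derived keys like this are safer than random UUIDs generated per attempt, because a crash between "send request" and "record UUID" would otherwise cause a duplicate on restart.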
Fallback Strategies
When retries are exhausted or errors are not retryable, fallbacks take over. A fallback is a predefined response or behavior that substitutes for the failed operation.
Model Fallback
If the primary model fails, use a simpler or more available model:
async def generate_with_model_fallback(
    prompt: str,
    primary_model: str = "gpt-4o",
    fallback_model: str = "gpt-4o-mini",
) -> str:
    """
    Try primary model, fall back to simpler model on failure.
    """
    try:
        response = await openai.chat.completions.create(
            model=primary_model,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content
    except ModelDeployedError:
        logger.warning(f"{primary_model} unavailable, trying {fallback_model}")
        response = await openai.chat.completions.create(
            model=fallback_model,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content
Response Fallback
Return a predefined response when generation fails:
FALLBACK_RESPONSES = {
    "summarize": "I apologize, but I was unable to generate a summary at this time. "
                 "Please try again or contact support for assistance.",
    "extract": {"entities": [], "error": "Extraction temporarily unavailable"},
    "classify": {"category": "unknown", "confidence": 0.0},
}

async def generate_with_response_fallback(
    task: str,
    prompt: str,
) -> dict:
    try:
        return await generate_structured_output(prompt, task)
    except (RateLimitError, ServiceUnavailableError, TimeoutError):
        logger.error(f"Generation failed for task {task}, using fallback")
        return FALLBACK_RESPONSES.get(task, {"error": "Service temporarily unavailable"})
Practical Example: DataForge Document Processing Fallback
The DataForge team was implementing reliable document extraction for high-volume processing that needed to handle API failures gracefully. Some documents were failing mid-processing with no recovery mechanism, creating a dilemma about whether to block processing or continue with partial results.
The team decided to implement a multi-tier fallback strategy that handles different failure modes appropriately. The first tier retries with exponential backoff using three attempts to recover from transient failures. The second tier falls back to a smaller model (gpt-4o-mini) when the primary model is unavailable. The third tier extracts using a rule-based parser for simple documents that do not require AI processing. The fourth tier queues documents for manual processing with a flag when all automated approaches fail.
After implementation, document processing uptime improved from 94% to 99.7%, and manual intervention dropped by 80%. The lesson: not all failures are equal, and multiple fallback tiers let each failure mode be handled appropriately.
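The tiered strategy can be expressed as an ordered chain of handlers, each tried in turn until one succeeds. A minimal sketch, with handler names and exception handling that are illustrative rather than DataForge's actual code:

```python
import asyncio
from typing import Any, Awaitable, Callable

async def process_with_fallback_chain(
    document: str,
    tiers: list[Callable[[str], Awaitable[Any]]],
) -> Any:
    """Try each tier in order; the last tier should always succeed
    (e.g. queueing the document for manual review)."""
    last_error: Exception | None = None
    for tier in tiers:
        try:
            return await tier(document)
        except Exception as e:  # each tier defines its own failure modes
            last_error = e
    raise RuntimeError("all fallback tiers failed") from last_error
```

In the DataForge scenario the list would read something like [extract_with_retries, extract_with_mini_model, rule_based_parse, queue_for_manual_review], mirroring the four tiers above.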
Routing Strategies
Routing directs requests to appropriate handlers based on request characteristics, system state, or business rules.
Capability Routing
Route based on request complexity:
def route_by_complexity(request: Request) -> str:
    """
    Route to appropriate model based on request complexity.
    """
    complexity_score = estimate_complexity(request)
    if complexity_score < 0.3:
        return "gpt-4o-mini"  # Simple tasks
    elif complexity_score < 0.7:
        return "gpt-4o"  # Standard tasks
    else:
        return "gpt-4o-with-extended-thinking"  # Complex tasks

def estimate_complexity(request: Request) -> float:
    """Estimate task complexity based on various signals."""
    score = 0.0
    # Longer prompts tend to be more complex
    score += min(len(request.prompt) / 10000, 0.3)
    # Structured output requests are more complex
    if request.output_format:
        score += 0.2
    # Multi-step tasks are more complex
    if request.steps > 1:
        score += 0.2
    # Tool use indicates complexity
    if request.tools:
        score += 0.2
    return min(score, 1.0)
Load-Based Routing
Route based on current system load to balance latency and throughput:
class LoadAwareRouter:
    def __init__(self, aggregators: dict[str, LoadAggregator]):
        self.aggregators = aggregators

    def route(self, request: Request) -> str:
        """Route to least loaded model variant."""
        candidates = ["gpt-4o-mini", "gpt-4o"]
        # Get current latency estimates
        latency_estimates = {
            model: self.aggregators[model].estimated_latency()
            for model in candidates
        }
        # Route to fastest available model
        fastest = min(latency_estimates, key=latency_estimates.get)
        logger.info(f"Routed to {fastest} (est. latency: {latency_estimates[fastest]:.2f}s)")
        return fastest
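The router assumes a LoadAggregator per model. A simple implementation tracks an exponentially weighted moving average (EWMA) of observed request latencies; the class name matches the router's assumption, while the smoothing factor below is an arbitrary choice:

```python
class LoadAggregator:
    """Tracks an exponentially weighted moving average (EWMA) of request latency."""

    def __init__(self, alpha: float = 0.2, initial_latency: float = 1.0):
        self.alpha = alpha           # weight given to the newest observation
        self._ewma = initial_latency

    def record(self, latency_seconds: float) -> None:
        """Fold one observed request latency into the running estimate."""
        self._ewma = self.alpha * latency_seconds + (1 - self.alpha) * self._ewma

    def estimated_latency(self) -> float:
        return self._ewma
```

A higher alpha reacts faster to load spikes but is noisier; a lower alpha smooths transient blips at the cost of slower reaction when a model variant genuinely degrades.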
Cost-Aware Routing
Route to balance cost and quality requirements:
COST_PER_1K_TOKENS = {
    "gpt-4o-mini": 0.00015,
    "gpt-4o": 0.0025,
}

def route_by_cost_budget(request: Request, budget: float) -> str:
    """
    Route to most capable model within cost budget.
    """
    estimated_tokens = estimate_tokens(request)
    estimated_cost = (estimated_tokens / 1000) * COST_PER_1K_TOKENS["gpt-4o"]
    if estimated_cost <= budget:
        return "gpt-4o"
    else:
        return "gpt-4o-mini"
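The routing function relies on estimate_tokens. Absent a real tokenizer, a common rough heuristic for English prose is about four characters per token, plus an allowance for the expected completion; both numbers here are assumptions, and the sketch operates on the request's prompt text rather than the full Request object:

```python
def estimate_tokens(prompt: str, expected_completion_tokens: int = 500) -> int:
    """Rough token estimate: ~4 characters per token for English text,
    plus an allowance for the completion. Use a real tokenizer
    (e.g. tiktoken) when billing precision matters."""
    prompt_tokens = max(1, len(prompt) // 4)
    return prompt_tokens + expected_completion_tokens
```

Because the estimate feeds a cost comparison rather than billing, a coarse heuristic is usually acceptable; it only needs to be accurate enough to pick the right side of the budget threshold.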
Research Frontier
Research on "learned routing" explores using models to predict which handler will perform best for a given request. By training on historical data, learned routers can outperform fixed rule-based routers for complex routing decisions.