Part V: Evaluation, Reliability, and Governance
Chapter 23

Retry, Fallback, and Router Strategies

"Transient failures are a fact of life in distributed systems. The question is not whether to retry, but how to retry in a way that improves reliability without creating new problems."

A Distributed Systems Engineer

Why Retry Matters for AI Systems

AI inference involves multiple failure points: API unavailability, rate limiting, network timeouts, and model-level errors. Without retry logic, transient failures become permanent failures for users. With retry logic, you can recover from most transient issues automatically.

However, retry logic introduces considerations specific to AI systems. Retrying an LLM call with the same prompt may produce a different response, potentially better or worse than the original. Unlike idempotent operations, AI generation retries do not guarantee the same result.

The Non-Idempotent Retry Problem

Traditional retries assume the same input produces the same output. AI retries do not guarantee this. A retry might fix a timeout but produce a different response than the original request. This means retry logic for AI must consider result consistency, not just error recovery.
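
One mitigation, where the provider supports it, is to pin sampling parameters so a retry re-sends exactly what the failed attempt sent. A minimal sketch (the function name and fixed seed are illustrative; seed-based reproducibility is best-effort on most providers):

```python
def build_retry_safe_request(prompt: str, model: str = "gpt-4o") -> dict:
    """Build request params that stay identical across retries.

    temperature=0 and a fixed seed reduce (but do not eliminate)
    cross-retry variance in the generated output.
    """
    return {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0,
        "seed": 1234,  # fixed seed: same request on every retry attempt
    }

# The same prompt always yields identical request params,
# so a retry re-sends exactly what the failed attempt sent.
first = build_retry_safe_request("Summarize this report.")
second = build_retry_safe_request("Summarize this report.")
assert first == second
```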

Retry Strategies

Exponential Backoff

The most common retry strategy: wait increasingly longer between retries. This avoids overwhelming a failing service while eventually giving it time to recover.


import asyncio
import logging
import random
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)

async def retry_with_exponential_backoff(
    operation: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True
) -> Any:
    """
    Retry an operation with exponential backoff.

    Args:
        operation: The async operation to retry
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap in seconds
        jitter: Add randomness to prevent thundering herd
    """
    last_exception = None

    for attempt in range(max_retries + 1):
        try:
            return await operation()
        # RateLimitError and ServiceUnavailableError are provider SDK
        # exceptions; adjust the tuple to match your client library.
        except (RateLimitError, ServiceUnavailableError) as e:
            last_exception = e

            if attempt == max_retries:
                break

            # Calculate delay with exponential increase, capped at max_delay
            delay = min(base_delay * (2 ** attempt), max_delay)

            # Add jitter to prevent thundering herd
            if jitter:
                delay = delay * (0.5 + random.random())

            logger.warning(
                f"Transient error: {e}. Retrying in {delay:.2f}s "
                f"(attempt {attempt + 1}/{max_retries})"
            )
            await asyncio.sleep(delay)

    raise last_exception
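The delay schedule above can be checked in isolation. This helper mirrors the arithmetic in the function (without jitter) so the cap at max_delay is easy to see:

```python
def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Delay before retry `attempt` (0-indexed), capped at max_delay."""
    return min(base_delay * (2 ** attempt), max_delay)

# Doubling until the cap: 1, 2, 4, 8, 16, 32, then 60 from attempt 6 onward.
schedule = [backoff_delay(a) for a in range(7)]
# → [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```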

Token Bucket Rate Limiting

Before retrying, respect rate limits by tracking tokens consumed:


import time
import asyncio
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    """Token bucket for rate limiting AI API calls"""
    capacity: int  # Maximum tokens
    refill_rate: float  # Tokens per second
    tokens: float = field(init=False)  # Set in __post_init__
    last_refill: float = field(init=False)

    def __post_init__(self):
        self.tokens = float(self.capacity)
        self.last_refill = time.time()
    
    async def acquire(self, tokens_needed: int) -> float:
        """Acquire tokens, waiting if necessary. Returns wait time."""
        while True:
            self._refill()
            
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return 0.0
            
            # Calculate wait time until enough tokens
            deficit = tokens_needed - self.tokens
            wait_time = deficit / self.refill_rate
            
            await asyncio.sleep(wait_time)
    
    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + (elapsed * self.refill_rate)
        )
        self.last_refill = now
        
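A quick simulation shows the bucket in action. The class is re-inlined here in condensed form so the snippet runs standalone, and a high refill rate keeps the waits to milliseconds:

```python
import asyncio
import time

class Bucket:
    """Condensed version of the TokenBucket above, for a standalone demo."""
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        self.last_refill = time.time()

    async def acquire(self, needed: int) -> None:
        while True:
            self._refill()
            if self.tokens >= needed:
                self.tokens -= needed
                return
            # Sleep just long enough for the deficit to refill
            await asyncio.sleep((needed - self.tokens) / self.refill_rate)

    def _refill(self):
        now = time.time()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.refill_rate)
        self.last_refill = now

async def demo() -> float:
    bucket = Bucket(capacity=10, refill_rate=1000)  # refills fast for the demo
    await bucket.acquire(8)   # granted immediately: 10 tokens available
    start = time.time()
    await bucket.acquire(8)   # only ~2 left; waits ~6ms for a refill
    return time.time() - start

waited = asyncio.run(demo())  # small but nonzero wait on the second acquire
```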

Idempotent Retry Design

For operations that should produce consistent results, use idempotency keys:


async def idempotent_generate(
    prompt: str,
    operation_id: str,
    max_retries: int = 3
) -> str:
    """
    Generate with idempotency guarantees.
    If the same operation_id is used, returns cached result.
    """
    # Check cache first
    cached = await cache.get(f"idempotent:{operation_id}")
    if cached is not None:
        logger.info(f"Returning cached result for {operation_id}")
        return cached
    
    async def generate():
        response = await openai.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            # Illustrative idempotency header; API-level request
            # deduplication support varies by provider
            extra_headers={"Idempotency-Key": operation_id}
        )
        return response.choices[0].message.content
    
    result = await retry_with_exponential_backoff(generate, max_retries=max_retries)
    
    # Cache successful result
    await cache.set(f"idempotent:{operation_id}", result, ttl=3600)
    
    return result
        

When to Use Idempotent Retries

Idempotent retries are essential for operations that modify state (placing orders, sending messages). For pure generation tasks (summarization, translation), idempotency may not be needed if the downstream consumer tolerates different outputs.
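
For an operation_id to deduplicate retries, it must be derived deterministically from the request, not generated fresh per attempt. One common approach (a sketch; the key fields are whatever makes two requests "the same" in your system) is a content hash:

```python
import hashlib
import json

def make_operation_id(prompt: str, model: str, user_id: str) -> str:
    """Derive a stable operation_id from the fields that define the request.

    The same prompt/model/user always maps to the same id, so retries
    (and accidental duplicate submissions) hit the idempotency cache.
    """
    payload = json.dumps(
        {"prompt": prompt, "model": model, "user_id": user_id},
        sort_keys=True,  # key order must not change the hash
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

op_id = make_operation_id("Summarize Q3 results", "gpt-4o", "user-42")
```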

Fallback Strategies

When retries are exhausted or errors are not retryable, fallbacks take over. A fallback is a predefined response or behavior that substitutes for the failed operation.

Model Fallback

If the primary model fails, use a simpler or more available model:


async def generate_with_model_fallback(
    prompt: str,
    primary_model: str = "gpt-4o",
    fallback_model: str = "gpt-4o-mini"
) -> str:
    """
    Try primary model, fall back to simpler model on failure.
    """
    try:
        response = await openai.chat.completions.create(
            model=primary_model,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    
    # Exception name varies by SDK; catch whatever your client raises
    # when a model or deployment is unavailable
    except ModelUnavailableError:
        logger.warning(f"{primary_model} unavailable, trying {fallback_model}")
        
        response = await openai.chat.completions.create(
            model=fallback_model,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
        

Response Fallback

Return a predefined response when generation fails:


FALLBACK_RESPONSES = {
    "summarize": "I apologize, but I was unable to generate a summary at this time. Please try again or contact support for assistance.",
    "extract": {"entities": [], "error": "Extraction temporarily unavailable"},
    "classify": {"category": "unknown", "confidence": 0.0}
}

async def generate_with_response_fallback(
    task: str,
    prompt: str
) -> dict:
    try:
        return await generate_structured_output(prompt, task)
    except (RateLimitError, ServiceUnavailableError, TimeoutError):
        logger.error(f"Generation failed for task {task}, using fallback")
        return FALLBACK_RESPONSES.get(task, {"error": "Service temporarily unavailable"})
        

Practical Example: DataForge Document Processing Fallback

The DataForge team was implementing reliable document extraction for high-volume processing that needed to handle API failures gracefully. Some documents were failing mid-processing with no recovery mechanism, creating a dilemma about whether to block processing or continue with partial results.

The team decided to implement a multi-tier fallback strategy that handles different failure modes appropriately. The first tier retries with exponential backoff using three attempts to recover from transient failures. The second tier falls back to a smaller model (gpt-4o-mini) when the primary model is unavailable. The third tier extracts using a rule-based parser for simple documents that do not require AI processing. The fourth tier queues documents for manual processing with a flag when all automated approaches fail.

After implementation, document processing uptime improved from 94% to 99.7%, and manual intervention dropped by 80%. The lesson is that multiple fallback tiers handle different failure modes effectively because not all failures are equal.
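
A chain like DataForge's can be expressed as an ordered list of handlers, each tried until one succeeds. This is a sketch with stubbed handlers; in practice each tier would wrap the retry, model-fallback, and rule-based extraction logic described above:

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[dict]]

async def process_with_tiers(document: str, tiers: list[tuple[str, Handler]]) -> dict:
    """Try each tier in order; fall through to the next on failure.

    If every automated tier fails, the document is flagged for manual
    processing, so nothing is silently dropped.
    """
    errors = []
    for name, handler in tiers:
        try:
            result = await handler(document)
            return {"tier": name, "result": result}
        except Exception as e:
            errors.append(f"{name}: {e}")
    return {"tier": "manual_queue", "errors": errors}  # last resort: a human

# Stub handlers standing in for the real tiers described above.
async def primary_with_retry(doc: str) -> dict:
    raise TimeoutError("primary model timed out after 3 retries")

async def fallback_model(doc: str) -> dict:
    return {"entities": ["ACME Corp"], "source": "gpt-4o-mini"}

async def rule_based(doc: str) -> dict:
    return {"entities": [], "source": "regex"}

tiers = [("primary", primary_with_retry),
         ("fallback", fallback_model),
         ("rules", rule_based)]
outcome = asyncio.run(process_with_tiers("invoice.pdf", tiers))
# The primary tier fails, so the fallback-model tier handles the document.
```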

Routing Strategies

Routing directs requests to appropriate handlers based on request characteristics, system state, or business rules.

Capability Routing

Route based on request complexity:


def route_by_complexity(request: Request) -> str:
    """
    Route to appropriate model based on request complexity.
    """
    complexity_score = estimate_complexity(request)
    
    if complexity_score < 0.3:
        return "gpt-4o-mini"  # Simple tasks
    elif complexity_score < 0.7:
        return "gpt-4o"  # Standard tasks
    else:
        return "gpt-4o-with-extended-thinking"  # Complex tasks

def estimate_complexity(request: Request) -> float:
    """Estimate task complexity based on various signals."""
    score = 0.0
    
    # Longer prompts tend to be more complex
    score += min(len(request.prompt) / 10000, 0.3)
    
    # Structured output requests are more complex
    if request.output_format:
        score += 0.2
    
    # Multi-step tasks are more complex
    if request.steps > 1:
        score += 0.2
    
    # Tool use indicates complexity
    if request.tools:
        score += 0.2
    
    return min(score, 1.0)
        

Load-Based Routing

Route based on current system load to balance latency and throughput:


class LoadAwareRouter:
    def __init__(self, aggregators: dict[str, LoadAggregator]):
        self.aggregators = aggregators
    
    def route(self, request: Request) -> str:
        """Route to least loaded model variant."""
        candidates = ["gpt-4o-mini", "gpt-4o"]
        
        # Get current latency estimates
        latency_estimates = {
            model: self.aggregators[model].estimated_latency()
            for model in candidates
        }
        
        # Route to fastest available model
        fastest = min(latency_estimates, key=latency_estimates.get)
        
        logger.info(f"Routed to {fastest} (est. latency: {latency_estimates[fastest]:.2f}s)")
        return fastest
        
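The LoadAggregator used above is left abstract. A minimal version (an assumption, not a fixed API) tracks an exponential moving average of observed latencies per model:

```python
class LoadAggregator:
    """Tracks an exponential moving average (EMA) of observed latencies."""

    def __init__(self, alpha: float = 0.2, initial_latency: float = 1.0):
        self.alpha = alpha            # weight of the newest observation
        self._ema = initial_latency   # prior estimate before any data arrives

    def record(self, latency_seconds: float) -> None:
        """Fold a completed request's latency into the running average."""
        self._ema = self.alpha * latency_seconds + (1 - self.alpha) * self._ema

    def estimated_latency(self) -> float:
        return self._ema

agg = LoadAggregator(alpha=0.5, initial_latency=1.0)
agg.record(3.0)  # EMA: 0.5 * 3.0 + 0.5 * 1.0 = 2.0
agg.record(2.0)  # EMA: 0.5 * 2.0 + 0.5 * 2.0 = 2.0
```

Each completed request calls record(), so a model that slows down under load sees its estimate rise and the router shifts traffic away from it.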

Cost-Aware Routing

Route to balance cost and quality requirements:


COST_PER_1K_TOKENS = {
    "gpt-4o-mini": 0.00015,
    "gpt-4o": 0.0025,
}

def route_by_cost_budget(request: Request, budget: float) -> str:
    """
    Route to most capable model within cost budget.
    """
    estimated_tokens = estimate_tokens(request)
    estimated_cost = (estimated_tokens / 1000) * COST_PER_1K_TOKENS["gpt-4o"]
    
    if estimated_cost <= budget:
        return "gpt-4o"
    else:
        return "gpt-4o-mini"
        
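The estimate_tokens helper above is left undefined. A rough heuristic (about four characters per token for English text, plus headroom for the completion) is often good enough for budgeting; this sketch takes the prompt text directly, and you should swap in the provider's real tokenizer when precision matters:

```python
def estimate_tokens(prompt: str, expected_output_tokens: int = 500) -> int:
    """Rough token estimate: ~4 characters per token for English text.

    This miscounts code-heavy or non-English prompts; use the provider's
    tokenizer (e.g. tiktoken for OpenAI models) when accuracy matters.
    """
    prompt_tokens = len(prompt) // 4 + 1
    return prompt_tokens + expected_output_tokens

tokens = estimate_tokens("x" * 2000)  # → 1001: 501 prompt + 500 completion
```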

Research Frontier

Research on "learned routing" explores using models to predict which handler will perform best for a given request. By training on historical data, learned routers can outperform fixed rule-based routers for complex routing decisions.