Part V: Evaluation, Reliability, and Governance
Chapter 24

Caching, Batching, and Optimization

"The fastest API call is the one you never make. Caching transforms AI from a pay-per-use service into a shared resource."

An Infrastructure Engineer

Caching Strategies

Caching eliminates redundant API calls by storing responses for reuse. For AI applications with repetitive queries, caching can reduce costs by 50-90% and improve latency to near-zero for cache hits.

Exact Match Caching

The simplest strategy: cache responses by exact prompt hash:


import hashlib
from typing import Awaitable, Callable, Optional

class ExactMatchCache:
    def __init__(self, store: CacheStore, ttl: int = 3600):
        self.store = store
        self.ttl = ttl  # seconds before a cached entry expires
    
    def _hash_prompt(self, prompt: str) -> str:
        """Create deterministic hash of prompt."""
        return hashlib.sha256(prompt.encode()).hexdigest()
    
    async def get(self, prompt: str) -> Optional[str]:
        """Get cached response for exact prompt match."""
        key = self._hash_prompt(prompt)
        return await self.store.get(key)
    
    async def set(self, prompt: str, response: str) -> None:
        """Cache response for exact prompt match."""
        key = self._hash_prompt(prompt)
        await self.store.set(key, response, ttl=self.ttl)
    
    async def get_or_compute(
        self, 
        prompt: str, 
        compute_fn: Callable[[str], Awaitable[str]]
    ) -> str:
        """Get from cache or compute and cache."""
        cached = await self.get(prompt)
        if cached is not None:
            return cached
        
        response = await compute_fn(prompt)
        await self.set(prompt, response)
        return response
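The `CacheStore` above is assumed rather than defined. A minimal in-memory stand-in (production systems would typically back this interface with Redis or a similar shared store) might look like:

```python
import asyncio
import time
from typing import Optional

class InMemoryCacheStore:
    """Minimal in-memory CacheStore stand-in with TTL-based expiry."""
    def __init__(self) -> None:
        self._data: dict[str, tuple[str, float]] = {}

    async def get(self, key: str) -> Optional[str]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._data[key]  # lazily evict expired entries
            return None
        return value

    async def set(self, key: str, value: str, ttl: int = 3600) -> None:
        self._data[key] = (value, time.monotonic() + ttl)
```

The async methods mirror the interface the cache classes expect, so this store can be swapped for a networked one without changing callers.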
        

Semantic Caching

For natural language, exact match is too restrictive. Semantic caching uses embeddings to find similar cached requests:


class SemanticCache:
    """
    Cache responses using semantic similarity.
    Returns cached response if similarity exceeds threshold.
    """
    def __init__(
        self,
        embedder: Embedder,
        store: CacheStore,
        similarity_threshold: float = 0.95,
        ttl: int = 3600
    ):
        self.embedder = embedder
        self.store = store
        self.threshold = similarity_threshold
        self.ttl = ttl
    
    async def get(self, prompt: str) -> Optional[str]:
        # Embed incoming prompt
        query_embedding = await self.embedder.embed(prompt)
        
        # Find most similar cached prompt (linear scan shown for clarity;
        # production systems use a vector index for this lookup)
        candidates = await self.store.get_all()
        
        best_match = None
        best_similarity = 0.0
        
        for cached_prompt, cached_response in candidates.items():
            cached_embedding = await self.store.get_embedding(cached_prompt)
            similarity = cosine_similarity(query_embedding, cached_embedding)
            
            if similarity > best_similarity:
                best_similarity = similarity
                best_match = cached_response
        
        if best_similarity >= self.threshold:
            return best_match
        
        return None
    
    async def set(self, prompt: str, response: str) -> None:
        embedding = await self.embedder.embed(prompt)
        await self.store.set(prompt, response, embedding, ttl=self.ttl)
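The `cosine_similarity` helper used above can be a plain implementation; libraries like numpy compute this faster over arrays, but the math is simple:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 = identical direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # a zero vector carries no direction to compare
    return dot / (norm_a * norm_b)
```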
        

Cache Hit Rate Targets

Realistic cache hit rates: exact match achieves 20-40% for interactive applications and 60-80% for batch workloads. Semantic caching can reach 50-70% hit rates with good embedding models. Monitor your actual hit rates to validate the caching strategy.

Batching Optimization

Request Batching

Some providers support batching multiple requests into a single API call, reducing per-request overhead:


import asyncio

class RequestBatcher:
    """
    Batch multiple requests for efficient processing.
    """
    def __init__(self, max_batch_size: int = 10, max_wait_ms: int = 100):
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.pending: list[tuple[str, asyncio.Future]] = []
        self.timer_task: asyncio.Task | None = None
    
    async def add(self, prompt: str) -> str:
        """Add prompt to batch and return result."""
        future = asyncio.get_running_loop().create_future()
        self.pending.append((prompt, future))
        
        # Start timer if first request
        if len(self.pending) == 1:
            self.timer_task = asyncio.create_task(self._flush_after_delay())
        
        # Flush if batch is full
        if len(self.pending) >= self.max_batch_size:
            await self._flush()
        
        return await future
    
    async def _flush_after_delay(self):
        """Flush batch after wait time."""
        await asyncio.sleep(self.max_wait_ms / 1000)
        await self._flush()
    
    async def _flush(self):
        """Process all pending requests as batch."""
        if not self.pending:
            return
        
        # Cancel the timer unless we are running inside it: a task that
        # cancels itself would abort mid-flush
        if self.timer_task and self.timer_task is not asyncio.current_task():
            self.timer_task.cancel()
        self.timer_task = None
        
        prompts = [p for p, _ in self.pending]
        futures = [f for _, f in self.pending]
        self.pending = []
        
        # Process as batch
        results = await process_batch(prompts)
        
        for future, result in zip(futures, results):
            future.set_result(result)
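The `process_batch` function is left undefined above. If your provider exposes a true batch endpoint, call it there; otherwise, a sketch that dispatches the batch concurrently (the `call_model` placeholder stands in for a real API call) still amortizes the batcher's queuing delay:

```python
import asyncio

async def process_batch(prompts: list[str]) -> list[str]:
    """Process a batch of prompts; results align with input order."""
    async def call_model(prompt: str) -> str:
        # Placeholder for a real provider call (or one slot of a batch endpoint)
        await asyncio.sleep(0)
        return f"response to: {prompt}"

    return list(await asyncio.gather(*(call_model(p) for p in prompts)))
```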
        

Token Batching

Batch requests that can share context processing:


import asyncio
from typing import Awaitable, Callable

class TokenBatcher:
    """
    Batch requests that share a common context.
    Useful when many requests need the same system prompt or context.
    """
    def __init__(self, shared_context: str, max_tokens: int):
        self.shared_context = shared_context
        self.max_tokens = max_tokens
        self.pending: list[tuple[str, asyncio.Future]] = []
    
    async def add_request(
        self, 
        user_specific: str,
        compute_fn: Callable[[str], Awaitable[str]]
    ) -> str:
        """Add request with shared context."""
        future = asyncio.get_running_loop().create_future()
        
        # Calculate total tokens
        total_tokens = self._count_tokens(self.shared_context) + \
                      self._count_tokens(user_specific)
        
        if total_tokens > self.max_tokens:
            # Process immediately if too large for batch
            combined = self.shared_context + user_specific
            return await compute_fn(combined)
        
        self.pending.append((user_specific, future))
        
        if self._should_flush():
            await self._flush_batch(compute_fn)
        
        return await future
    
    def _should_flush(self) -> bool:
        # Flush when the batch is large or pending requests approach the token limit
        pending_tokens = sum(
            self._count_tokens(p) for p, _ in self.pending
        )
        return (len(self.pending) >= 10 or 
                pending_tokens + self._count_tokens(self.shared_context) > self.max_tokens * 0.9)
    
    async def _flush_batch(
        self, compute_fn: Callable[[str], Awaitable[str]]
    ) -> None:
        # Reusing the identical shared prefix lets provider-side prompt
        # caching amortize the context's processing cost across requests
        pending, self.pending = self.pending, []
        for user_specific, future in pending:
            result = await compute_fn(self.shared_context + user_specific)
            future.set_result(result)
    
    def _count_tokens(self, text: str) -> int:
        # Rough heuristic (~4 characters per token); use a real tokenizer
        # such as tiktoken for accurate counts
        return len(text) // 4
        

Practical Example: DataForge Document Processing

The DataForge team was spending $800 per day on batch processing of 10,000 documents. Each document was processed independently with no optimization, leaving significant efficiency gains on the table. The team faced a dilemma: focus on processing faster, or more efficiently?

The team implemented multi-layer optimization to attack cost from several angles. Semantic caching achieved a 45% hit rate across similar document types, avoiding redundant API calls. Request batching improved throughput 3x by processing multiple documents together. Smart chunking cut context tokens by 30%, lowering the token cost per document. Model routing sent the 80% of documents needing only simple extraction to gpt-4o-mini.

After these optimizations, processing cost dropped to $180 per day, a 4.4x reduction at the same throughput. The lesson: batch processing and caching compound on each other, and each optimization enables more savings from the next.
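The model-routing piece can be as simple as a rule keyed on task type and document size. A hypothetical sketch, where the task set, size cutoff, and model names are illustrative rather than DataForge's actual rules:

```python
SIMPLE_TASKS = {"classification", "extraction"}

def route_model(document: str, task: str, size_cutoff: int = 8000) -> str:
    """Send simple tasks on short documents to the cheaper model."""
    if task in SIMPLE_TASKS and len(document) < size_cutoff:
        return "gpt-4o-mini"
    return "gpt-4o"
```

A routing function like this is cheap to evaluate per request, so it can sit in front of every model call.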

Response Optimization

Output Length Control

Control output tokens to avoid over-generating:


def estimate_output_tokens(task: str, complexity: str) -> int:
    """
    Estimate appropriate max_tokens based on task.
    """
    base_tokens = {
        "classification": 10,
        "extraction": 50,
        "summarization": 150,
        "generation": 500,
    }
    
    complexity_multipliers = {
        "low": 1.0,
        "medium": 1.5,
        "high": 2.0
    }
    
    base = base_tokens.get(task, 100)
    multiplier = complexity_multipliers.get(complexity, 1.0)
    
    return int(base * multiplier)
        

Streaming Responses

Stream responses to improve perceived latency:


from typing import AsyncGenerator

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_generate(prompt: str) -> AsyncGenerator[str, None]:
    """
    Generate streaming response for improved perceived latency.
    """
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    async for chunk in response:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
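On the consuming side, render each chunk as it arrives. A runnable sketch with a stand-in generator (`fake_stream` is hypothetical, replacing the real API call so the pattern works offline):

```python
import asyncio
from typing import AsyncGenerator

async def fake_stream(prompt: str) -> AsyncGenerator[str, None]:
    # Stand-in for a real streaming call: yields the response in chunks
    for piece in ["The", " answer", " is", " 42."]:
        yield piece

async def consume(prompt: str) -> str:
    parts: list[str] = []
    async for chunk in fake_stream(prompt):
        parts.append(chunk)  # in a UI, flush each chunk to the user here
    return "".join(parts)

print(asyncio.run(consume("What is the answer?")))
```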
        

Monitoring for Optimization

Cache Performance Metrics

Track cache effectiveness through specific metrics that reveal how well caching is working. Hit rate measures the percentage of requests served from cache rather than making new API calls. Miss reasons categorize why cache misses happen to identify patterns that could be addressed through better cache design. Byte hit rate measures the percentage of token volume served from cache, which accounts for the fact that different cached responses contain different numbers of tokens. Stale rate measures the percentage of cache entries served after the time-to-live has expired, which can indicate when cache refresh strategies need adjustment.
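These four metrics can be tracked with a small counter object. A sketch of one possible shape:

```python
from collections import Counter
from dataclasses import dataclass, field

@dataclass
class CacheMetrics:
    """Counters for hit rate, miss reasons, byte hit rate, and stale rate."""
    hits: int = 0
    misses: int = 0
    hit_tokens: int = 0
    total_tokens: int = 0
    stale_serves: int = 0
    miss_reasons: Counter = field(default_factory=Counter)

    def record_hit(self, tokens: int, stale: bool = False) -> None:
        self.hits += 1
        self.hit_tokens += tokens
        self.total_tokens += tokens
        if stale:
            self.stale_serves += 1

    def record_miss(self, tokens: int, reason: str) -> None:
        self.misses += 1
        self.total_tokens += tokens
        self.miss_reasons[reason] += 1  # e.g. "no_entry", "expired", "below_threshold"

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

    @property
    def byte_hit_rate(self) -> float:
        # Fraction of token volume served from cache
        return self.hit_tokens / self.total_tokens if self.total_tokens else 0.0

    @property
    def stale_rate(self) -> float:
        return self.stale_serves / self.hits if self.hits else 0.0
```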

Optimization Dashboard

Build a dashboard showing optimization opportunities by comparing current performance against targets to quantify potential savings. For cache hit rate, improving from 35% to 60% represents an opportunity of $2,400 per month. For average output tokens, reducing from 280 to 180 tokens represents an opportunity of $1,800 per month. For model selection accuracy, improving from 72% to 90% represents an opportunity of $3,200 per month. Together, these metrics guide where to focus optimization efforts for maximum return.
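The cache line of such a dashboard reduces to simple arithmetic: if today's API spend occurs at the current hit rate, total demand is spend divided by the miss rate, and the opportunity is the spend avoided at the target rate. A sketch of that formula (the dollar figures above are illustrative; this just computes the relationship):

```python
def cache_savings_opportunity(monthly_miss_spend: float,
                              current_hit_rate: float,
                              target_hit_rate: float) -> float:
    """Estimate monthly savings from lifting the cache hit rate.

    Assumes cache hits cost ~nothing; monthly_miss_spend is what you
    pay today, i.e. the spend on cache misses.
    """
    if current_hit_rate >= 1.0:
        return 0.0
    total_demand = monthly_miss_spend / (1.0 - current_hit_rate)
    target_spend = total_demand * (1.0 - target_hit_rate)
    return monthly_miss_spend - target_spend
```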

Research Frontier

Research on "predictive caching" uses ML models to predict what content will be requested and pre-compute responses. By anticipating user needs before they ask, predictive caching can achieve hit rates approaching 90%.