"The fastest API call is the one you never make. Caching transforms AI from a pay-per-use service into a shared resource."
An Infrastructure Engineer
Caching Strategies
Caching eliminates redundant API calls by storing responses for reuse. For AI applications with repetitive queries, caching can reduce costs by 50-90% and improve latency to near-zero for cache hits.
Exact Match Caching
The simplest strategy: cache responses by exact prompt hash:
import hashlib
from typing import Awaitable, Callable, Optional

class ExactMatchCache:
    def __init__(self, store: CacheStore, ttl: int = 3600):
        # `store` is any async key-value backend (e.g. Redis or an in-memory dict)
        self.store = store
        self.ttl = ttl

    def _hash_prompt(self, prompt: str) -> str:
        """Create deterministic hash of prompt."""
        return hashlib.sha256(prompt.encode()).hexdigest()

    async def get(self, prompt: str) -> Optional[str]:
        """Get cached response for exact prompt match."""
        key = self._hash_prompt(prompt)
        return await self.store.get(key)

    async def set(self, prompt: str, response: str) -> None:
        """Cache response for exact prompt match."""
        key = self._hash_prompt(prompt)
        await self.store.set(key, response, ttl=self.ttl)

    async def get_or_compute(
        self,
        prompt: str,
        compute_fn: Callable[[str], Awaitable[str]],
    ) -> str:
        """Get from cache, or compute and cache on a miss."""
        cached = await self.get(prompt)
        if cached is not None:
            return cached
        response = await compute_fn(prompt)
        await self.set(prompt, response)
        return response
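The cache above assumes a CacheStore backend, which the text leaves unspecified. A minimal in-memory sketch of that interface (a production system would use Redis or similar; all names here are illustrative) looks like:

```python
import asyncio
import hashlib
import time
from typing import Optional


class InMemoryCacheStore:
    """Toy async key-value store with TTL, matching the get/set
    interface the cache expects. Illustration only."""

    def __init__(self):
        self._data: dict[str, tuple[str, float]] = {}

    async def get(self, key: str) -> Optional[str]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() > expires_at:
            del self._data[key]  # expired: evict lazily on read
            return None
        return value

    async def set(self, key: str, value: str, ttl: int = 3600) -> None:
        self._data[key] = (value, time.monotonic() + ttl)


async def demo() -> tuple[str, str, int]:
    store = InMemoryCacheStore()
    calls = 0

    async def expensive_model_call(prompt: str) -> str:
        nonlocal calls
        calls += 1  # count real API calls to show the cache working
        return f"response to: {prompt}"

    async def get_or_compute(prompt: str) -> str:
        # Same get-or-compute pattern as above, keyed by prompt hash
        key = hashlib.sha256(prompt.encode()).hexdigest()
        cached = await store.get(key)
        if cached is not None:
            return cached
        response = await expensive_model_call(prompt)
        await store.set(key, response)
        return response

    first = await get_or_compute("summarize Q3 report")
    second = await get_or_compute("summarize Q3 report")  # cache hit
    return first, second, calls


first, second, calls = asyncio.run(demo())
```

The second identical prompt is served from the store, so the expensive call runs only once.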
Semantic Caching
For natural language, exact match is too restrictive. Semantic caching uses embeddings to find similar cached requests:
class SemanticCache:
    """
    Cache responses using semantic similarity.
    Returns cached response if similarity exceeds threshold.
    """

    def __init__(
        self,
        embedder: Embedder,
        store: CacheStore,
        similarity_threshold: float = 0.95,
        ttl: int = 3600
    ):
        self.embedder = embedder
        self.store = store
        self.threshold = similarity_threshold
        self.ttl = ttl

    async def get(self, prompt: str) -> Optional[str]:
        # Embed the incoming prompt
        query_embedding = await self.embedder.embed(prompt)
        # Linear scan over cached prompts: fine for small caches,
        # but a vector index is needed at scale
        candidates = await self.store.get_all()
        best_match = None
        best_similarity = 0.0
        for cached_prompt, cached_response in candidates.items():
            cached_embedding = await self.store.get_embedding(cached_prompt)
            similarity = cosine_similarity(query_embedding, cached_embedding)
            if similarity > best_similarity:
                best_similarity = similarity
                best_match = cached_response
        # Only serve a match above the configured threshold
        if best_similarity >= self.threshold:
            return best_match
        return None

    async def set(self, prompt: str, response: str) -> None:
        embedding = await self.embedder.embed(prompt)
        await self.store.set(prompt, response, embedding, ttl=self.ttl)
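The snippet relies on a cosine_similarity helper that isn't defined in the text. A standard implementation (the name and signature are assumptions) is:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two embedding vectors.
    Returns 0.0 for zero vectors to avoid division by zero."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)
```

Identical directions score 1.0 and orthogonal vectors score 0.0, which is why thresholds like 0.95 make sense for near-duplicate prompts.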
Cache Hit Rate Targets
Realistic cache hit rates vary by strategy and workload: exact-match caching typically achieves 20-40% for interactive applications and 60-80% for batch workloads, while semantic caching can reach 50-70% with good embedding models. Monitor your actual hit rates to validate the caching strategy.
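Those target bands can be encoded as a quick sanity check when reviewing metrics. The bands are the figures quoted above; the function name and messages are ours:

```python
# Expected hit-rate bands from the text above (fractions, not percents)
EXPECTED_HIT_RATE = {
    ("exact", "interactive"): (0.20, 0.40),
    ("exact", "batch"): (0.60, 0.80),
    ("semantic", "any"): (0.50, 0.70),
}


def hit_rate_status(strategy: str, workload: str, observed: float) -> str:
    """Compare an observed cache hit rate against its expected band."""
    lo, hi = EXPECTED_HIT_RATE[(strategy, workload)]
    if observed < lo:
        return "below target: revisit cache keys or TTLs"
    if observed > hi:
        return "above target: verify entries are not stale"
    return "within expected band"
```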
Batching Optimization
Request Batching
Some providers support batching multiple requests into a single API call, reducing per-request overhead:
import asyncio


class RequestBatcher:
    """
    Batch multiple requests for efficient processing.
    """

    def __init__(self, max_batch_size: int = 10, max_wait_ms: int = 100):
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.pending: list[tuple[str, asyncio.Future]] = []
        self.timer_task: asyncio.Task | None = None

    async def add(self, prompt: str) -> str:
        """Add prompt to batch and return its result."""
        future = asyncio.get_running_loop().create_future()
        self.pending.append((prompt, future))
        # Start the flush timer on the first request
        if len(self.pending) == 1:
            self.timer_task = asyncio.create_task(self._flush_after_delay())
        # Flush immediately if the batch is full
        if len(self.pending) >= self.max_batch_size:
            await self._flush()
        return await future

    async def _flush_after_delay(self):
        """Flush the batch after the maximum wait time."""
        await asyncio.sleep(self.max_wait_ms / 1000)
        self.timer_task = None  # clear first so _flush doesn't cancel us
        await self._flush()

    async def _flush(self):
        """Process all pending requests as one batch."""
        if not self.pending:
            return
        # Cancel the pending timer, if any
        if self.timer_task:
            self.timer_task.cancel()
            self.timer_task = None
        prompts = [p for p, _ in self.pending]
        futures = [f for _, f in self.pending]
        self.pending = []
        # process_batch wraps the provider's batch call (assumed helper)
        try:
            results = await process_batch(prompts)
        except Exception as exc:
            # Propagate failure to every waiter instead of hanging them
            for future in futures:
                future.set_exception(exc)
            return
        for future, result in zip(futures, results):
            future.set_result(result)
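The batcher calls a process_batch helper that the text leaves undefined. When the provider has no native batch endpoint, a reasonable stand-in fans the prompts out concurrently; single_call below is a placeholder for the real per-request API call:

```python
import asyncio
from typing import Awaitable, Callable


async def process_batch(
    prompts: list[str],
    single_call: Callable[[str], Awaitable[str]],
) -> list[str]:
    """Run a batch of prompts concurrently, preserving input order.

    This keeps much of the batcher's latency benefit even without a
    true batch endpoint: requests overlap instead of running serially."""
    return list(await asyncio.gather(*(single_call(p) for p in prompts)))


async def demo() -> list[str]:
    async def fake_model(prompt: str) -> str:
        await asyncio.sleep(0)  # stand-in for network latency
        return prompt.upper()

    return await process_batch(["a", "b", "c"], fake_model)


results = asyncio.run(demo())
```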
Token Batching
Batch requests that can share context processing:
class TokenBatcher:
    """
    Batch requests that share a common context.
    Useful when many requests need the same system prompt or context.
    """

    def __init__(self, shared_context: str, max_tokens: int):
        self.shared_context = shared_context
        self.max_tokens = max_tokens
        self.pending: list[tuple[str, asyncio.Future]] = []

    async def add_request(
        self,
        user_specific: str,
        compute_fn: callable
    ) -> str:
        """Add a request that shares the common context."""
        # Calculate total tokens for this request
        total_tokens = (self._count_tokens(self.shared_context)
                        + self._count_tokens(user_specific))
        if total_tokens > self.max_tokens:
            # Too large for a batch: process immediately on its own
            combined = self.shared_context + user_specific
            return await compute_fn(combined)
        future = asyncio.get_running_loop().create_future()
        self.pending.append((user_specific, future))
        if self._should_flush():
            await self._flush_batch(compute_fn)
        return await future

    def _should_flush(self) -> bool:
        # Flush when the batch is large or pending tokens approach the limit
        pending_tokens = sum(
            self._count_tokens(p) for p, _ in self.pending
        )
        return (len(self.pending) >= 10 or
                pending_tokens + self._count_tokens(self.shared_context)
                > self.max_tokens * 0.9)

    def _count_tokens(self, text: str) -> int:
        # Rough heuristic (~4 characters per token); a real
        # implementation would use the model's tokenizer
        return max(1, len(text) // 4)

    async def _flush_batch(self, compute_fn) -> None:
        """Resolve all pending requests against the shared context.
        Sketch only: a real implementation would submit these together
        so the shared prefix is processed once."""
        pending, self.pending = self.pending, []
        for user_specific, future in pending:
            result = await compute_fn(self.shared_context + user_specific)
            future.set_result(result)
Practical Example: DataForge Document Processing
The DataForge team was optimizing batch document processing that cost eight hundred dollars per day to process ten thousand documents. Each document was processed independently with no optimization, leaving significant efficiency gains on the table, and the team debated whether to focus on processing faster or processing more cheaply.
The team implemented multi-layer optimization to attack the cost problem from several angles at once. Semantic caching achieved a forty-five percent hit rate across similar document types, avoiding redundant API calls. Request batching tripled throughput by processing multiple documents together. Smart chunking reduced context tokens by thirty percent, lowering the token cost per document. Model routing sent the simple extractions, eighty percent of documents, to gpt-4o-mini.
After these optimizations, processing cost fell to one hundred eighty dollars per day, a 4.4 times cost reduction at the same throughput. The lesson is that batch processing and caching compound: each optimization enables more savings from the next.
Response Optimization
Output Length Control
Control output tokens to avoid over-generating:
def estimate_output_tokens(task: str, complexity: str) -> int:
    """
    Estimate appropriate max_tokens based on task.
    """
    base_tokens = {
        "classification": 10,
        "extraction": 50,
        "summarization": 150,
        "generation": 500,
    }
    complexity_multipliers = {
        "low": 1.0,
        "medium": 1.5,
        "high": 2.0
    }
    base = base_tokens.get(task, 100)
    multiplier = complexity_multipliers.get(complexity, 1.0)
    return int(base * multiplier)
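One hedged refinement, not in the original: pad the estimate with a safety margin so truncation is unlikely, then clamp it so runaway generation stays bounded. The function name and default limits here are illustrative:

```python
def clamp_max_tokens(
    estimate: int,
    margin: float = 1.2,
    floor: int = 16,
    ceiling: int = 2048,
) -> int:
    """Pad an output-token estimate by `margin`, then clamp the
    result into [floor, ceiling] before passing it as max_tokens."""
    padded = int(estimate * margin)
    return max(floor, min(ceiling, padded))
```

For example, a summarization estimate of 150 tokens becomes a max_tokens of 180, while very small or very large estimates hit the floor and ceiling.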
Streaming Responses
Stream responses to improve perceived latency:
from typing import AsyncGenerator

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_generate(prompt: str) -> AsyncGenerator[str, None]:
    """
    Generate streaming response for improved perceived latency.
    """
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in response:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
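The perceived-latency gain is easiest to see by measuring time to first token. A self-contained sketch with a stubbed stream standing in for the API call:

```python
import asyncio
import time
from typing import AsyncGenerator


async def fake_stream() -> AsyncGenerator[str, None]:
    """Stand-in for a streaming model response."""
    for token in ["Hello", ", ", "world", "!"]:
        await asyncio.sleep(0)  # simulated network delay
        yield token


async def consume(stream: AsyncGenerator[str, None]) -> tuple[float, str]:
    """Return (time to first token in seconds, full text)."""
    start = time.monotonic()
    ttft = None
    parts: list[str] = []
    async for token in stream:
        if ttft is None:
            ttft = time.monotonic() - start  # first token arrived
        parts.append(token)
    return (ttft if ttft is not None else 0.0), "".join(parts)


ttft, text = asyncio.run(consume(fake_stream()))
```

Users start reading at the first token rather than waiting for the full completion, which is why streaming feels faster even though total generation time is unchanged.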
Monitoring for Optimization
Cache Performance Metrics
Track cache effectiveness through specific metrics that reveal how well caching is working. Hit rate measures the percentage of requests served from cache rather than making new API calls. Miss reasons categorize why cache misses happen to identify patterns that could be addressed through better cache design. Byte hit rate measures the percentage of token volume served from cache, which accounts for the fact that different cached responses contain different numbers of tokens. Stale rate measures the percentage of cache entries served after the time-to-live has expired, which can indicate when cache refresh strategies need adjustment.
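The metrics above can be tracked with a small counter object. A sketch, with names of our choosing:

```python
from collections import Counter


class CacheMetrics:
    """Track hit rate, miss reasons, and byte (token-volume) hit rate."""

    def __init__(self):
        self.hits = 0
        self.misses = 0
        self.hit_tokens = 0
        self.miss_tokens = 0
        self.miss_reasons: Counter = Counter()

    def record_hit(self, tokens: int) -> None:
        self.hits += 1
        self.hit_tokens += tokens

    def record_miss(self, tokens: int, reason: str) -> None:
        self.misses += 1
        self.miss_tokens += tokens
        self.miss_reasons[reason] += 1  # categorize why misses happen

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

    @property
    def byte_hit_rate(self) -> float:
        # Token-volume-weighted hit rate: large responses count more
        total = self.hit_tokens + self.miss_tokens
        return self.hit_tokens / total if total else 0.0


metrics = CacheMetrics()
metrics.record_hit(tokens=400)
metrics.record_miss(tokens=100, reason="no_similar_entry")
metrics.record_hit(tokens=500)
```

Note how the two rates diverge: two hits out of three requests, but 900 of 1000 tokens served from cache, which is the more cost-relevant number.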
Optimization Dashboard
Build a dashboard showing optimization opportunities by comparing current performance against targets to quantify potential savings. For cache hit rate, improving from thirty-five percent to sixty percent represents an opportunity of two thousand four hundred dollars per month. For average output tokens, reducing from two hundred eighty to one hundred eighty tokens represents an opportunity of one thousand eight hundred dollars per month. For model selection accuracy, improving from seventy-two percent to ninety percent represents an opportunity of three thousand two hundred dollars per month. These metrics together guide where to focus optimization efforts for maximum return.
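Keeping the dashboard rows as plain data lets the total opportunity be computed rather than hand-summed. The figures below are the ones quoted in this section; the structure is a sketch:

```python
# Opportunity figures quoted in the text above (monthly USD)
OPPORTUNITIES = [
    {"metric": "cache hit rate", "current": "35%", "target": "60%", "monthly_usd": 2400},
    {"metric": "avg output tokens", "current": "280", "target": "180", "monthly_usd": 1800},
    {"metric": "model selection accuracy", "current": "72%", "target": "90%", "monthly_usd": 3200},
]


def total_opportunity(rows: list[dict]) -> int:
    """Sum the monthly savings across all dashboard rows."""
    return sum(row["monthly_usd"] for row in rows)


total = total_opportunity(OPPORTUNITIES)  # 7400
```

Sorting rows by monthly_usd gives the focus order the section recommends: model selection first, then caching, then output length.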
Research Frontier
Research on "predictive caching" uses ML models to predict what content will be requested and pre-compute responses. By anticipating user needs before they ask, predictive caching can achieve hit rates approaching 90%.