Part IV: Engineering
Chapter 18

18.3 Task Memory and Agent Handoffs

Multi-agent systems succeed or fail based on how well they transfer context between agents. Poor handoffs create confusion, redundant work, and inconsistent results.

What State to Preserve Between Tasks

When one agent completes a task and hands off to another, you must explicitly transfer relevant state. The challenge is deciding what to include without overwhelming the receiving agent with irrelevant context.

Essential State Elements

When transferring state between agents, include:

- Task outcome: what was accomplished and what succeeded or failed.
- Evidence and reasoning: the key data the decision was based on.
- Remaining uncertainty: open questions and confidence levels.
- User context: relevant preferences and constraints from the user.
- Next steps: explicit recommendations for continuation.

State You Can Discard

Some state can be safely discarded between tasks:

- Internal processing artifacts: intermediate calculations and search queries tried need not transfer unless directly relevant.
- Failed approaches: discard unless they inform future retry logic.
- System-level context: implementation details the next agent does not need.


from dataclasses import dataclass
from datetime import datetime


@dataclass
class TaskHandoff:
    id: str  # Unique handoff identifier, used to match confirmations
    source_agent: str
    target_agent: str
    task_status: str  # 'completed', 'partial', 'failed', 'requires_input'
    
    outcome_summary: str
    key_findings: list[str]
    remaining_uncertainties: list[str]
    
    user_context: dict
    next_recommendations: list[str]
    
    # Metadata for debugging and audit
    timestamp: datetime
    confidence_score: float
    
    def to_prompt_context(self) -> str:
        lines = [
            f"Task status: {self.task_status}",
            f"Outcome: {self.outcome_summary}",
            "",
            "Key findings:"
        ]
        lines.extend(f"  - {f}" for f in self.key_findings)
        
        if self.remaining_uncertainties:
            lines.append("")
            lines.append("Remaining uncertainties:")
            lines.extend(f"  - {u}" for u in self.remaining_uncertainties)
        
        if self.next_recommendations:
            lines.append("")
            lines.append("Recommendations:")
            lines.extend(f"  - {r}" for r in self.next_recommendations)
        
        return "\n".join(lines)

Clean Handoff Protocols

Define explicit protocols for how agents communicate. Ambiguity in handoffs leads to dropped tasks, duplicated effort, and inconsistent results.

Structured Output

Agents should produce handoff documents in consistent, machine-readable formats that receiving agents can parse reliably.
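The structured format itself can be as simple as JSON validated against a list of required fields. The sketch below is illustrative: the HandoffPayload fields and helper names are assumptions, not a fixed schema.

```python
import json
from dataclasses import dataclass, asdict

# Hypothetical minimal handoff payload; field names are illustrative.
@dataclass
class HandoffPayload:
    task_status: str
    outcome_summary: str
    key_findings: list

REQUIRED_FIELDS = {"task_status", "outcome_summary", "key_findings"}

def serialize_handoff(payload: HandoffPayload) -> str:
    """Emit the handoff as JSON so the receiver can parse it reliably."""
    return json.dumps(asdict(payload))

def parse_handoff(raw: str) -> HandoffPayload:
    """Parse and validate a handoff document, rejecting malformed input."""
    data = json.loads(raw)
    missing = REQUIRED_FIELDS - data.keys()
    if missing:
        raise ValueError(f"Handoff missing fields: {sorted(missing)}")
    return HandoffPayload(**data)

raw = serialize_handoff(
    HandoffPayload("completed", "Report drafted", ["3 sources cited"]))
restored = parse_handoff(raw)
```

Rejecting a malformed handoff at parse time surfaces the problem at the boundary, rather than letting a half-populated handoff propagate into the receiving agent's reasoning.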

Explicit Confirmation

Receiving agents should confirm receipt and understanding before the source agent considers the handoff complete.

Timeout Handling

If handoff confirmation does not arrive within expected time, escalate or retry based on defined policies.

Rollback Capability

If a handoff reveals incompatibilities, agents should be able to revert to previous state and attempt alternative approaches.
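A minimal way to support rollback is to snapshot task state before each handoff attempt and restore the snapshot on failure. The class below is a hypothetical sketch of that idea, not an API from the chapter's system.

```python
import copy

class RollbackableState:
    """Snapshot/revert around a handoff attempt (illustrative sketch)."""

    def __init__(self, state: dict):
        self.state = state
        self._snapshots: list[dict] = []

    def snapshot(self) -> None:
        # Deep-copy so later mutations don't corrupt the saved version.
        self._snapshots.append(copy.deepcopy(self.state))

    def rollback(self) -> None:
        # Restore the most recent snapshot, discarding changes since then.
        if not self._snapshots:
            raise RuntimeError("No snapshot to roll back to")
        self.state = self._snapshots.pop()

task = RollbackableState({"step": 1, "findings": []})
task.snapshot()                 # Before attempting the handoff
task.state["step"] = 2          # Handoff mutates state...
task.rollback()                 # ...but the receiver rejected it
```

The deep copy matters: a shallow snapshot would share nested structures with the live state, so mutating the live state would silently corrupt the "saved" version.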


import asyncio
import time


class AgentHandoffProtocol:
    def __init__(self, agent_registry: dict[str, Agent]):
        self.agents = agent_registry
        self.pending_handoffs: dict[str, TaskHandoff] = {}
        self.confirmations: set[str] = set()  # Handoff IDs confirmed by targets
    
    async def execute_handoff(self, handoff: TaskHandoff) -> bool:
        # 1. Validate receiving agent exists
        if handoff.target_agent not in self.agents:
            await self._handle_unknown_agent(handoff)
            return False
        
        # 2. Send handoff to target agent; it confirms asynchronously
        self.pending_handoffs[handoff.id] = handoff
        target = self.agents[handoff.target_agent]
        await target.receive_handoff(handoff)
        
        # 3. Wait for confirmation with timeout
        if await self._wait_for_confirmation(handoff, timeout=30):
            # 4. Acknowledge and archive
            await self._acknowledge_handoff(handoff)
            return True
        else:
            # 5. Handle timeout - retry or escalate
            await self._handle_handoff_timeout(handoff)
            return False
    
    async def _wait_for_confirmation(self, handoff: TaskHandoff,
                                     timeout: int) -> bool:
        start = time.time()
        while time.time() - start < timeout:
            if handoff.id in self.confirmations:
                return True
            await asyncio.sleep(0.5)
        return False

State Corruption and Recovery

Distributed AI systems are vulnerable to state corruption from network failures, conflicting updates, and inconsistent reads. Design for failure from the start.

Types of State Corruption

Distributed AI systems face several types of state corruption:

- Partial writes: some state updates succeed before a failure occurs, leaving the system in an inconsistent state.
- Conflicting updates: multiple agents update the same state differently, creating ambiguity about which version is correct.
- Stale reads: state is read before pending writes complete, giving the reader outdated information.
- Orphaned state: state associated with failed or abandoned tasks that no longer have a valid parent process.

Design for Consistency

Use atomic operations where possible. When atomicity is not possible, define clear precedence rules for conflicts. Always validate state integrity after recovery operations.
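One common way to get atomicity without locks is optimistic concurrency control: each record carries a version number, and a write succeeds only if the version has not changed since it was read. The in-memory VersionedStore below is an illustrative sketch of this pattern, not part of the chapter's system.

```python
class VersionedStore:
    """Compare-and-set store: conflicting updates cannot silently
    overwrite each other (illustrative sketch)."""

    def __init__(self):
        # key -> (version, value)
        self._data: dict[str, tuple[int, object]] = {}

    def read(self, key: str) -> tuple[int, object]:
        return self._data.get(key, (0, None))

    def compare_and_set(self, key: str, expected_version: int,
                        value: object) -> bool:
        current_version, _ = self._data.get(key, (0, None))
        if current_version != expected_version:
            return False  # Conflict: caller must re-read and reapply
        self._data[key] = (current_version + 1, value)
        return True

store = VersionedStore()
version, _ = store.read("task-42")
ok = store.compare_and_set("task-42", version, "draft complete")
stale = store.compare_and_set("task-42", version, "conflicting write")
```

A losing writer gets `False` rather than corrupted state, and can re-read and apply the precedence rule the system has defined for that conflict.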

Recovery Patterns


class StateRecoveryManager:
    def __init__(self, state_store: StateStore):
        self.store = state_store
        self.checkpoint_interval = 5  # Save checkpoint every N operations
    
    async def validate_state(self, task_id: str) -> ValidationResult:
        state = await self.store.get(task_id)
        
        # Check required fields exist
        if missing := self._check_required_fields(state):
            return ValidationResult(False, f"Missing fields: {missing}")
        
        # Check data integrity (checksums, types)
        if not self._validate_integrity(state):
            return ValidationResult(False, "Integrity check failed")
        
        # Check consistency with related state
        if not await self._check_consistency(state):
            return ValidationResult(False, "Inconsistent with related state")
        
        return ValidationResult(True, "State valid")
    
    async def recover_to_checkpoint(self, task_id: str, 
                                     checkpoint_id: str):
        checkpoint = await self.store.get_checkpoint(checkpoint_id)
        if checkpoint.task_id != task_id:
            raise ValueError(f"Checkpoint {checkpoint_id} does not belong to task {task_id}")
        
        # Restore state from checkpoint
        await self.store.set(task_id, checkpoint.state)
        
        # Replay any operations that occurred after checkpoint
        operations = await self.store.get_operations_after(checkpoint_id)
        for op in operations:
            await self._replay_operation(op)
            await self.store.delete_operation(op.id)  # Mark as replayed
    
    async def _replay_operation(self, op: Operation):
        # Reapply operation to restored state
        if op.type == 'update':
            await self.store.apply_update(op.task_id, op.update)
        elif op.type == 'append':
            await self.store.apply_append(op.task_id, op.item)
            

Idempotency Considerations

Idempotent operations produce the same result regardless of how many times they are executed. This property is essential for reliable retry logic in distributed systems.

When to Make Operations Idempotent

Operations should be made idempotent in several scenarios:

- Any operation that modifies shared state.
- Any operation that can be retried after failure.
- Any operation triggered by external events.
- Any operation that calls external APIs where the call might be inadvertently duplicated.

Idempotency Keys

Assign unique idempotency keys to operations. Before executing, check if the key has already been processed. If so, return the cached result.


import asyncio
from datetime import datetime, timedelta
from typing import Any, Callable


class IdempotencyManager:
    def __init__(self, store: IdempotencyStore):
        self.store = store
        self.key_ttl = 24 * 60 * 60  # 24 hours
    
    async def execute_idempotent(self, key: str,
                                 operation: Callable) -> Any:
        # Check if already executed
        existing = await self.store.get(key)
        if existing:
            return existing.result
        
        # Execute and store result
        result = await operation()
        await self.store.set(key, {
            'result': result,
            'executed_at': datetime.now(),
            'ttl': datetime.now() + timedelta(seconds=self.key_ttl)
        })
        return result
    
    async def execute_with_recovery(self, operation_id: str,
                                    operation: Callable,
                                    max_retries: int = 3) -> tuple[bool, Any]:
        # Route through the idempotency check so a retry after a partial
        # failure never re-executes a call that already succeeded.
        for attempt in range(max_retries):
            try:
                result = await self.execute_idempotent(operation_id, operation)
                return True, result
            except TransientError:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
        
        return False, None  # Only reached if max_retries < 1

DataForge: Idempotent Document Processing

DataForge assigns each incoming document an idempotency key based on document hash and processing intent. If the same document is submitted twice (e.g., due to network retry), the system detects the duplicate and returns the cached result rather than reprocessing. This prevents duplicate records, wasted computation, and inconsistent outputs.
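DataForge's internals are not shown here, but a key derived from content hash plus processing intent could look like the following sketch (the function name and key format are assumptions):

```python
import hashlib

def idempotency_key(document: bytes, intent: str) -> str:
    """Derive a stable key from document content plus processing intent.

    Resubmitting the same bytes with the same intent yields the same
    key, so a network retry is detected as a duplicate; a different
    intent (e.g. 'summarize' vs 'extract') gets its own key."""
    digest = hashlib.sha256(document).hexdigest()
    return f"{intent}:{digest}"

key_a = idempotency_key(b"quarterly report text", "summarize")
key_b = idempotency_key(b"quarterly report text", "summarize")
```

Hashing the content rather than using a client-supplied request ID means even duplicates submitted through different channels collapse to one key.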

Handoff Failure Handling

When handoffs fail, you need clear recovery strategies. The appropriate response depends on the failure type and your reliability requirements.

- Target agent unavailable: queue the handoff and retry with exponential backoff; the task is delayed but eventually completes.
- Invalid handoff data: request a re-send from the source agent, at the cost of some delay.
- Timeout without confirmation: escalate to a supervisor agent, triggering an alert and possibly a manual review.
- Repeated failures: mark the task as blocked and send an alert; user intervention is required.
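These policies can be made explicit as a dispatch table, so the recovery path for every failure type is declared in one place and unknown failure types are caught immediately. The enum members and action names below are illustrative.

```python
from enum import Enum, auto

class HandoffFailure(Enum):
    TARGET_UNAVAILABLE = auto()
    INVALID_DATA = auto()
    CONFIRMATION_TIMEOUT = auto()
    REPEATED_FAILURE = auto()

# Hypothetical policy table: one recovery action per failure type.
RECOVERY_POLICY = {
    HandoffFailure.TARGET_UNAVAILABLE: "queue_and_retry_with_backoff",
    HandoffFailure.INVALID_DATA: "request_resend_from_source",
    HandoffFailure.CONFIRMATION_TIMEOUT: "escalate_to_supervisor",
    HandoffFailure.REPEATED_FAILURE: "mark_blocked_and_alert",
}

def recovery_action(failure: HandoffFailure) -> str:
    """Look up the declared recovery action; KeyError on an
    unhandled failure type is a deliberate fail-fast."""
    return RECOVERY_POLICY[failure]
```

Keeping the table data-driven also makes the policy auditable: reviewers can check the mapping without reading the recovery code itself.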

Key Takeaway

Design handoff protocols explicitly with state preservation, confirmation, timeout handling, and recovery in mind. The cost of robust handoffs is upfront complexity; the cost of poor handoffs is production failures.