Part IV: Engineering
Chapter 18

18.4 Durable Execution

Long-running AI workflows must survive server restarts, network failures, and hours or days of waiting. Durable execution ensures your workflows complete even when the unexpected occurs.

Long-Running Task Completion

AI applications often involve operations that take minutes, hours, or days to complete. These include document processing pipelines, research tasks that gather information from many sources, approval workflows that wait for human input, and batch operations that process thousands of items.

Traditional request-response patterns cannot handle these workloads. A server restart during a 2-hour task should not mean starting over. A user who steps away for a long weekend should find their workflow exactly where they left it.

The Durability Guarantee

Durable execution means that once a workflow step begins, it will eventually complete. If the process crashes, it resumes from the last successful checkpoint. If a server restarts, the workflow continues on another server.

Foundation for Reliability

Durable execution is the foundation for Chapter 23's reliability patterns. Without durable execution, you cannot provide the completion guarantees that production AI applications require.

Checkpointing Progress

Checkpoints capture the complete state of a workflow at a specific point. To resume after failure, you need only restore the last checkpoint and continue execution.

What to Checkpoint

Checkpoints should capture the complete workflow state:

- All variables and data that define current progress
- The execution position, indicating which step the workflow was executing
- The input data being processed at the checkpoint
- The output accumulated from completed steps
- Pending timers, with their scheduled events and deadlines


# Workflow, CheckpointStorage, and CorruptedCheckpointError are
# application-defined types from the surrounding system.
import copy
import hashlib
import json
from dataclasses import dataclass
from datetime import datetime

@dataclass
class WorkflowCheckpoint:
    workflow_id: str
    step_id: str
    step_number: int
    
    state_snapshot: dict
    accumulated_outputs: dict
    
    step_inputs: dict
    step_outputs: dict
    
    timestamp: datetime
    checksum: str  # For integrity verification

class CheckpointManager:
    def __init__(self, storage: CheckpointStorage):
        self.storage = storage
    
    async def create_checkpoint(self, workflow: Workflow) -> WorkflowCheckpoint:
        checkpoint = WorkflowCheckpoint(
            workflow_id=workflow.id,
            step_id=workflow.current_step.id,
            step_number=workflow.current_step_number,
            # Deep copies, so later mutations cannot corrupt the checkpoint
            state_snapshot=copy.deepcopy(workflow.state),
            accumulated_outputs=copy.deepcopy(workflow.outputs),
            step_inputs=workflow.current_step.inputs,
            step_outputs=workflow.current_step.outputs,
            timestamp=datetime.now(),
            checksum=self._compute_checksum(workflow)
        )
        
        await self.storage.save(checkpoint)
        return checkpoint
    
    def _compute_checksum(self, workflow: Workflow) -> str:
        data = json.dumps({
            'state': workflow.state,
            'step': workflow.current_step.id,
            'outputs': workflow.outputs
        }, sort_keys=True)
        return hashlib.sha256(data.encode()).hexdigest()
    
    def _verify_checksum(self, checkpoint: WorkflowCheckpoint) -> bool:
        # Recompute the checksum from the stored fields and compare
        data = json.dumps({
            'state': checkpoint.state_snapshot,
            'step': checkpoint.step_id,
            'outputs': checkpoint.accumulated_outputs
        }, sort_keys=True)
        return hashlib.sha256(data.encode()).hexdigest() == checkpoint.checksum
    
    async def restore_from_checkpoint(self, 
                                       checkpoint: WorkflowCheckpoint) -> Workflow:
        # Verify integrity before trusting the stored state
        if not self._verify_checksum(checkpoint):
            raise CorruptedCheckpointError(checkpoint.workflow_id)
        
        workflow = Workflow(id=checkpoint.workflow_id)
        workflow.state = checkpoint.state_snapshot
        workflow.outputs = checkpoint.accumulated_outputs
        workflow.current_step_number = checkpoint.step_number
        
        # Restore the checkpointed step; the resumer decides whether it
        # must be re-run or can be skipped
        workflow.current_step = workflow.get_step(checkpoint.step_id)
        return workflow

When to Checkpoint

Checkpoints should be created:

- Before long-running operations such as LLM calls and external API requests
- After completing each workflow step
- Before waiting for external events like human approval and webhooks
- At natural boundaries in the workflow, including phase transitions
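This placement can be woven directly into the step-execution loop. The sketch below uses a simplified, in-memory workflow (SimpleWorkflow and run_with_checkpoints are illustrative names, not the chapter's CheckpointManager) to show the core idea: checkpoint after each completed step, then resume from the last checkpoint after a crash.

```python
import json
from dataclasses import dataclass, field

@dataclass
class SimpleWorkflow:
    id: str
    steps: list                                      # (step_id, callable) pairs
    state: dict = field(default_factory=dict)
    checkpoints: list = field(default_factory=list)  # stands in for durable storage

def run_with_checkpoints(wf: SimpleWorkflow, start_at: int = 0) -> dict:
    """Execute steps in order, checkpointing after each completed step."""
    for n, (step_id, step_fn) in enumerate(wf.steps):
        if n < start_at:                 # already done before the interruption
            continue
        step_fn(wf.state)                # run the step, mutating workflow state
        wf.checkpoints.append({          # checkpoint after the step completes
            "step_number": n,
            "step_id": step_id,
            "state": json.loads(json.dumps(wf.state)),  # deep snapshot
        })
    return wf.state

calls = {"analyze": 0}
def analyze(state):
    calls["analyze"] += 1
    if calls["analyze"] == 1:
        raise RuntimeError("simulated crash mid-workflow")
    state["score"] = 0.9

wf = SimpleWorkflow(id="report-1", steps=[
    ("gather", lambda s: s.update(docs=3)),
    ("analyze", analyze),
])
try:
    run_with_checkpoints(wf)
except RuntimeError:
    pass                                 # the process "crashed" during step 1

# Resume: restore state from the last checkpoint, continue after that step
last = wf.checkpoints[-1]
wf.state = dict(last["state"])
final = run_with_checkpoints(wf, start_at=last["step_number"] + 1)
```

Note that the work done by step 0 survives the crash: the resumed run skips it entirely and re-executes only the step that was interrupted.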

Resuming Interrupted Workflows

When a workflow resumes after interruption, it must be able to continue from exactly where it left off, handling any side effects that may have already occurred.

Deterministic Resume

The key to reliable resume is ensuring that resuming a workflow produces the same result as continuing uninterrupted would have. This requires idempotent operations so steps can be retried safely, side effect tracking to know what external changes have occurred, and conditional logic to skip steps whose conditions are already satisfied.


class WorkflowResumer:
    def __init__(self, checkpoint_manager: CheckpointManager,
                 execution_tracker: ExecutionTracker):
        self.checkpoints = checkpoint_manager
        self.tracker = execution_tracker
    
    async def resume_workflow(self, workflow_id: str) -> WorkflowResult:
        # Get last checkpoint
        checkpoint = await self.checkpoints.get_latest(workflow_id)
        if not checkpoint:
            raise NoCheckpointError(workflow_id)
        
        # Restore workflow state
        workflow = await self.checkpoints.restore_from_checkpoint(checkpoint)
        
        # Check which steps already completed
        completed_steps = await self.tracker.get_completed_steps(workflow_id)
        
        # Resume execution, skipping already-completed steps
        for step in workflow.get_remaining_steps():
            if step.id in completed_steps:
                continue
            
            # Check if step conditions are already satisfied
            if await self._conditions_satisfied(step, workflow):
                await self.tracker.mark_completed(workflow_id, step.id)
                continue
            
            # Execute step
            result = await self._execute_step(step, workflow)
            if result.failed:
                await self._handle_step_failure(step, result)
                return result
            
            await self.tracker.mark_completed(workflow_id, step.id)
        
        return workflow.final_result()
            
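One way to implement the side effect tracking mentioned above is an idempotency key: derive a stable key from the workflow, step, and payload, and perform an external effect only if that key has not been recorded. A minimal sketch (SideEffectLog and its in-memory set are assumptions; a production system would persist the log durably):

```python
import hashlib

class SideEffectLog:
    """Tracks which external side effects have already been performed,
    so a resumed workflow does not repeat them."""
    def __init__(self):
        self._seen = set()  # stands in for a durable store

    def key(self, workflow_id: str, step_id: str, payload: str) -> str:
        raw = f"{workflow_id}:{step_id}:{payload}"
        return hashlib.sha256(raw.encode()).hexdigest()

    def run_once(self, workflow_id: str, step_id: str,
                 payload: str, effect):
        """Perform `effect` only if this exact side effect hasn't run before."""
        k = self.key(workflow_id, step_id, payload)
        if k in self._seen:
            return "skipped"
        result = effect()          # e.g. send an email, call an external API
        self._seen.add(k)          # record only after the effect succeeds
        return result

log = SideEffectLog()
sent = []
send_email = lambda: sent.append("report-ready") or "sent"

first = log.run_once("wf-1", "notify", "report-ready", send_email)
second = log.run_once("wf-1", "notify", "report-ready", send_email)  # replay after resume
```

Because the key is recorded only after the effect succeeds, a crash between the effect and the record still yields at-least-once behavior; exactly-once requires the downstream service to accept the same idempotency key.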

Timeout and Retry Strategies

Long-running workflows must handle timeouts gracefully and retry failed operations appropriately.

Timeout Configuration

Typical timeout values by operation type:

- LLM API calls: 60-120 seconds; they may be slow but rarely run indefinitely
- External API calls: 30-60 seconds, depending on the external service's SLA
- Human approval: hours to days, depending on business process requirements
- Database operations: 5-30 seconds; these should be fast, and longer durations may indicate a problem
- Entire workflows: days to weeks, depending on the specific use case

Retry with Exponential Backoff


import asyncio
from typing import Any, Callable, Optional

# RetryableError and MaxRetriesExceededError are application-defined.

class RetryStrategy:
    def __init__(self, max_retries: int = 3, 
                 base_delay: float = 1.0,
                 max_delay: float = 60.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
    
    async def execute_with_retry(self, operation: Callable,
                                  *args, **kwargs) -> Any:
        last_exception = None
        
        for attempt in range(self.max_retries + 1):
            try:
                return await operation(*args, **kwargs)
            except RetryableError as e:  # non-retryable errors propagate immediately
                last_exception = e
                if attempt < self.max_retries:
                    # Exponential backoff, capped at max_delay
                    delay = min(self.base_delay * (2 ** attempt), 
                               self.max_delay)
                    await asyncio.sleep(delay)
        
        raise MaxRetriesExceededError(last_exception)

class TimeoutStrategy:
    def __init__(self, default_timeout: float = 60.0):
        self.default_timeout = default_timeout
        self.timeouts: dict = {}
    
    def set_timeout(self, operation_id: str, timeout: float):
        self.timeouts[operation_id] = timeout
    
    async def execute_with_timeout(self, operation: Callable,
                                    operation_id: Optional[str] = None) -> Any:
        timeout = self.timeouts.get(operation_id, self.default_timeout)
        
        try:
            return await asyncio.wait_for(
                operation(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            raise TimeoutError(f"Operation {operation_id} timed out after {timeout}s")
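The two strategies compose naturally: a timeout guards each individual attempt, and a timed-out attempt is treated as retryable. The standalone sketch below inlines both patterns rather than reusing the classes above (flaky_fetch and fetch_with_policy are illustrative names; the short timeouts exist only to keep the demonstration fast):

```python
import asyncio

calls = {"n": 0}

async def flaky_fetch() -> str:
    """Hangs on the first two attempts, succeeds on the third."""
    calls["n"] += 1
    if calls["n"] < 3:
        await asyncio.sleep(10)          # simulate a stalled LLM/API call
    return "ok"

async def fetch_with_policy(max_retries: int = 3,
                            timeout: float = 0.05,
                            base_delay: float = 0.01) -> str:
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            # Timeout bounds each individual attempt...
            return await asyncio.wait_for(flaky_fetch(), timeout=timeout)
        except asyncio.TimeoutError as e:
            # ...and each timed-out attempt triggers exponential backoff
            last_error = e
            await asyncio.sleep(min(base_delay * 2 ** attempt, 1.0))
    raise last_error

result = asyncio.run(fetch_with_policy())
```

Setting the per-attempt timeout rather than a single overall deadline means slow-but-eventually-successful services still get multiple fresh chances, at the cost of a higher worst-case total latency (roughly max_retries × timeout plus backoff).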

Workflow Timeout Handling

When workflows themselves time out (waiting for human input, external callbacks, etc.), you need escalation and notification strategies.


import asyncio
from datetime import datetime

class WorkflowTimeoutHandler:
    def __init__(self, notification_service, escalation_rules):
        self.notifier = notification_service
        self.escalation = escalation_rules
    
    async def handle_workflow_timeout(self, workflow_id: str,
                                       wait_step: WaitStep):
        workflow = await self.get_workflow(workflow_id)
        
        # Determine escalation level based on wait duration
        escalation_level = self.escalation.get_level(
            wait_step.id, 
            workflow.time_waiting
        )
        
        # Notify relevant parties
        await self.notifier.send_timeout_alert(
            workflow_id=workflow_id,
            step=wait_step,
            escalation_level=escalation_level,
            time_waiting=workflow.time_waiting
        )
        
        # Execute escalation action
        if escalation_level.action == 'escalate':
            await self._escalate_to_manager(workflow)
        elif escalation_level.action == 'auto_approve':
            await self._auto_approve_and_continue(workflow, wait_step)
        elif escalation_level.action == 'cancel':
            await self._cancel_workflow(workflow)
    
    async def _schedule_timeout_check(self, workflow_id: str,
                                       wait_step: WaitStep,
                                       deadline: datetime):
        # Fire immediately if the deadline has already passed
        delay = max((deadline - datetime.now()).total_seconds(), 0)
        # NOTE: an in-process timer does not survive restarts; production
        # systems should persist timers and reschedule them on startup
        asyncio.create_task(
            self._delayed_timeout_check(delay, workflow_id, wait_step)
        )
    
    async def _delayed_timeout_check(self, delay: float,
                                     workflow_id: str,
                                     wait_step: WaitStep):
        await asyncio.sleep(delay)
        workflow = await self.get_workflow(workflow_id)
        if workflow.current_step == wait_step and not wait_step.completed:
            await self.handle_workflow_timeout(workflow_id, wait_step)
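Escalation rules like those consulted by the handler can be expressed as plain data: ordered thresholds mapped to actions, where the longest satisfied threshold wins. A sketch under assumed rule names (the APPROVAL_ESCALATION shape and get_action helper are illustrative, not the chapter's EscalationRules API):

```python
from datetime import timedelta
from typing import Optional

# Ordered shortest to longest wait; the longest satisfied threshold wins.
APPROVAL_ESCALATION = [
    (timedelta(hours=24), "remind"),     # nudge the original approver
    (timedelta(hours=48), "escalate"),   # route to the approver's manager
    (timedelta(hours=96), "cancel"),     # give up and cancel the workflow
]

def get_action(rules, time_waiting: timedelta) -> Optional[str]:
    action = None
    for threshold, candidate in rules:
        if time_waiting >= threshold:
            action = candidate           # later (longer) thresholds override
    return action
```

Keeping the thresholds as data rather than code lets business owners tune escalation policy per workflow type without redeploying the timeout handler.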

HealthMetrics: Durable Patient Report Generation

HealthMetrics generates comprehensive patient reports through a long-running workflow that gathers data from multiple sources, runs analysis algorithms, synthesizes insights, and requires physician approval before delivery. The workflow checkpoints at each data gathering step and each synthesis phase. If the system crashes during the 2-hour report generation, it resumes from the last successful checkpoint. If a physician does not approve within 48 hours, the system sends escalation alerts and ultimately routes the approval to their department head.

Key Takeaway

Design for the assumption that failures will happen. Checkpoint frequently, make operations idempotent, and have clear timeout and escalation policies. Your users should never need to worry about losing work due to system failures.