18.5 Workflow Orchestration Patterns
Common workflow patterns solve recurring problems. Master these patterns and know when to apply each: sequence, parallelization, branching, and compensation.
Sequential Workflows
The simplest orchestration pattern: steps execute one after another, each step receiving input from the previous step's output.
class SequentialWorkflow:
    def __init__(self, name: str):
        self.name = name
        self.steps = []

    def add_step(self, step: WorkflowStep) -> 'SequentialWorkflow':
        self.steps.append(step)
        return self

    async def execute(self, initial_input: dict) -> dict:
        current_output = initial_input
        for step in self.steps:
            current_output = await step.execute(current_output)
            if step.failed:
                raise WorkflowStepFailed(step, current_output)
        return current_output

# DataForge: document ingestion pipeline
def create_document_ingestion_workflow():
    return (SequentialWorkflow("DocumentIngestion")
        .add_step(ReceiveDocument())
        .add_step(ClassifyDocument())
        .add_step(ExtractEntities())
        .add_step(ValidateExtraction())
        .add_step(StoreInDatabase())
        .add_step(IndexForSearch())
    )
When to Use Sequential Workflows
Sequential workflows work best when steps have clear dependencies where the output of step N becomes the input to step N+1, when order matters for correctness or user experience, and when steps cannot overlap in time due to dependencies or resource constraints.
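The dependency chain can be sketched without any framework at all. In this illustrative example, each step is an async function that takes the current state dict and returns an enriched copy; the loop threads state through the steps in order (the step names are hypothetical):

```python
import asyncio

async def receive(state: dict) -> dict:
    # Stand-in for a real ingestion step
    return {**state, "received": True}

async def classify(state: dict) -> dict:
    # Stand-in for a classification step that needs receive's output
    return {**state, "doc_type": "invoice"}

async def run_pipeline(initial: dict) -> dict:
    state = initial
    for step in (receive, classify):  # order encodes the dependency
        state = await step(state)
    return state

result = asyncio.run(run_pipeline({"doc_id": 1}))
```

The state dict accumulates each step's output, which is exactly what makes step N+1 able to depend on step N.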
Parallel Fan-Out/Fan-In
Execute multiple steps concurrently, then combine results. Essential for reducing latency when independent tasks can run simultaneously.
class ParallelWorkflow:
    def __init__(self, name: str, max_concurrency: int = 10):
        self.name = name
        self.branches = []
        self.max_concurrency = max_concurrency

    def add_branch(self, branch: list[WorkflowStep]) -> 'ParallelWorkflow':
        self.branches.append(branch)
        return self

    async def execute(self, initial_input: dict) -> dict:
        async def execute_branch(branch):
            output = initial_input.copy()
            for step in branch:
                output = await step.execute(output)
            return output

        # Execute all branches concurrently, bounded by a semaphore
        semaphore = asyncio.Semaphore(self.max_concurrency)

        async def bounded_branch(branch):
            async with semaphore:
                return await execute_branch(branch)

        results = await asyncio.gather(
            *[bounded_branch(b) for b in self.branches],
            return_exceptions=True
        )

        # Surface any branch exceptions
        errors = [r for r in results if isinstance(r, Exception)]
        if errors:
            raise ParallelExecutionFailed(errors)

        # Combine results
        return self._combine_results(results)

    def _combine_results(self, results: list[dict]) -> dict:
        combined = {'branch_results': {}}
        for i, result in enumerate(results):
            combined['branch_results'][f'branch_{i}'] = result
        return combined

# HealthMetrics: parallel patient data gathering
def create_patient_data_gathering_workflow():
    return (ParallelWorkflow("PatientDataGathering", max_concurrency=5)
        .add_branch([FetchLabResults(), NormalizeLabData()])
        .add_branch([FetchMedications(), CategorizeMedications()])
        .add_branch([FetchVitals(), CalculateVitalsSummary()])
        .add_branch([FetchHistory(), SummarizeHistory()])
    )
Fan-Out/Fan-In Variants
Map executes the same operation on multiple inputs concurrently. Scatter-Gather distributes work across multiple processors and then collects and aggregates the results. Pipeline parallel overlaps different pipeline stages on different items, allowing multiple items to move through the pipeline simultaneously at different stages.
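The map variant, for instance, falls out directly from asyncio.gather, which fans out one task per item and fans results back in preserving input order. A sketch, where normalize is a stand-in for real per-item work:

```python
import asyncio

async def normalize(item: str) -> str:
    # Stand-in for per-item work (e.g., normalizing one lab record)
    return item.strip().lower()

async def map_concurrently(items: list[str]) -> list[str]:
    # One task per item; gather preserves input order in its results
    return await asyncio.gather(*(normalize(i) for i in items))

mapped = asyncio.run(map_concurrently(["  Glucose ", "HDL", " ldl"]))
```

Scatter-gather looks the same structurally; the difference is that each task runs a different processor rather than the same operation.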
Concurrency Limits
Always set max_concurrency to prevent overwhelming downstream systems. Database connections, API rate limits, and memory constraints all impose practical limits on parallelism.
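A quick way to convince yourself the semaphore actually enforces the cap is to track peak in-flight work. This sketch launches 20 tasks under a limit of 3 and records the high-water mark:

```python
import asyncio

async def run_bounded(limit: int, tasks: int) -> int:
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job():
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated downstream call
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(tasks)))
    return peak

peak = asyncio.run(run_bounded(limit=3, tasks=20))
# peak never exceeds the semaphore limit
```

The same pattern applies whether the bounded resource is database connections, an API rate limit, or memory.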
Conditional Branching
Route workflow execution based on conditions. The condition can evaluate workflow state, external data, or LLM output.
class ConditionalWorkflow:
    def __init__(self, name: str):
        self.name = name
        self.cases = {}  # condition -> steps
        self.default_case = None

    def add_case(self, condition: Callable[[dict], bool],
                 steps: list[WorkflowStep]) -> 'ConditionalWorkflow':
        self.cases[condition] = steps
        return self

    def add_default(self, steps: list[WorkflowStep]) -> 'ConditionalWorkflow':
        self.default_case = steps
        return self

    async def execute(self, initial_input: dict) -> dict:
        current_output = initial_input

        # Find the first matching case (insertion order)
        matched_case = None
        for condition, steps in self.cases.items():
            if condition(current_output):
                matched_case = steps
                break
        if matched_case is None:
            matched_case = self.default_case
        if matched_case is None:
            raise NoMatchingCaseError(f"No case matched for input: {current_output}")

        # Execute matched case
        for step in matched_case:
            current_output = await step.execute(current_output)
        return current_output

# Approval routing example
def create_approval_workflow():
    wf = ConditionalWorkflow("ApprovalRouting")
    wf.add_case(
        lambda state: state.get('confidence', 0) >= 0.9,
        [AutoApprove(), ProcessApproval()]
    )
    wf.add_case(
        lambda state: state.get('confidence', 0) >= 0.7,
        [SendForReview(), WaitForApproval(), ProcessApproval()]
    )
    wf.add_default(
        [FlagForManualReview(), GatherMoreInfo(), ReassessConfidence()]
    )
    return wf
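Because cases are tried in insertion order, thresholds must be added from strictest to most lenient: if the 0.7 case came first, a confidence of 0.95 would match it and never reach the auto-approve path. The first-match dispatch reduces to this sketch (route names are illustrative):

```python
# (condition, route) pairs tried in order; first match wins
cases = [
    (lambda s: s.get("confidence", 0) >= 0.9, "auto_approve"),
    (lambda s: s.get("confidence", 0) >= 0.7, "human_review"),
]

def route(state: dict, default: str = "manual_review") -> str:
    for condition, name in cases:
        if condition(state):
            return name
    return default  # mirrors add_default

high = route({"confidence": 0.95})
mid = route({"confidence": 0.8})
low = route({"confidence": 0.3})
```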
Dynamic Branching
For more complex routing, use an LLM to determine the appropriate path based on workflow state:
class LLMRoutingWorkflow:
    def __init__(self, llm_client):
        self.llm = llm_client
        self.routes = {}

    def add_route(self, route_name: str, steps: list[WorkflowStep]):
        self.routes[route_name] = steps

    async def execute(self, initial_input: dict) -> dict:
        # Ask the LLM to choose a route (requires `import json`)
        routing_prompt = f"""Given the following workflow state, choose the most appropriate routing:

State: {json.dumps(initial_input, indent=2)}

Available routes: {list(self.routes.keys())}

Respond with only the route name."""
        response = await self.llm.complete(routing_prompt)
        chosen_route = response.text.strip()

        if chosen_route not in self.routes:
            raise InvalidRouteError(f"LLM chose invalid route: {chosen_route}")

        # Execute the chosen route
        current_output = initial_input
        for step in self.routes[chosen_route]:
            current_output = await step.execute(current_output)
        return current_output
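Model output rarely comes back perfectly clean: stray whitespace, quotes, or casing will all trip the strict membership check above. A slightly more defensive parse (a sketch, not part of the class) reduces spurious InvalidRouteError failures:

```python
def parse_route(raw: str, routes: list[str]) -> str:
    # Strip whitespace, surrounding quotes, and case differences
    # before comparing against the known route names.
    cleaned = raw.strip().strip('"\'').strip().lower()
    for name in routes:
        if cleaned == name.lower():
            return name
    raise ValueError(f"invalid route: {raw!r}")

chosen = parse_route(' "Billing" \n', ["billing", "support"])
```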
Error Handling and Compensation
When errors occur, well-designed workflows can recover, retry, or compensate for partial work. Compensation undoes the effects of completed steps when a workflow cannot complete.
Retry Patterns
class RetryableStep(WorkflowStep):
    def __init__(self, wrapped_step: WorkflowStep,
                 max_retries: int = 3,
                 backoff_multiplier: float = 2.0):
        super().__init__(wrapped_step.name)
        self.wrapped = wrapped_step
        self.max_retries = max_retries
        self.backoff = backoff_multiplier

    async def execute(self, input: dict) -> dict:
        last_error = None
        for attempt in range(self.max_retries + 1):
            try:
                return await self.wrapped.execute(input)
            except RetryableError as e:
                last_error = e
                if attempt < self.max_retries:
                    delay = self.backoff ** attempt
                    await asyncio.sleep(delay)

        # All retries exhausted
        self.failed = True
        self.error = last_error
        raise MaxRetriesExceededError(self.name, last_error)

# Wrap any step that may need retries
classified_step = RetryableStep(ClassifyDocument(), max_retries=3)
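With backoff_multiplier=2.0, the sleep before retry attempt n is 2**n seconds, so the schedule grows geometrically. (Production retries usually also add jitter, not shown in the class above, to avoid synchronized retry storms.) The schedule itself:

```python
def backoff_schedule(max_retries: int, multiplier: float = 2.0) -> list[float]:
    # Delay before each retry attempt: multiplier ** attempt,
    # matching the delay computation in RetryableStep above
    return [multiplier ** attempt for attempt in range(max_retries)]

delays = backoff_schedule(3)
# → [1.0, 2.0, 4.0]
```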
Compensation Patterns
class CompensableWorkflow:
    def __init__(self, name: str):
        self.name = name
        self.steps = []
        self.compensation_steps = {}  # step_id -> compensation

    def add_step(self, step: WorkflowStep,
                 compensation: WorkflowStep = None) -> 'CompensableWorkflow':
        self.steps.append(step)
        if compensation:
            self.compensation_steps[step.id] = compensation
        return self

    async def execute(self, initial_input: dict) -> dict:
        completed_steps = []
        current_output = initial_input
        try:
            for step in self.steps:
                current_output = await step.execute(current_output)
                completed_steps.append(step)
        except Exception:
            # Undo completed work in reverse order, then re-raise
            await self._compensate(completed_steps)
            raise
        return current_output

    async def _compensate(self, completed_steps: list[WorkflowStep]):
        for step in reversed(completed_steps):
            if step.id in self.compensation_steps:
                compensation = self.compensation_steps[step.id]
                try:
                    await compensation.execute(step.output)
                except Exception as e:
                    # Log but continue compensating the remaining steps
                    logging.error(f"Compensation failed for {step.id}: {e}")
Error Categories and Responses
Transient errors from network issues or timeouts should be handled with retry using exponential backoff and do not require compensation. Validation errors from bad input should fail fast and reject the workflow without compensation. Resource errors such as database full or quota exceeded may need compensation depending on the specific situation, and typically warrant retry after delay or alert. Dependency errors from external services being down should wait and retry or implement circuit breaking, with no compensation needed. Logic errors from unexpected state require alerting and escalating to human operators, and do require compensation to undo partial work.
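These categories can be encoded as a policy table the orchestrator consults when a step fails. A sketch with illustrative action names (the category names mirror the paragraph above; compensate=None marks the situational case):

```python
# Policy per error category: retry action and whether to compensate
ERROR_POLICIES = {
    "transient":  {"action": "retry_with_backoff", "compensate": False},
    "validation": {"action": "fail_fast",          "compensate": False},
    "resource":   {"action": "retry_after_delay",  "compensate": None},  # situational
    "dependency": {"action": "wait_and_retry",     "compensate": False},
    "logic":      {"action": "escalate_to_human",  "compensate": True},
}

def policy_for(category: str) -> dict:
    # Unknown categories are treated like logic errors: escalate
    return ERROR_POLICIES.get(category, ERROR_POLICIES["logic"])
```

Centralizing the mapping keeps individual steps from each inventing their own failure handling.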
Orchestration Frameworks
Several frameworks implement these patterns and provide durable execution, monitoring, and failure recovery:
Temporal
Open-source durable-execution platform with strong consistency guarantees. Provides SDKs for Python, Go, and Java. Best for mission-critical workflows requiring exactly-once semantics.
Prefect
Python-native workflow orchestration with hybrid execution model. Strong observability and caching. Good for data engineering pipelines.
Airflow
Mature workflow orchestration for data pipelines. Uses DAGs defined in Python. Heavy infrastructure requirements.
AWS Step Functions
Managed orchestration for AWS-native applications, with good integration across other AWS services. The main trade-off is vendor lock-in.
# Example: Temporal workflow definition
import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy

@activity.defn
async def classify_document(doc: dict) -> dict:
    # Activity implementation
    pass

@activity.defn
async def extract_entities(doc: dict) -> dict:
    # Activity implementation
    pass

@workflow.defn
class DocumentProcessingWorkflow:
    @workflow.run
    async def run(self, doc: dict) -> dict:
        # Temporal provides automatic checkpointing;
        # activities execute with retries and durability
        classified = await workflow.execute_activity(
            classify_document,
            doc,
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=RetryPolicy(
                maximum_attempts=3,
                initial_interval=timedelta(seconds=1),
            ),
        )
        if classified['confidence'] < 0.7:
            # Durable timer standing in for a human-approval wait;
            # in practice, pair this with a signal handler
            await asyncio.sleep(timedelta(hours=24).total_seconds())
        extracted = await workflow.execute_activity(
            extract_entities,
            classified,
            start_to_close_timeout=timedelta(seconds=120),
        )
        return extracted
Pattern Combination
Real applications combine these patterns. A large workflow often includes sequential phases, parallel sub-workflows, conditional routing, and sophisticated error handling.
class ComplexWorkflow:
    def __init__(self):
        self.phases = []

    def add_phase(self, phase: WorkflowPhase) -> 'ComplexWorkflow':
        self.phases.append(phase)
        return self

    async def execute(self, initial_input: dict) -> dict:
        current_output = initial_input
        for phase in self.phases:
            # Sequential, parallel, and conditional workflows all expose
            # the same execute() interface, so phases compose uniformly
            current_output = await phase.execute(current_output)
            if not current_output.get('success', True):
                await self._handle_phase_failure(phase, current_output)
        return current_output
# HealthMetrics combines all patterns
def create_report_generation_workflow():
    return (ComplexWorkflow()
        .add_phase(ParallelWorkflow("ParallelDataFetch", max_concurrency=10)
            .add_branch([FetchLabs(), FetchVitals(), FetchMedications()])
            .add_branch([FetchHistory(), FetchNotes()])
        )
        .add_phase(ConditionalWorkflow("Analysis")
            .add_case(lambda s: s['data_quality'] == 'high',
                      [RunStandardAnalysis()])
            .add_case(lambda s: s['data_quality'] == 'low',
                      [RequestMoreData(), RunLimitedAnalysis()])
            .add_default([FlagForManualReview()])
        )
        .add_phase(SequentialWorkflow("Review")
            .add_step(WaitForPhysicianApproval(timeout_hours=48))
            .add_step(GenerateFinalReport())
        )
    )
DataForge: Document Processing Orchestration
DataForge combines sequential classification, parallel extraction across document sections, conditional routing based on document type and confidence scores, and compensation for partial failures. When a document fails classification, no database records are created. When extraction partially succeeds, the system compensates by rolling back any stored entities and marking the document for manual processing.
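The rollback behavior described above amounts to a LIFO undo stack: record an undo action as each step commits, then run the stack in reverse when a later step fails. A self-contained sketch with hypothetical step names (the real system compensates stored entities the same way):

```python
def run_with_compensation(steps, state):
    # steps: list of (do, undo) pairs; undo runs in reverse on failure
    undo_stack = []
    try:
        for do, undo in steps:
            state = do(state)
            undo_stack.append((undo, state))  # snapshot the step's output
    except Exception:
        for undo, snapshot in reversed(undo_stack):
            state = undo(snapshot)
        raise
    return state

compensated_states = []  # captured so the rollback is observable

def store_entities(s):
    return {**s, "entities_stored": True}

def rollback_entities(s):
    out = {**s, "entities_stored": False, "manual_processing": True}
    compensated_states.append(out)
    return out

def extract(s):
    raise RuntimeError("extraction failed mid-document")

try:
    run_with_compensation(
        [(store_entities, rollback_entities), (extract, lambda s: s)],
        {"doc_id": 7},
    )
except RuntimeError:
    pass  # original failure still propagates after compensation
```

Only steps that completed get undone: extract never committed, so only store_entities is rolled back.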
Key Takeaway
Master these patterns individually, then combine them in larger workflows. Start with sequential for simplicity, add parallelism for performance, use conditionals for flexibility, and always design error handling from the start. See Chapter 23 for deeper reliability discussion.