AI Workflow Orchestration — Building Multi-Step Pipelines with LangGraph and Temporal
AI workflow orchestration coordinates multiple LLM calls, tools, and human decisions into reliable, stateful pipelines. This guide covers LangGraph for agent workflows and Temporal for long-running, durable execution.
What You'll Learn
You'll learn to build stateful AI workflows with LangGraph, implement durable execution with Temporal, handle human-in-the-loop approvals, and orchestrate multi-agent systems with error recovery.
Why It Matters
Simple LLM chains break under real-world conditions: network failures, partial outputs, timeouts. Orchestration frameworks provide state persistence, retry logic, and observability that turn fragile chains into production-grade workflows.
Real-World Use
Doda Browser's content moderation system uses Temporal to orchestrate a multi-step pipeline: image analysis, text classification, human review queue, and automated action — all with full audit trails and recovery from any failure point.
Workflow Architecture
```mermaid
flowchart TD
    A[Input] --> B[Router]
    B --> C{Type}
    C -->|Simple| D[LLM Call]
    C -->|Complex| E[Agent Loop]
    D --> F[Validation]
    E --> F
    F --> G{Human Review?}
    G -->|Yes| H[Approval Queue]
    G -->|No| I[Output]
    H --> J{Approved?}
    J -->|Yes| I
    J -->|No| K[Reject]
```
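The flowchart can be read as a single dispatch function. The sketch below is plain Python with no framework; `simple_llm_call`, `agent_loop`, the word-count routing heuristic, and the `needs_review`/`reviewer` callbacks are all hypothetical stand-ins for real LLM calls and review tooling.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class PipelineResult:
    output: str
    status: str                      # "done", "rejected", or "invalid"
    path: List[str] = field(default_factory=list)


def simple_llm_call(text: str) -> str:
    # Hypothetical stand-in for a single LLM request
    return f"summary of: {text}"


def agent_loop(text: str, max_steps: int = 3) -> str:
    # Hypothetical stand-in for a multi-step agent, bounded by max_steps
    observations = [f"observation {i + 1}" for i in range(max_steps)]
    return f"answer to '{text}' after {len(observations)} steps"


def run_pipeline(
    text: str,
    needs_review: Callable[[str], bool],
    reviewer: Callable[[str], bool],
) -> PipelineResult:
    path = ["router"]
    # Router: a toy word-count heuristic decides "simple" vs "complex"
    if len(text.split()) <= 5:
        path.append("llm_call")
        draft = simple_llm_call(text)
    else:
        path.append("agent_loop")
        draft = agent_loop(text)

    # Validation: refuse to forward an empty draft
    path.append("validation")
    if not draft.strip():
        return PipelineResult("", "invalid", path)

    # Human review gate: only some drafts go to the approval queue
    if needs_review(draft):
        path.append("approval_queue")
        if not reviewer(draft):
            return PipelineResult("", "rejected", path)

    path.append("output")
    return PipelineResult(draft, "done", path)


result = run_pipeline("Summarize this memo", needs_review=lambda d: False, reviewer=lambda d: True)
print(result.path)  # ['router', 'llm_call', 'validation', 'output']
```

Passing the review policy and the reviewer in as callbacks keeps the routing logic testable without a model or a human in the loop.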
LangGraph: Stateful Agent Workflows
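LangGraph models a workflow as a graph: each node is a function that reads the shared state and returns updates to it, and edges (fixed or conditional) decide which node runs next.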
```python
from typing import List, Literal, TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph


# Shared state passed between nodes
class AgentState(TypedDict):
    input: str
    analysis: str
    search_results: str
    final_answer: str
    steps: List[str]


llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


# Each node receives the current state and returns only the keys it changes;
# LangGraph merges the returned dict into the shared state.
def analyze(state: AgentState) -> dict:
    response = llm.invoke(
        f"Analyze this query and determine what information is needed: {state['input']}"
    )
    return {"analysis": response.content, "steps": state["steps"] + ["analyze"]}


def search(state: AgentState) -> dict:
    # Simulated search; replace with a real search tool call
    results = f"Search results for: {state['analysis'][:50]}..."
    return {"search_results": results, "steps": state["steps"] + ["search"]}


def generate(state: AgentState) -> dict:
    response = llm.invoke(
        f"Based on this analysis: {state['analysis']}\n"
        f"And search results: {state['search_results']}\n"
        f"Answer the user: {state['input']}"
    )
    return {"final_answer": response.content, "steps": state["steps"] + ["generate"]}


def should_search(state: AgentState) -> Literal["search", "generate"]:
    """Router: search only when the user explicitly asks for it."""
    if "search" in state["input"].lower():
        return "search"
    return "generate"


# Build the graph: analyze -> (search?) -> generate -> END
workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze)
workflow.add_node("search", search)
workflow.add_node("generate", generate)
workflow.set_entry_point("analyze")
workflow.add_conditional_edges(
    "analyze", should_search,
    {"search": "search", "generate": "generate"},
)
workflow.add_edge("search", "generate")
workflow.add_edge("generate", END)
app = workflow.compile()

# Run (ChatOpenAI reads OPENAI_API_KEY from the environment)
result = app.invoke({
    "input": "Search for recent AI news and summarize it",
    "analysis": "",
    "search_results": "",
    "final_answer": "",
    "steps": [],
})
print(f"Steps: {result['steps']}")
print(f"\nFinal answer: {result['final_answer'][:200]}...")
```
Expected output (the answer text comes from the LLM and will vary between runs):
Steps: ['analyze', 'search', 'generate']
Final answer: Based on recent developments in AI, several key trends have emerged including advances in multimodal models, improved reasoning capabilities, and new open-source releases. The field continues to evolve rapidly...
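To make the node/edge model concrete, here is a toy executor in plain Python. It is not LangGraph's implementation; it only illustrates the idea that nodes return partial state updates, edges are either fixed node names or router functions, and a step cap stops runaway cycles (LangGraph enforces a similar cap through its `recursion_limit` setting). `TinyGraph` and its methods are hypothetical names for this sketch.

```python
from typing import Callable, Dict, Union

END = "__end__"  # sentinel name for the terminal node in this sketch

State = Dict[str, object]
Node = Callable[[State], State]    # returns a partial update
Router = Callable[[State], str]    # returns the next node name


class TinyGraph:
    """Toy executor illustrating the node/edge model; not LangGraph's code."""

    def __init__(self, entry: str, max_steps: int = 25):
        self.entry = entry
        self.max_steps = max_steps
        self.nodes: Dict[str, Node] = {}
        self.edges: Dict[str, Union[str, Router]] = {}

    def add_node(self, name: str, fn: Node) -> None:
        self.nodes[name] = fn

    def add_edge(self, src: str, dst: Union[str, Router]) -> None:
        # dst is either a fixed node name or a router function (conditional edge)
        self.edges[src] = dst

    def invoke(self, state: State) -> State:
        state = dict(state)
        current = self.entry
        for _ in range(self.max_steps):
            state.update(self.nodes[current](state))  # merge the partial update
            nxt = self.edges[current]
            current = nxt(state) if callable(nxt) else nxt
            if current == END:
                return state
        raise RuntimeError(f"exceeded {self.max_steps} steps without reaching END")


# Usage: a draft node that loops back to itself until it has run three times
graph = TinyGraph(entry="draft")
graph.add_node("draft", lambda s: {"attempts": s["attempts"] + 1})
graph.add_edge("draft", lambda s: END if s["attempts"] >= 3 else "draft")
print(graph.invoke({"attempts": 0}))  # {'attempts': 3}
```

Because the cap raises instead of silently returning, a router that never reaches END surfaces as an error rather than a hung workflow.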
Temporal: Durable Workflow Execution
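Temporal persists each workflow's event history on the server, so failed activities are retried according to their retry policy and a workflow resumes from its last completed step after a worker crash or restart. That makes it a good fit for long-running AI workflows.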
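```python
import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.worker import Worker


# Activities: ordinary async functions registered with @activity.defn.
# Side effects (LLM calls, database writes, API requests) belong here.
@activity.defn
async def analyze_content(content_id: str) -> dict:
    return {"content_id": content_id, "text": "Sample content", "sentiment": "positive"}


@activity.defn
async def moderate_content(analysis: dict) -> dict:
    # Placeholder that auto-approves. Real human review is usually modeled
    # as a workflow signal rather than an activity that blocks for hours.
    return {"status": "approved", "analysis": analysis}


@activity.defn
async def publish_content(moderation: dict) -> dict:
    return {"status": "published", "url": f"/content/{moderation['analysis']['content_id']}"}


# Workflow: deterministic orchestration logic. Temporal records each activity
# result in the workflow history, so a restarted worker resumes where it left off.
@workflow.defn
class ContentPipelineWorkflow:
    @workflow.run
    async def run(self, content_id: str) -> dict:
        workflow.logger.info(f"Processing content: {content_id}")

        # Step 1: Analyze content, allowing up to 3 attempts
        analysis = await workflow.execute_activity(
            analyze_content,
            content_id,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Step 2: Moderate (24h timeout leaves room for human review)
        moderation = await workflow.execute_activity(
            moderate_content,
            analysis,
            start_to_close_timeout=timedelta(hours=24),
        )
        if moderation["status"] == "rejected":
            return {"status": "rejected", "reason": moderation["reason"]}

        # Step 3: Publish
        return await workflow.execute_activity(
            publish_content,
            moderation,
            start_to_close_timeout=timedelta(seconds=10),
        )


async def main() -> None:
    # Needs a running Temporal server, e.g. `temporal server start-dev`.
    # The task queue name and workflow ID below are illustrative.
    client = await Client.connect("localhost:7233")
    async with Worker(
        client,
        task_queue="content-pipeline",
        workflows=[ContentPipelineWorkflow],
        activities=[analyze_content, moderate_content, publish_content],
    ):
        result = await client.execute_workflow(
            ContentPipelineWorkflow.run,
            "post-123",
            id="content-pipeline-post-123",
            task_queue="content-pipeline",
        )
        print(result)


if __name__ == "__main__":
    print("Temporal workflow defined:")
    print("  ContentPipelineWorkflow - 3 steps with retries")
    print("  analyze_content -> moderate_content -> publish_content")
    print("  Human-in-the-loop at moderation step (24h timeout)")
    # To execute the pipeline against a local server: asyncio.run(main())
```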
Expected output:
Temporal workflow defined:
ContentPipelineWorkflow - 3 steps with retries
analyze_content -> moderate_content -> publish_content
Human-in-the-loop at moderation step (24h timeout)
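`maximum_attempts` only caps how many times an activity runs; Temporal's retry policy also spaces the attempts with exponential backoff controlled by `initial_interval`, `backoff_coefficient`, and `maximum_interval`. The function below is a simplified model of that schedule, not the SDK's code. The defaults it assumes (1 s initial interval, coefficient 2.0, cap at 100x the initial interval) follow Temporal's documented retry-policy defaults as we understand them; check the docs for your SDK version. `retry_delays` is a hypothetical helper.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    maximum_attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Waits before each retry under a simplified exponential-backoff model.

    Attempt 1 runs immediately, so N attempts produce N - 1 waits. This sketch
    expects maximum_attempts >= 1 and does not model "unlimited" retries.
    """
    if maximum_interval is None:
        maximum_interval = initial_interval * 100   # assumed default cap
    delays = []
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, maximum_interval))
    return delays


print([d.total_seconds() for d in retry_delays(3)])  # [1.0, 2.0]
print([d.total_seconds() for d in retry_delays(6, maximum_interval=timedelta(seconds=5))])
# [1.0, 2.0, 4.0, 5.0, 5.0]
```

Summing the delays plus the per-attempt `start_to_close_timeout` gives a rough upper bound on how long a failing step can hold up the workflow.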
Human-in-the-Loop with LangGraph
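Pause workflow execution for human approval before proceeding. The class below is a framework-agnostic, in-memory sketch of the approval bookkeeping; in LangGraph, the pause itself comes from compiling the graph with a checkpointer and an interrupt before the node that needs approval.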
```python
from typing import Dict, Optional

from pydantic import BaseModel


class ApprovalRequest(BaseModel):
    task_id: str
    prompt: str
    decision: Optional[bool] = None        # None means still pending
    reviewer_notes: Optional[str] = None


class HumanInTheLoopWorkflow:
    """In-memory approval tracker; a production system would persist requests."""

    def __init__(self):
        self.pending_approvals: Dict[str, ApprovalRequest] = {}

    def request_approval(self, req: ApprovalRequest) -> str:
        self.pending_approvals[req.task_id] = req
        print(f"\n[APPROVAL REQUIRED] Task: {req.task_id}")
        print(f"Prompt: {req.prompt}")
        print("Status: PENDING (call approve/reject)")
        return req.task_id

    def approve(self, task_id: str, notes: str = "") -> dict:
        req = self.pending_approvals.get(task_id)
        if not req:
            return {"error": "Task not found"}
        req.decision = True
        req.reviewer_notes = notes
        return {"status": "approved", "notes": notes}

    def reject(self, task_id: str, reason: str) -> dict:
        req = self.pending_approvals.get(task_id)
        if not req:
            return {"error": "Task not found"}
        req.decision = False
        req.reviewer_notes = reason
        return {"status": "rejected", "reason": reason}

    def check_status(self, task_id: str) -> Optional[dict]:
        req = self.pending_approvals.get(task_id)
        if not req:
            return None
        return {
            "task_id": req.task_id,
            "prompt": req.prompt,
            "decision": req.decision,
            "notes": req.reviewer_notes,
        }


# Simulate human-in-the-loop
wf = HumanInTheLoopWorkflow()
task_id = wf.request_approval(ApprovalRequest(
    task_id="review-001",
    prompt="Send promotional email to all users? Estimated reach: 50K",
))

print("\nHuman reviewer checks pending tasks...")
status = wf.check_status(task_id)
print(f"Status: {status}")

# Human approves
result = wf.approve(task_id, "Approved for send")
print(f"\nDecision: {result}")
```
Expected output:
[APPROVAL REQUIRED] Task: review-001
Prompt: Send promotional email to all users? Estimated reach: 50K
Status: PENDING (call approve/reject)
Human reviewer checks pending tasks...
Status: {'task_id': 'review-001', 'prompt': 'Send promotional email to all users? Estimated reach: 50K', 'decision': None, 'notes': None}
Decision: {'status': 'approved', 'notes': 'Approved for send'}
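The tracker above records a decision, but nothing in it actually waits for one or enforces the deadline a real approval step needs. A minimal sketch of the waiting side using only `asyncio`: the workflow blocks on an event until a reviewer responds or a timeout expires. `ApprovalGate` is a hypothetical name, and the timeouts are shortened to milliseconds for the demo. In Temporal the same shape is typically built from a signal handler plus `workflow.wait_condition`; in LangGraph, from an interrupt plus a checkpointer.

```python
import asyncio
from typing import List, Optional


class ApprovalGate:
    """In-process stand-in for a workflow waiting on a human decision."""

    def __init__(self) -> None:
        self._event = asyncio.Event()
        self.decision: Optional[bool] = None

    def submit(self, approved: bool) -> None:
        # Called by the reviewer (in Temporal, this role is played by a signal)
        self.decision = approved
        self._event.set()

    async def wait(self, timeout_s: float) -> str:
        try:
            await asyncio.wait_for(self._event.wait(), timeout=timeout_s)
        except asyncio.TimeoutError:
            return "expired"  # caller decides: escalate, remind, or auto-reject
        return "approved" if self.decision else "rejected"


async def demo() -> List[str]:
    # Gates are created inside the running loop
    fast, slow = ApprovalGate(), ApprovalGate()
    # A reviewer answers the first request after 10 ms; nobody answers the second
    asyncio.get_running_loop().call_later(0.01, fast.submit, True)
    return [await fast.wait(timeout_s=1.0), await slow.wait(timeout_s=0.05)]


print(asyncio.run(demo()))  # ['approved', 'expired']
```

Returning "expired" instead of raising keeps the timeout policy (reminder, escalation, rejection) in the caller, which is where the 24-hour rule from the Temporal example would live.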
Multi-Agent Orchestration
Coordinate multiple specialized agents working on subtasks.
```python
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List


class AgentOrchestrator:
    def __init__(self):
        self.agents: Dict[str, Callable[[str], dict]] = {}

    def register_agent(self, name: str, agent_func: Callable[[str], dict]) -> None:
        self.agents[name] = agent_func

    def orchestrate(self, task: str, subtasks: List[str]) -> dict:
        print(f"Orchestrating task: {task}")
        print(f"Assigning {len(subtasks)} subtasks to agents...")
        results: Dict[str, dict] = {}  # fresh per call so runs don't leak into each other

        # max(1, ...) avoids a ValueError when subtasks is empty
        with ThreadPoolExecutor(max_workers=max(1, len(subtasks))) as pool:
            futures = {}
            for subtask in subtasks:
                agent = self.agents.get(subtask)
                if agent is None:
                    # Record missing agents instead of silently skipping them
                    results[subtask] = {"error": "no agent registered"}
                    print(f"  [FAIL] {subtask}: no agent registered")
                    continue
                futures[pool.submit(agent, subtask)] = subtask

            # Collect results in completion order (may differ from submission order)
            for future in as_completed(futures):
                subtask = futures[future]
                try:
                    results[subtask] = future.result()
                    print(f"  [DONE] {subtask}")
                except Exception as e:
                    results[subtask] = {"error": str(e)}
                    print(f"  [FAIL] {subtask}: {e}")

        # Aggregate results
        failed = sum(1 for r in results.values() if "error" in r)
        return {
            "task": task,
            "subtasks_completed": len(results) - failed,
            "subtasks_failed": failed,
            "results": results,
        }


# Define agents (stubs standing in for LLM-backed specialists)
def research_agent(task: str) -> dict:
    return {"topic": task, "findings": "Research complete"}


def write_agent(task: str) -> dict:
    return {"output": "Draft complete", "word_count": 500}


def review_agent(task: str) -> dict:
    return {"score": 85, "feedback": "Minor revisions needed"}


# Orchestrate
orchestrator = AgentOrchestrator()
orchestrator.register_agent("research", research_agent)
orchestrator.register_agent("writing", write_agent)
orchestrator.register_agent("review", review_agent)

results = orchestrator.orchestrate(
    "Write article about AI",
    ["research", "writing", "review"],
)
print(f"\nAll results: {json.dumps(results, indent=2)}")
```
Expected output (agents finish in nondeterministic order, so the [DONE] lines and the keys under "results" may appear in a different order):
Orchestrating task: Write article about AI
Assigning 3 subtasks to agents...
  [DONE] research
  [DONE] writing
  [DONE] review
All results: {
  "task": "Write article about AI",
  "subtasks_completed": 3,
  "subtasks_failed": 0,
  "results": {
    "research": {
      "topic": "research",
      "findings": "Research complete"
    },
...
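The orchestrator collects each agent's output but does nothing when parallel agents disagree. One simple reconciliation strategy, swapped in here for illustration, is a majority vote over a comparable field, with disagreements flagged for a human or a merging agent. The `reconcile` helper, the `verdict` key, and the agent names below are hypothetical, and the voted values must be hashable.

```python
from collections import Counter
from typing import Dict


def reconcile(results: Dict[str, dict], key: str) -> dict:
    """Majority-vote one field across agent outputs and flag disagreement."""
    # Ignore failed agents and agents that did not report the field
    votes = {name: r[key] for name, r in results.items() if "error" not in r and key in r}
    if not votes:
        return {"value": None, "agreed": False, "dissenting": []}
    # Ties go to the value encountered first (Counter.most_common ordering)
    value, count = Counter(votes.values()).most_common(1)[0]
    return {
        "value": value,
        "agreed": count == len(votes),  # True only if every reporting agent matched
        "dissenting": sorted(name for name, v in votes.items() if v != value),
    }


outputs = {
    "checker_a": {"verdict": "accurate"},
    "checker_b": {"verdict": "accurate"},
    "checker_c": {"verdict": "misleading"},
    "checker_d": {"error": "timeout"},
}
print(reconcile(outputs, "verdict"))
# {'value': 'accurate', 'agreed': False, 'dissenting': ['checker_c']}
```

When `agreed` is False, the dissenting agents' full outputs are a natural input for a follow-up merging step or a human reviewer.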
Common Errors
| Error | Cause | Fix |
|---|---|---|
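| LangGraph workflow gets stuck in a loop | Conditional edge never routes to END | Track an iteration counter in state and route to END once it hits a limit; LangGraph's `recursion_limit` config (default 25) also raises `GraphRecursionError` as a backstop |
| Temporal activity times out repeatedly | `start_to_close_timeout` is shorter than real LLM latency | Set the timeout from measured latency (LLM calls often need 30-60 s) and let the retry policy absorb transient failures |
| Human approval never comes | No reminder or deadline | Send a reminder after 1 hour of pending approval and enforce an overall deadline that escalates or auto-rejects |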
| Multi-agent system produces conflicting results | No aggregation step after parallel execution | Add a merging agent that reconciles conflicting outputs |
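| Workflow state is lost on crash | State held only in process memory | Run the pipeline on Temporal, which persists workflow history on the server, or compile the LangGraph graph with a checkpointer backed by durable storage |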
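The core idea behind both Temporal's event history and LangGraph's checkpointers is to persist progress after every step and skip completed steps on restart. Here is a minimal file-based sketch of that idea, assuming JSON-serializable state; `run_with_checkpoints` and the step names are hypothetical, and the mechanism is far simpler than either framework's.

```python
import json
from pathlib import Path
from typing import Callable, List, Tuple

# A step is a (name, function) pair; the function returns a partial state update
Step = Tuple[str, Callable[[dict], dict]]


def run_with_checkpoints(steps: List[Step], state: dict, journal: Path) -> dict:
    """Run steps in order, saving state after each so a rerun resumes after a crash."""
    done: List[str] = []
    if journal.exists():
        saved = json.loads(journal.read_text())
        state, done = saved["state"], saved["done"]
    for name, fn in steps:
        if name in done:
            continue  # finished before the crash; skip to avoid repeating side effects
        state = {**state, **fn(state)}
        done.append(name)
        # Write to a temp file, then rename, so a crash mid-write can't corrupt the journal
        tmp = journal.with_suffix(".tmp")
        tmp.write_text(json.dumps({"state": state, "done": done}))
        tmp.replace(journal)
    return state
```

A step that crashes after its side effect but before the journal write will run again on restart, so steps should be idempotent (for example, keyed by a request ID).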
Practice Questions
How does LangGraph differ from a simple LangChain chain? LangGraph supports branching, cycles, and conditional routing; chains are linear sequences of steps.
What problem does Temporal solve that LangGraph does not? Temporal provides durable execution with automatic retries, timeouts, and recovery from process crashes across multiple machines.
Why is human-in-the-loop important in AI workflows? Human review catches edge cases, prevents costly mistakes, and provides accountability for actions with significant consequences.
How does a conditional edge router work in LangGraph? A conditional edge calls a function that returns the name of the next node to execute based on current state.
Challenge: Build a multi-agent system where a supervisor agent delegates subtasks to specialist agents (researcher, writer, fact-checker, formatter), the specialists work in parallel, and the supervisor reviews and merges outputs — all orchestrated as a single LangGraph workflow.
Mini Project
Build a content repurposing pipeline. Create a Temporal workflow that takes a blog post, sends it through three parallel AI agents (one for LinkedIn summary, one for Twitter thread, one for newsletter excerpt), collects the results, routes them through a human approval step with 24-hour timeout, and publishes approved content to respective platforms via API.
Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.