AI Workflow Orchestration — Building Multi-Step Pipelines with LangGraph and Temporal

DodaTech · Updated 2026-06-22 · 7 min read

AI workflow orchestration coordinates multiple LLM calls, tools, and human decisions into reliable, stateful pipelines. This guide covers LangGraph for agent workflows and Temporal for long-running durable execution.

What You'll Learn

You'll learn to build stateful AI workflows with LangGraph, implement durable execution with Temporal, handle human-in-the-loop approvals, and orchestrate multi-agent systems with error recovery.

Why It Matters

Simple LLM chains break under real-world conditions such as network failures, partial outputs, and timeouts. Orchestration frameworks provide state persistence, retry logic, and observability that turn fragile chains into production-grade workflows.
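
To make "retry logic" concrete, here is a minimal sketch of retrying a flaky step with exponential backoff, using only the standard library. The names `call_with_retries` and `flaky_llm_call` are hypothetical, and the flaky function merely simulates a model call that times out twice; frameworks such as Temporal provide this behavior for you, so treat this as an illustration of the idea rather than how any framework implements it.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def call_with_retries(fn: Callable[[], T], max_attempts: int = 3, base_delay: float = 0.01) -> T:
    """Call fn, retrying on any exception with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")

# Hypothetical stand-in for an LLM call: fails twice, then succeeds.
calls = {"count": 0}

def flaky_llm_call() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"

result = call_with_retries(flaky_llm_call)
print(result, calls["count"])  # ok 3
```

In practice you would retry only on errors that are known to be transient (timeouts, rate limits) and let validation errors fail immediately.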

Real-World Use

Doda Browser's content moderation system uses Temporal to orchestrate a multi-step pipeline: image analysis, text classification, human review queue, and automated action — all with full audit trails and recovery from any failure point.

Workflow Architecture

flowchart TD
    A[Input] --> B[Router]
    B --> C{Type}
    C -->|Simple| D[LLM Call]
    C -->|Complex| E[Agent Loop]
    D --> F[Validation]
    E --> F
    F --> G{Human Review?}
    G -->|Yes| H[Approval Queue]
    G -->|No| I[Output]
    H --> J{Approved?}
    J -->|Yes| I
    J -->|No| K[Reject]
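
The flowchart can be traced in plain Python before any framework is involved. The sketch below is an assumption-heavy illustration: the routing rule (more than 8 words means "complex"), the review policy (only agent-loop output is reviewed), and all function names are hypothetical placeholders, not part of LangGraph or Temporal.

```python
from typing import Callable, Dict

def llm_call(text: str) -> str:
    # Placeholder for a single model call.
    return f"answer: {text}"

def agent_loop(text: str) -> str:
    # Placeholder for a multi-step agent run.
    return f"agent answer: {text}"

def route(text: str) -> str:
    # Hypothetical rule: inputs longer than 8 words count as complex.
    return "complex" if len(text.split()) > 8 else "simple"

HANDLERS: Dict[str, Callable[[str], str]] = {"simple": llm_call, "complex": agent_loop}

def validate(output: str) -> str:
    # Validation step: reject empty output before it goes any further.
    if not output.strip():
        raise ValueError("empty model output")
    return output

def run_pipeline(text: str, approve: Callable[[str], bool]) -> dict:
    kind = route(text)
    output = validate(HANDLERS[kind](text))
    # Hypothetical policy: only agent-loop output goes to the approval queue.
    if kind == "complex" and not approve(output):
        return {"status": "rejected", "route": kind}
    return {"status": "ok", "route": kind, "output": output}

simple = run_pipeline("Summarize this", approve=lambda _: True)
declined = run_pipeline(
    "Compare these three vendors and draft a detailed migration plan for us",
    approve=lambda _: False,
)
print(simple)    # {'status': 'ok', 'route': 'simple', 'output': 'answer: Summarize this'}
print(declined)  # {'status': 'rejected', 'route': 'complex'}
```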

LangGraph: Stateful Agent Workflows

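LangGraph models a workflow as a graph: each node is a function that reads and updates a shared state object, and edges (including conditional ones) decide which node runs next.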

from typing import TypedDict, List, Literal
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI

class AgentState(TypedDict):
    input: str
    analysis: str
    search_results: str
    final_answer: str
    steps: List[str]

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def analyze(state: AgentState) -> AgentState:
    response = llm.invoke(
        f"Analyze this query and determine what information is needed: {state['input']}"
    )
    state["analysis"] = response.content
    state["steps"] = state.get("steps", []) + ["analyze"]
    return state

def search(state: AgentState) -> AgentState:
    # Simulate search
    state["search_results"] = (
        f"Search results for: {state['analysis'][:50]}..."
    )
    state["steps"] = state.get("steps", []) + ["search"]
    return state

def generate(state: AgentState) -> AgentState:
    response = llm.invoke(
        f"Based on this analysis: {state['analysis']}\n"
        f"And search results: {state['search_results']}\n"
        f"Answer the user: {state['input']}"
    )
    state["final_answer"] = response.content
    state["steps"] = state.get("steps", []) + ["generate"]
    return state

def should_search(state: AgentState) -> Literal["search", "generate"]:
    """Router: decide if search is needed."""
    if "search" in state["input"].lower():
        return "search"
    return "generate"

# Build graph
workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze)
workflow.add_node("search", search)
workflow.add_node("generate", generate)

workflow.set_entry_point("analyze")
workflow.add_conditional_edges(
    "analyze", should_search,
    {"search": "search", "generate": "generate"}
)
workflow.add_edge("search", "generate")
workflow.add_edge("generate", END)

app = workflow.compile()

# Run
result = app.invoke({
    "input": "Search for recent AI news and summarize it",
    "analysis": "",
    "search_results": "",
    "final_answer": "",
    "steps": []
})
print(f"Steps: {result['steps']}")
print(f"\nFinal answer: {result['final_answer'][:200]}...")

Expected output (the final answer is model-generated, so its wording will differ from run to run):

Steps: ['analyze', 'search', 'generate']

Final answer: Based on recent developments in AI, several key trends have emerged including advances in multimodal models, improved reasoning capabilities, and new open-source releases. The field continues to evolve rapidly...

Temporal: Durable Workflow Execution

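Temporal provides durable execution for long-running AI workflows: it persists each workflow's event history so a run can resume after a worker crash, and it retries failed activities according to a retry policy.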

from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy

# Activities do the side-effecting work (LLM calls, API requests).
# Each one needs @activity.defn so a worker can register it.
@activity.defn
async def analyze_content(content_id: str) -> dict:
    return {"content_id": content_id, "text": "Sample content", "sentiment": "positive"}

@activity.defn
async def moderate_content(analysis: dict) -> dict:
    # Simulate human review
    return {"status": "approved", "analysis": analysis}

@activity.defn
async def publish_content(moderation: dict) -> dict:
    return {"status": "published", "url": f"/content/{moderation['analysis']['content_id']}"}

# The workflow only orchestrates; Temporal records each completed step so a
# crashed run can resume where it left off.
@workflow.defn
class ContentPipelineWorkflow:
    @workflow.run
    async def run(self, content_id: str) -> dict:
        workflow.logger.info(f"Processing content: {content_id}")

        # Step 1: Analyze content (at most 3 attempts)
        analysis = await workflow.execute_activity(
            analyze_content,
            content_id,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Step 2: Moderate (with human review).
        # In production, human review is usually modeled with a signal plus
        # workflow.wait_condition rather than one long-running activity.
        moderation = await workflow.execute_activity(
            moderate_content,
            analysis,
            start_to_close_timeout=timedelta(hours=24),
        )

        if moderation["status"] == "rejected":
            return {"status": "rejected", "reason": moderation["reason"]}

        # Step 3: Publish
        result = await workflow.execute_activity(
            publish_content,
            moderation,
            start_to_close_timeout=timedelta(seconds=10),
        )
        return result

# To execute the workflow, register it and the activities on a
# temporalio.worker.Worker connected through temporalio.client.Client.

print("Temporal workflow defined:")
print("  ContentPipelineWorkflow - 3 steps with retries")
print("  analyze_content -> moderate_content -> publish_content")
print("  Human-in-the-loop at moderation step (24h timeout)")

Expected output:

Temporal workflow defined:
  ContentPipelineWorkflow - 3 steps with retries
  analyze_content -> moderate_content -> publish_content
  Human-in-the-loop at moderation step (24h timeout)
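
To see why persisting progress matters, the following standard-library sketch saves state to a JSON file after every step, so a rerun skips steps that already finished. This is a deliberate simplification and not how Temporal works internally (Temporal replays a recorded event history); `run_durable`, the step functions, and the checkpoint layout are all hypothetical.

```python
import json
import os
import tempfile
from typing import Callable, List, Tuple

Step = Tuple[str, Callable[[dict], dict]]

def run_durable(steps: List[Step], checkpoint_path: str) -> dict:
    """Run steps in order, saving state after each so a rerun skips finished steps."""
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            saved = json.load(f)
    else:
        saved = {"done": [], "state": {}}
    for name, step in steps:
        if name in saved["done"]:
            continue  # completed in an earlier run
        saved["state"] = step(saved["state"])
        saved["done"].append(name)
        with open(checkpoint_path, "w") as f:
            json.dump(saved, f)  # not atomic; fine for a sketch
    return saved["state"]

executed = []
crash = {"armed": True}

def analyze(state: dict) -> dict:
    executed.append("analyze")
    return {**state, "analysis": "positive"}

def publish(state: dict) -> dict:
    if crash["armed"]:
        crash["armed"] = False
        raise RuntimeError("simulated crash")
    executed.append("publish")
    return {**state, "status": "published"}

steps = [("analyze", analyze), ("publish", publish)]
path = os.path.join(tempfile.mkdtemp(), "checkpoint.json")

try:
    run_durable(steps, path)  # first run dies during publish
except RuntimeError:
    pass

final_state = run_durable(steps, path)  # second run resumes after analyze
print(executed)     # ['analyze', 'publish']
print(final_state)  # {'analysis': 'positive', 'status': 'published'}
```

Note that `analyze` runs only once even though the pipeline was started twice, which is the property that keeps expensive LLM calls from being repeated after a crash.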

Human-in-the-Loop with LangGraph

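Pause workflow execution for human approval before proceeding. The example below is a framework-independent, in-memory approval queue; in a real LangGraph or Temporal deployment, the pending request would live in persisted workflow state so that it survives restarts.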

from typing import Optional
from pydantic import BaseModel

class ApprovalRequest(BaseModel):
    task_id: str
    prompt: str
    decision: Optional[bool] = None
    reviewer_notes: Optional[str] = None

class HumanInTheLoopWorkflow:
    def __init__(self):
        self.pending_approvals = {}

    def request_approval(self, req: ApprovalRequest) -> str:
        self.pending_approvals[req.task_id] = req
        print(f"\n[APPROVAL REQUIRED] Task: {req.task_id}")
        print(f"Prompt: {req.prompt}")
        print(f"Status: PENDING (call approve/reject)")
        return req.task_id

    def approve(self, task_id: str, notes: str = "") -> dict:
        req = self.pending_approvals.get(task_id)
        if not req:
            return {"error": "Task not found"}
        req.decision = True
        req.reviewer_notes = notes
        return {"status": "approved", "notes": notes}

    def reject(self, task_id: str, reason: str) -> dict:
        req = self.pending_approvals.get(task_id)
        if not req:
            return {"error": "Task not found"}
        req.decision = False
        req.reviewer_notes = reason
        return {"status": "rejected", "reason": reason}

    def check_status(self, task_id: str) -> Optional[dict]:
        req = self.pending_approvals.get(task_id)
        if not req:
            return None
        return {
            "task_id": req.task_id,
            "prompt": req.prompt,
            "decision": req.decision,
            "notes": req.reviewer_notes
        }

# Simulate human-in-the-loop
wf = HumanInTheLoopWorkflow()
task_id = wf.request_approval(ApprovalRequest(
    task_id="review-001",
    prompt="Send promotional email to all users? Estimated reach: 50K"
))

print("\nHuman reviewer checks pending tasks...")
status = wf.check_status(task_id)
print(f"Status: {status}")

# Human approves
result = wf.approve(task_id, "Approved for send")
print(f"\nDecision: {result}")

Expected output:

[APPROVAL REQUIRED] Task: review-001
Prompt: Send promotional email to all users? Estimated reach: 50K
Status: PENDING (call approve/reject)

Human reviewer checks pending tasks...
Status: {'task_id': 'review-001', 'prompt': 'Send promotional email to all users? Estimated reach: 50K', 'decision': None, 'notes': None}

Decision: {'status': 'approved', 'notes': 'Approved for send'}
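
The queue above records decisions but never actually pauses anything. A minimal sketch of the waiting side, assuming an asyncio-based workflow, is shown below: the workflow awaits a decision and gets `None` back if no reviewer responds in time, at which point it could send a reminder or escalate. `ApprovalGate` is a hypothetical helper, and the short timeouts stand in for hours.

```python
import asyncio
from typing import Optional, Tuple

class ApprovalGate:
    """Holds one pending decision: the workflow awaits it, a reviewer resolves it."""

    def __init__(self) -> None:
        self._event = asyncio.Event()
        self.decision: Optional[bool] = None

    def resolve(self, approved: bool) -> None:
        self.decision = approved
        self._event.set()

    async def wait(self, timeout: float) -> Optional[bool]:
        """Return the decision, or None if nobody responded within timeout seconds."""
        try:
            await asyncio.wait_for(self._event.wait(), timeout)
        except asyncio.TimeoutError:
            return None
        return self.decision

async def main() -> Tuple[Optional[bool], Optional[bool]]:
    # Case 1: a simulated reviewer approves after 10 ms.
    reviewed = ApprovalGate()
    asyncio.get_running_loop().call_later(0.01, reviewed.resolve, True)
    answered = await reviewed.wait(timeout=1.0)

    # Case 2: nobody responds, so the wait times out.
    ignored = ApprovalGate()
    unanswered = await ignored.wait(timeout=0.05)
    return answered, unanswered

answered, unanswered = asyncio.run(main())
print(answered, unanswered)  # True None
```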

Multi-Agent Orchestration

Coordinate multiple specialized agents working on subtasks.

import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List

class AgentOrchestrator:
    def __init__(self):
        self.agents = {}
        self.results = {}

    def register_agent(self, name: str, agent_func):
        self.agents[name] = agent_func

    def orchestrate(self, task: str, subtasks: List[str]) -> dict:
        print(f"Orchestrating task: {task}")
        print(f"Assigning {len(subtasks)} subtasks to agents...")

        # max_workers must be at least 1, even when no subtasks are given
        with ThreadPoolExecutor(max_workers=max(1, len(subtasks))) as pool:
            futures = {}
            for subtask in subtasks:
                agent = self.agents.get(subtask)
                if agent:
                    futures[pool.submit(agent, subtask)] = subtask

            for future in as_completed(futures):
                subtask = futures[future]
                try:
                    self.results[subtask] = future.result()
                    print(f"  [DONE] {subtask}")
                except Exception as e:
                    self.results[subtask] = {"error": str(e)}
                    print(f"  [FAIL] {subtask}: {e}")

        # Aggregate results, counting failures separately from successes
        failed = sum(1 for r in self.results.values() if "error" in r)
        return {
            "task": task,
            "subtasks_completed": len(self.results) - failed,
            "subtasks_failed": failed,
            "results": self.results
        }

# Define agents
def research_agent(task: str) -> dict:
    return {"topic": task, "findings": "Research complete"}

def write_agent(task: str) -> dict:
    return {"output": "Draft complete", "word_count": 500}

def review_agent(task: str) -> dict:
    return {"score": 85, "feedback": "Minor revisions needed"}

# Orchestrate
orchestrator = AgentOrchestrator()
orchestrator.register_agent("research", research_agent)
orchestrator.register_agent("writing", write_agent)
orchestrator.register_agent("review", review_agent)

results = orchestrator.orchestrate(
    "Write article about AI",
    ["research", "writing", "review"]
)
print(f"\nAll results: {json.dumps(results, indent=2)}")

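Expected output (the order of the [DONE] lines may vary because the agents run in parallel; the JSON is abbreviated):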

Orchestrating task: Write article about AI
Assigning 3 subtasks to agents...
  [DONE] research
  [DONE] writing
  [DONE] review

All results: {
  "task": "Write article about AI",
  "subtasks_completed": 3,
  "subtasks_failed": 0,
  "results": {
    "research": {"topic": "research", "findings": "Research complete"},
    ...
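
The orchestrator collects results but does not reconcile them. When several agents answer the same question, one simple reconciliation strategy is a majority vote that flags ties for follow-up. The sketch below assumes each agent returns a single string verdict; `merge_by_majority` and the verdict values are hypothetical, and an LLM-based merging agent is a common alternative.

```python
from collections import Counter
from typing import Dict

def merge_by_majority(answers: Dict[str, str]) -> dict:
    """Pick the answer most agents agree on; report a conflict on a tie."""
    if not answers:
        raise ValueError("no agent answers to merge")
    counts = Counter(answers.values())
    ranked = counts.most_common()
    top_answer, top_votes = ranked[0]
    # A tie for first place means there is no clear winner.
    tied = len(ranked) > 1 and ranked[1][1] == top_votes
    if tied:
        return {"status": "conflict", "answer": None, "votes": dict(counts)}
    return {"status": "agreed", "answer": top_answer, "votes": dict(counts)}

agreed = merge_by_majority({"research": "publish", "writing": "publish", "review": "revise"})
conflict = merge_by_majority({"research": "publish", "review": "revise"})
print(agreed)    # {'status': 'agreed', 'answer': 'publish', 'votes': {'publish': 2, 'revise': 1}}
print(conflict)  # {'status': 'conflict', 'answer': None, 'votes': {'publish': 1, 'revise': 1}}
```

A "conflict" result is a natural trigger for the human approval step described earlier.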

Common Errors

Error | Cause | Fix
LangGraph workflow gets stuck in a loop | Conditional edge never returns END | Add a max iteration counter and a fallback edge to END
Temporal workflow times out | Activity start_to_close_timeout too short | Increase timeout based on actual LLM latency (30-60s)
Human approval never comes | No reminder mechanism | Add a reminder activity after 1 hour of pending approval
Multi-agent system produces conflicting results | No aggregation step after parallel execution | Add a merging agent that reconciles conflicting outputs
Workflow state is lost on crash | State not persisted | Use Temporal's workflow state or checkpoint to external store
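
The first fix in the table, a max iteration counter with a fallback to END, can be sketched without any framework. Here the quality check is written so that it never passes, which would loop forever without the counter. `END` is a stand-in sentinel defined locally, and `refine`, `route_after_refine`, and `MAX_ITERATIONS` are hypothetical names; in LangGraph the same routing function would be passed to `add_conditional_edges`.

```python
from typing import TypedDict

MAX_ITERATIONS = 3
END = "__end__"  # local stand-in for the framework's end marker

class LoopState(TypedDict):
    draft: str
    iterations: int

def refine(state: LoopState) -> LoopState:
    # Placeholder for an LLM revision step; counts how often it has run.
    return {"draft": state["draft"] + "+", "iterations": state["iterations"] + 1}

def route_after_refine(state: LoopState) -> str:
    # Hypothetical quality check that this demo never satisfies.
    if len(state["draft"]) >= 10:
        return END
    # Fallback: stop after MAX_ITERATIONS even if quality was never reached.
    if state["iterations"] >= MAX_ITERATIONS:
        return END
    return "refine"

state: LoopState = {"draft": "v", "iterations": 0}
next_node = "refine"
while next_node != END:
    state = refine(state)
    next_node = route_after_refine(state)

print(state)  # {'draft': 'v+++', 'iterations': 3}
```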

Practice Questions

  1. How does LangGraph differ from a simple LangChain chain? LangGraph supports branching, cycles, and conditional routing; chains are linear sequences of steps.

  2. What problem does Temporal solve that LangGraph does not? Temporal provides durable execution with automatic retries, timeouts, and recovery from process crashes across multiple machines.

  3. Why is human-in-the-loop important in AI workflows? Human review catches edge cases, prevents costly mistakes, and provides accountability for actions with significant consequences.

  4. How does a conditional edge router work in LangGraph? A conditional edge calls a function that returns the name of the next node to execute based on current state.

  5. Challenge: Build a multi-agent system where a supervisor agent delegates subtasks to specialist agents (researcher, writer, fact-checker, formatter), the specialists work in parallel, and the supervisor reviews and merges outputs — all orchestrated as a single LangGraph workflow.

Mini Project

Build a content repurposing pipeline. Create a Temporal workflow that takes a blog post, sends it through three parallel AI agents (one for LinkedIn summary, one for Twitter thread, one for newsletter excerpt), collects the results, routes them through a human approval step with 24-hour timeout, and publishes approved content to respective platforms via API.
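
As a starting point for the parallel stage only, here is a hedged sketch of fanning one post out to three channels with `asyncio.gather`. It uses plain asyncio rather than Temporal, `repurpose` is a hypothetical placeholder for a real LLM call, and the approval and publishing steps are left for you to build.

```python
import asyncio
from typing import Dict

CHANNELS = ["linkedin", "twitter", "newsletter"]

async def repurpose(channel: str, post: str) -> str:
    # Placeholder for an LLM call with a channel-specific prompt.
    await asyncio.sleep(0)  # yield control so the tasks interleave
    return f"{channel} draft from {len(post.split())} source words"

async def fan_out(post: str) -> Dict[str, object]:
    # return_exceptions=True keeps one failed channel from cancelling the others;
    # a failed channel's value is the exception object instead of a string.
    results = await asyncio.gather(
        *(repurpose(channel, post) for channel in CHANNELS),
        return_exceptions=True,
    )
    return dict(zip(CHANNELS, results))

drafts = asyncio.run(fan_out("Orchestration turns fragile chains into pipelines"))
print(drafts["twitter"])  # twitter draft from 6 source words
```

In the Temporal version, each `repurpose` call would become an activity so that a failed channel can be retried independently.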

Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.
