LangGraph

Summary

LangGraph is a low-level orchestration library from LangChain Inc. for building stateful, multi-actor LLM applications as directed graphs. Each node in the graph is a Python callable (a tool, an LLM call, a sub-agent); edges are transitions between nodes and can be fixed or conditional. A persistent state object — typically a TypedDict — flows through every node, giving the graph memory that survives across turns and, when a checkpointer is attached, across restarts.

This guide targets LangGraph 0.2.x / 0.3.x (current stable as of 2025). It is aimed at engineers who already know LangChain basics and want to build production-grade agentic pipelines: multi-step research agents, human-in-the-loop workflows, parallel fan-out/fan-in patterns, and long-running background tasks.

Table of Contents

Core Concepts
Industry Use Cases
Code Examples
Comparison / When to Use
Gotchas & Anti-patterns
Exercises
Quiz
Further Reading

Core Concepts

1. The State Graph Model

A LangGraph application is a StateGraph whose nodes mutate a shared state object. You define the state as a TypedDict (or a Pydantic model); LangGraph merges node return values into that state using reducer functions. The built-in operator.add reducer appends lists (useful for message history); custom reducers let you implement any merge logic.
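
For example, here is the built-in append reducer next to a custom reducer, side by side (keep_last_10 is an illustrative helper, not part of the library):

import operator
from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage

def keep_last_10(existing: list[str], new: list[str]) -> list[str]:
    # Custom reducer: append the new values, keep only the 10 most recent
    return (existing + new)[-10:]

class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], operator.add]  # built-in append
    recent_results: Annotated[list[str], keep_last_10]    # custom merge logic
    confidence: float                                     # no reducer: replaced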

2. Checkpointers and Persistence

A checkpointer serialises the full graph state after every node execution and stores it in a backend (SQLite, Postgres, Redis, in-memory). This enables multi-turn memory scoped by a thread_id, human-in-the-loop interrupts that suspend and resume execution, recovery of long-running jobs after a crash or restart, and replay of past runs for auditing.

Built-in checkpointers: MemorySaver (in-process, dev only), SqliteSaver, PostgresSaver (via langgraph-checkpoint-postgres).
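
As a sketch, wiring PostgresSaver to a compiled graph — DB_URI, graph, and initial_state are assumed here; note that in recent versions from_conn_string() is a context manager and setup() creates the checkpoint tables:

from langgraph.checkpoint.postgres import PostgresSaver

DB_URI = "postgresql://user:pass@localhost:5432/agents"  # assumed connection string

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # create checkpoint tables on first run
    app = graph.compile(checkpointer=checkpointer)
    app.invoke(initial_state, config={"configurable": {"thread_id": "job-1"}})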

3. Conditional Edges and Routing

Conditional edges are the primary control-flow mechanism. A routing function receives the current state and returns a string (or list of strings for fan-out) that LangGraph maps to destination node names. This replaces explicit if/else in your orchestration code and keeps routing logic introspectable by LangGraph's visualiser.

def route_after_llm(state: AgentState) -> str:
    if state["tool_calls"]:
        return "tools"      # branch to tool executor
    return "end"           # branch to END

4. The ReAct Agent Loop as a Graph

The canonical ReAct (Reason + Act) pattern maps naturally onto a two-node loop: an agent node calls the LLM and decides whether to use a tool, and a tools node executes the chosen tool and appends the result to the message history. A conditional edge routes back to the agent node if there are tool calls, or to END if the LLM produced a final answer. LangGraph ships create_react_agent() as a prebuilt helper that wires this loop for you.
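
A minimal sketch of the prebuilt helper (the add tool here is purely illustrative; Example 1 below builds the same loop by hand, and Example 2 adds persistence):

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

@tool
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

# One call wires the agent node, tool node, and conditional loop described above
agent = create_react_agent(model=ChatOpenAI(model="gpt-4o-mini"), tools=[add])
result = agent.invoke({"messages": [{"role": "user", "content": "What is 2 + 3?"}]})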

5. Subgraphs and Multi-Agent Architectures

Nodes in a LangGraph graph can themselves be compiled StateGraph instances — creating subgraphs. This lets you compose complex multi-agent systems: a supervisor graph routes tasks to specialised worker subgraphs (researcher, coder, QA), each with its own state schema. The supervisor communicates with workers via a shared key in the parent state. LangGraph Studio can visualise the full nested topology.
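
A sketch of the wiring, assuming a compiled worker graph and supervisor helpers (research_builder, supervisor_node, route_to_worker, and SupervisorState are illustrative names, not library APIs):

# A compiled graph is itself runnable, so it can be mounted as a node
research_subgraph = research_builder.compile()

parent = StateGraph(SupervisorState)
parent.add_node("supervisor", supervisor_node)
parent.add_node("researcher", research_subgraph)   # subgraph as a node
parent.set_entry_point("supervisor")
parent.add_conditional_edges(
    "supervisor", route_to_worker, {"research": "researcher", "done": END}
)
parent.add_edge("researcher", "supervisor")        # report back to supervisor
app = parent.compile()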

6. Streaming and Async Execution

LangGraph supports three streaming modes: "values" emits the complete state snapshot after each node, "updates" emits only the partial dict each node returns, and "messages" emits individual LLM token chunks as they are generated (see Quiz Q4 for when to use each).

All graph methods have async counterparts (ainvoke, astream, astream_events). In production, use async with a Postgres checkpointer and run under an ASGI framework (FastAPI) for concurrent request handling.
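
For example, a sketch of consuming node-level updates asynchronously, assuming app is a compiled graph such as the one built in Example 1:

import asyncio
from langchain_core.messages import HumanMessage

async def main():
    # stream_mode="updates" yields {node_name: partial_state} after each node
    async for update in app.astream(
        {"messages": [HumanMessage(content="What's the weather in Tokyo?")]},
        stream_mode="updates",
    ):
        print(update)

asyncio.run(main())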

↑ Back to top

Industry Use Cases

Autonomous Research Pipeline (Knowledge Work)

A consulting firm builds a research agent that, given a company name, autonomously: (1) queries a web search tool, (2) scrapes and summarises relevant pages, (3) cross-references against an internal vector store of past reports, and (4) drafts a structured briefing. The graph uses a loop with a conditional edge that decides whether another search iteration is needed based on a confidence score stored in state. A Postgres checkpointer persists state so long-running research jobs survive server restarts.
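
The loop decision might be a routing function like this sketch (ResearchState, the field names, and the 0.8 threshold are all illustrative):

MAX_ITERATIONS = 5  # hard cap so the research loop always terminates

def should_search_again(state: ResearchState) -> str:
    # Loop back for another search pass until the agent is confident
    if state["confidence"] < 0.8 and state["iterations"] < MAX_ITERATIONS:
        return "web_search"
    return "draft_briefing"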

Human-in-the-Loop Code Review (DevTools)

An internal tool generates PR review comments with an LLM, then pauses at an interrupt_before=["post_comment"] node. A human reviewer sees the draft comments in a web UI, edits them, and hits "approve." The graph resumes from the checkpoint with the updated comments and posts them to GitHub via a tool node. Without LangGraph's interrupt mechanism this would require a custom state machine; with it, it is ~30 lines of graph definition.

Customer Support Triage (SaaS)

A SaaS company routes incoming support tickets through a classifier node (an LLM assigns category and urgency), then fans out to parallel specialist subgraphs (billing agent, technical agent, policy agent) using Send-based parallelism. Each subgraph drafts a response independently; a merge node selects the best response and queues it for human review if confidence is low. Checkpoints allow the workflow to be audited and replayed for quality assurance.

Data Pipeline Anomaly Investigation (Data Engineering)

A data engineering team wraps diagnostic tools (run SQL query, fetch dbt test results, read Datadog metrics) in LangGraph tool nodes. When an alerting webhook fires, the graph autonomously runs a root-cause-analysis loop: hypothesise a cause → run a diagnostic query → evaluate result → loop or conclude. The final node writes a structured incident report to a Slack channel. The graph is deployed as a FastAPI endpoint behind the alerting webhook.

↑ Back to top

Code Examples

Example 1: Minimal ReAct Agent with Tool Calling

from typing import Annotated, TypedDict
import operator
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode

# --- State definition ---
class AgentState(TypedDict):
    # operator.add reducer appends new messages instead of overwriting
    messages: Annotated[list[BaseMessage], operator.add]

# --- Tool ---
@tool
def get_weather(city: str) -> str:
    """Return current weather for a city."""
    return f"The weather in {city} is 22°C and sunny."

tools = [get_weather]
llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)

# --- Nodes ---
def agent_node(state: AgentState) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

def should_use_tools(state: AgentState) -> str:
    last = state["messages"][-1]
    if getattr(last, "tool_calls", None):
        return "tools"
    return "end"

# --- Build graph ---
graph = StateGraph(AgentState)
graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode(tools))
graph.set_entry_point("agent")
graph.add_conditional_edges("agent", should_use_tools, {"tools": "tools", "end": END})
graph.add_edge("tools", "agent")  # loop back after tool execution
app = graph.compile()

# --- Invoke ---
result = app.invoke({"messages": [HumanMessage(content="What's the weather in Tokyo?")]})
print(result["messages"][-1].content)

Example 2: Persistent Multi-Turn Agent with SQLite Checkpointer

import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

@tool
def search_docs(query: str) -> str:
    """Search internal documentation."""
    return f"Documentation result for: {query}"

# SqliteSaver persists state to a file on disk — survives process restarts.
# (In recent versions SqliteSaver.from_conn_string() is a context manager,
# so we construct the saver from a sqlite3 connection directly.)
conn = sqlite3.connect("agent_memory.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)

agent = create_react_agent(
    model=ChatOpenAI(model="gpt-4o"),
    tools=[search_docs],
    checkpointer=checkpointer,
)

# thread_id scopes memory to a single conversation
config = {"configurable": {"thread_id": "user-42-session-1"}}

# Turn 1
r1 = agent.invoke(
    {"messages": [{"role": "user", "content": "What is our refund policy?"}]},
    config=config,
)
print(r1["messages"][-1].content)

# Turn 2 — agent remembers context from Turn 1 via checkpointer
r2 = agent.invoke(
    {"messages": [{"role": "user", "content": "Does that apply to digital goods?"}]},
    config=config,
)
print(r2["messages"][-1].content)

Example 3: Human-in-the-Loop with interrupt_before

from typing import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

class ReviewState(TypedDict):
    draft: str
    approved: bool
    final: str

def generate_draft(state: ReviewState) -> dict:
    return {"draft": "Draft PR comment: consider extracting this into a helper function."}

def post_comment(state: ReviewState) -> dict:
    # In production: call GitHub API here
    print(f"Posting: {state['draft']}")
    return {"final": state["draft"]}

graph = StateGraph(ReviewState)
graph.add_node("generate_draft", generate_draft)
graph.add_node("post_comment", post_comment)
graph.set_entry_point("generate_draft")
graph.add_edge("generate_draft", "post_comment")
graph.add_edge("post_comment", END)

# interrupt_before pauses execution BEFORE post_comment runs
app = graph.compile(checkpointer=MemorySaver(), interrupt_before=["post_comment"])
config = {"configurable": {"thread_id": "review-1"}}

# Step 1: run until the interrupt
app.invoke({"draft": "", "approved": False, "final": ""}, config=config)

# Step 2: human edits state and resumes
app.update_state(config, {"draft": "Edited: please extract this into utils.py"})
app.invoke(None, config=config)  # None resumes from checkpoint

Example 4: Parallel Fan-Out with Send

from typing import Annotated, TypedDict
import operator
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class OverallState(TypedDict):
    topics: list[str]
    summaries: Annotated[list[str], operator.add]

class WorkerState(TypedDict):
    topic: str

def fan_out(state: OverallState) -> list[Send]:
    # Dispatch one worker node per topic — runs in parallel
    return [Send("summarise_topic", {"topic": t}) for t in state["topics"]]

def summarise_topic(state: WorkerState) -> dict:
    # In practice: call an LLM here
    return {"summaries": [f"Summary of {state['topic']}"]}

def merge(state: OverallState) -> dict:
    combined = "\n".join(state["summaries"])
    print(f"Merged report:\n{combined}")
    return {}

graph = StateGraph(OverallState)
graph.add_node("summarise_topic", summarise_topic)
graph.add_node("merge", merge)
# The entry point must be a node, not a function, so the fan-out is wired
# as a conditional edge from START whose router returns Send objects
graph.add_conditional_edges(START, fan_out, ["summarise_topic"])
graph.add_edge("summarise_topic", "merge")
graph.add_edge("merge", END)
app = graph.compile()

app.invoke({"topics": ["climate change", "electric vehicles", "battery tech"], "summaries": []})

Example 5: Streaming Token Output in FastAPI

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import json

app_api = FastAPI()
agent = create_react_agent(ChatOpenAI(model="gpt-4o"), tools=[])

@app_api.post("/chat")
async def chat(body: dict):
    async def generate():
        async for event in agent.astream_events(
            {"messages": [HumanMessage(content=body["message"])]},
            version="v2",
        ):
            # Filter for LLM token chunks only
            if event["event"] == "on_chat_model_stream":
                chunk = event["data"]["chunk"].content
                if chunk:
                    yield f"data: {json.dumps({'token': chunk})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

↑ Back to top

Comparison / When to Use

| Framework | Abstraction Level | State Management | Human-in-the-Loop | Parallelism | Best For |
|---|---|---|---|---|---|
| LangGraph | Low — explicit graph nodes/edges | Typed state dict + reducers + checkpointers | First-class (interrupt_before/after) | Send-based fan-out, async nodes | Production agents requiring fine-grained control, persistence, and complex loops |
| LangChain LCEL | Medium — chain composition with the pipe operator | Passed between runnables (no built-in persistence) | Manual | RunnableParallel | Linear pipelines, RAG chains, structured output extraction |
| CrewAI | High — role-based agents and tasks | Task context passing; no native checkpointer | Limited | Async task execution | Rapid prototyping of multi-persona agent workflows |
| AutoGen | High — conversation-based multi-agent | Conversation history | Human proxy agent | Limited | Research prototypes, code generation via back-and-forth agents |
| Temporal / Prefect | Workflow orchestration (non-LLM native) | Workflow state with retries and schedules | Manual signals/approvals | Native parallel activities | Long-running business workflows where LLMs are one step among many |

↑ Back to top

Gotchas & Anti-patterns

  1. Forgetting that reducers accumulate — a key annotated with an accumulating reducer such as operator.add is appended to, never replaced. On a messages list this means every turn appends messages. If you accidentally add a full state snapshot to messages instead of a single new message, your context window fills up fast. Always return only the delta from each node, not the full state.
  2. Using MemorySaver in production. MemorySaver stores state in a Python dict in-process. Any restart, crash, or pod scaling event wipes all thread histories. Use PostgresSaver (or SqliteSaver for single-instance deployments) for anything that needs to survive a process restart.
  3. Infinite loops without a loop counter or exit condition. A ReAct loop that never produces a final answer will call the LLM indefinitely, burning tokens. Always add a step_count field to your state and a conditional edge that routes to END when step_count > MAX_STEPS (see the sketch after this list). As a backstop, every compiled graph enforces a configurable recursion_limit (default 25) and raises GraphRecursionError when it is exceeded, but an explicit counter gives you a clean exit rather than an exception.
  4. Not handling tool errors in state. ToolNode catches exceptions and puts them as ToolMessage content in the message list, but the LLM will often retry on error. Without explicit error-handling logic in your routing function (e.g., count consecutive errors and bail out), a broken tool causes an expensive retry loop.
  5. Mutable default values in state TypedDict. TypedDict does not support default values the way dataclasses do, so messages: list = [] in the class body will not give you a per-invocation default (and dataclasses.field(default_factory=list) does not apply to TypedDict either). Always pass the initial state explicitly when invoking the graph.
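
A minimal loop-cap sketch for gotcha 3, assuming an AgentState with a step_count key and an llm handle (names illustrative):

MAX_STEPS = 10

def agent_node(state: AgentState) -> dict:
    response = llm.invoke(state["messages"])
    # step_count has no reducer, so returning it replaces the old value
    return {"messages": [response], "step_count": state["step_count"] + 1}

def route(state: AgentState) -> str:
    if state["step_count"] >= MAX_STEPS:
        return "end"  # bail out before burning more tokens
    last = state["messages"][-1]
    return "tools" if getattr(last, "tool_calls", None) else "end"
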
↑ Back to top

Exercises

  1. Build a self-correcting SQL agent. Create a LangGraph graph with three nodes: generate_sql (LLM writes SQL), run_sql (executes against a SQLite DB), and evaluate (checks if the result is non-empty and plausible). Add a conditional edge from evaluate that loops back to generate_sql with the error message in state if the query fails, or routes to END on success. Cap the loop at 3 attempts using a retries counter in state.
  2. Add persistence and multi-turn memory to the weather agent from Example 1. Replace the in-memory invocation with a SqliteSaver checkpointer backed by a file (e.g., weather_agent.db). Run the agent in a loop reading user input from stdin, passing thread_id="session-1" on every turn. Verify that asking "what about in Paris?" after a first question about Tokyo correctly references the prior context.
  3. Implement a supervisor + two specialist subgraphs. Build a parent graph with a supervisor node (LLM classifies task as "research" or "calculation"). Use conditional edges to route to two StateGraph subgraphs: a research subgraph (web search tool) and a calculation subgraph (Python REPL tool). The subgraph results are written back into the parent state and a final respond node synthesises the answer. Test with prompts that exercise both branches.
↑ Back to top

Quiz

Q1: What is the purpose of a reducer in LangGraph state, and what happens if you don't specify one?

A reducer is a function that determines how incoming values from a node are merged into the existing state for a given key. The default behaviour (no annotation) is replace — the new value overwrites the old one completely. Annotating a key with Annotated[list[X], operator.add] switches it to append, so each node's returned list is concatenated onto the existing list instead of replacing it. Choosing the wrong reducer is a common bug: using the default on a messages key means each node call erases prior messages.
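
A minimal illustration of both behaviours:

import operator
from typing import Annotated, TypedDict

class State(TypedDict):
    summary: str                                  # no reducer — replaced on update
    messages: Annotated[list[str], operator.add]  # reducer — appended on update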

Q2: How does LangGraph's interrupt mechanism differ from simply raising an exception mid-graph?

Raising an exception aborts execution and discards in-flight state (unless you catch it externally). LangGraph's interrupt_before / interrupt_after mechanism cleanly suspends execution at a deterministic point and writes a checkpoint, preserving the full state. The graph can then be inspected, the state can be edited via update_state(), and execution resumes from exactly where it paused by calling invoke(None, config=config). This makes human review workflows reliable and auditable — the interrupted state can be replayed or rolled back.

Q3: When would you use Send-based parallelism instead of RunnableParallel from LangChain?

Send is used when the number of parallel branches is dynamic — determined at runtime from state (e.g., fan-out over a variable-length list of topics). RunnableParallel has a fixed set of branches defined at graph-construction time. Additionally, Send dispatches each branch as a full node invocation within the LangGraph execution model, so each branch gets its own checkpointed state slot, whereas RunnableParallel runs runnables concurrently within a single LCEL step with no per-branch checkpointing.

Q4: What is the difference between streaming modes "values", "updates", and "messages"?

"values" emits the complete state snapshot after every node completes — useful for a progress dashboard showing full state. "updates" emits only the partial dict returned by each node — more efficient for large states where most keys don't change. "messages" emits individual LLM token chunks (streaming text) as they are generated, interleaved with node-level events — essential for chat UIs that need word-by-word output. Use astream_events(version="v2") when you need fine-grained event filtering across all three levels simultaneously.

Q5: Your LangGraph agent is deployed on Kubernetes with 3 replicas. A user starts a conversation, hits replica A on turn 1, and then hits replica B on turn 2. How do you ensure the agent has memory of turn 1?

Use a persistent, shared checkpointer backed by an external store — PostgresSaver (pointing to a shared Postgres instance) or a Redis-backed checkpointer. All replicas connect to the same backend using the same thread_id, so any replica can load the full conversation history regardless of which replica handled prior turns. MemorySaver is in-process only and will not work across replicas. You must also ensure the thread_id is stable per user session — typically stored in a session cookie or JWT claim and passed in the config of every invocation.

↑ Back to top

Further Reading

↑ Back to top