LangGraph is a low-level orchestration library from LangChain Inc. for building stateful, multi-actor LLM applications as directed graphs. Each node in the graph is a Python callable (a tool, an LLM call, a sub-agent); edges are transitions between nodes that can be conditional or fixed. A shared state object — typically a TypedDict — flows through every node, giving the graph memory that survives across turns and, when a checkpointer is attached, across restarts.
This guide targets LangGraph 0.2.x / 0.3.x (current stable as of 2025). It is aimed at engineers who already know LangChain basics and want to build production-grade agentic pipelines: multi-step research agents, human-in-the-loop workflows, parallel fan-out/fan-in patterns, and long-running background tasks.
A LangGraph application is a StateGraph whose nodes mutate a shared state object. You define the state as a TypedDict (or a Pydantic model); LangGraph merges node return values into that state using reducer functions. The built-in operator.add reducer appends lists (useful for message history); custom reducers let you implement any merge logic.
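A minimal illustration of the difference (field names are arbitrary): an unannotated key uses the default replace behaviour, while an annotated key accumulates values across nodes.
import operator
from typing import Annotated, TypedDict

class PipelineState(TypedDict):
    status: str                                 # default reducer: a new value replaces the old one
    events: Annotated[list[str], operator.add]  # operator.add: returned lists are concatenated

# A node returning {"events": ["fetched page"]} appends to events;
# returning {"status": "done"} overwrites whatever status held before.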
The core building blocks:
- Nodes — Python callables (typically RunnableLambdas) that receive the current state dict and return a partial state dict (only the keys they modify).
- Edges — fixed (add_edge) or conditional (add_conditional_edges), routing to different nodes based on a function that inspects the state.
- Entry point — set with set_entry_point(); the first node executed when the graph is invoked.
- END — the sentinel langgraph.graph.END that terminates execution.

A checkpointer serialises the full graph state after every node execution and stores it in a backend (SQLite, Postgres, Redis, in-memory). This enables:
- Resumability — re-invoke the graph with the same thread_id and it resumes from the last checkpoint.
- Multi-turn memory — each conversation gets its own thread_id; the checkpointer maintains per-thread history without any manual session management.

Built-in checkpointers: MemorySaver (in-process, dev only), SqliteSaver, PostgresSaver (via langgraph-checkpoint-postgres).
Conditional edges are the primary control-flow mechanism. A routing function receives the current state and returns a string (or list of strings for fan-out) that LangGraph maps to destination node names. This replaces explicit if/else in your orchestration code and keeps routing logic introspectable by LangGraph's visualiser.
def route_after_llm(state: AgentState) -> str:
if state["tool_calls"]:
return "tools" # branch to tool executor
return "end" # branch to END
The canonical ReAct (Reason + Act) pattern maps naturally onto a two-node loop: an agent node calls the LLM and decides whether to use a tool, and a tools node executes the chosen tool and appends the result to the message history. A conditional edge routes back to the agent node if there are tool calls, or to END if the LLM produced a final answer. LangGraph ships create_react_agent() as a prebuilt helper that wires this loop for you.
Nodes in a LangGraph graph can themselves be compiled StateGraph instances — creating subgraphs. This lets you compose complex multi-agent systems: a supervisor graph routes tasks to specialised worker subgraphs (researcher, coder, QA), each with its own state schema. The supervisor communicates with workers via a shared key in the parent state. LangGraph Studio can visualise the full nested topology.
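A compiled graph is itself a runnable, so it can be passed to add_node like any other callable. A minimal sketch of the composition (ParentState and the node names are illustrative; here the subgraph simply shares the parent's state schema):
from typing import TypedDict
from langgraph.graph import StateGraph, END

class ParentState(TypedDict):
    task: str
    result: str

def research(state: ParentState) -> dict:
    # Stand-in for the worker's own multi-node logic
    return {"result": f"findings for {state['task']}"}

# Worker subgraph, compiled independently
worker_builder = StateGraph(ParentState)
worker_builder.add_node("research", research)
worker_builder.set_entry_point("research")
worker_builder.add_edge("research", END)
worker = worker_builder.compile()

# Parent graph: the compiled subgraph becomes a single node
parent = StateGraph(ParentState)
parent.add_node("researcher", worker)
parent.set_entry_point("researcher")
parent.add_edge("researcher", END)
app = parent.compile()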
LangGraph supports three streaming modes:
"values" — emits the full state after every node"updates" — emits only the delta (partial state) returned by each node"messages" — streams individual LLM token chunks for chat UIsAll graph methods have async counterparts (ainvoke, astream, astream_events). In production, use async with a Postgres checkpointer and run under an ASGI framework (FastAPI) for concurrent request handling.
A consulting firm builds a research agent that, given a company name, autonomously: (1) queries a web search tool, (2) scrapes and summarises relevant pages, (3) cross-references against an internal vector store of past reports, and (4) drafts a structured briefing. The graph uses a loop with a conditional edge that decides whether another search iteration is needed based on a confidence score stored in state. A Postgres checkpointer persists state so long-running research jobs survive server restarts.
An internal tool generates PR review comments with an LLM, then pauses at an interrupt_before=["post_comment"] node. A human reviewer sees the draft comments in a web UI, edits them, and hits "approve." The graph resumes from the checkpoint with the updated comments and posts them to GitHub via a tool node. Without LangGraph's interrupt mechanism this would require a custom state machine; with it, it is ~30 lines of graph definition.
A SaaS company routes incoming support tickets through a classifier node (LLM assigns category and urgency), then fans out to parallel specialist subgraphs (billing agent, technical agent, policy agent) using Send-based parallelism. Each subgraph drafts a response independently; a merge node selects the best response and queues it for human review if confidence is low. Checkpoints allow the workflow to be audited and replayed for quality assurance.
A data engineering team wraps diagnostic tools (run SQL query, fetch dbt test results, read Datadog metrics) in LangGraph tool nodes. When an alerting webhook fires, the graph autonomously runs a root-cause-analysis loop: hypothesise a cause → run a diagnostic query → evaluate result → loop or conclude. The final node writes a structured incident report to a Slack channel. The graph is deployed as a FastAPI endpoint behind the alerting webhook.
from typing import Annotated, TypedDict
import operator
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
# --- State definition ---
class AgentState(TypedDict):
# operator.add reducer appends new messages instead of overwriting
messages: Annotated[list[BaseMessage], operator.add]
# --- Tool ---
@tool
def get_weather(city: str) -> str:
"""Return current weather for a city."""
return f"The weather in {city} is 22°C and sunny."
tools = [get_weather]
llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)
# --- Nodes ---
def agent_node(state: AgentState) -> dict:
response = llm.invoke(state["messages"])
return {"messages": [response]}
def should_use_tools(state: AgentState) -> str:
last = state["messages"][-1]
if getattr(last, "tool_calls", None):
return "tools"
return "end"
# --- Build graph ---
graph = StateGraph(AgentState)
graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode(tools))
graph.set_entry_point("agent")
graph.add_conditional_edges("agent", should_use_tools, {"tools": "tools", "end": END})
graph.add_edge("tools", "agent") # loop back after tool execution
app = graph.compile()
# --- Invoke ---
result = app.invoke({"messages": [HumanMessage(content="What's the weather in Tokyo?")]})
print(result["messages"][-1].content)
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
@tool
def search_docs(query: str) -> str:
"""Search internal documentation."""
return f"Documentation result for: {query}"
import sqlite3

# SqliteSaver persists state to disk — survives process restarts.
# In recent langgraph-checkpoint-sqlite releases, from_conn_string() is a context
# manager, so build the saver from a sqlite3 connection directly.
conn = sqlite3.connect(":memory:", check_same_thread=False)  # use a file path for real persistence
checkpointer = SqliteSaver(conn)
agent = create_react_agent(
model=ChatOpenAI(model="gpt-4o"),
tools=[search_docs],
checkpointer=checkpointer,
)
# thread_id scopes memory to a single conversation
config = {"configurable": {"thread_id": "user-42-session-1"}}
# Turn 1
r1 = agent.invoke(
{"messages": [{"role": "user", "content": "What is our refund policy?"}]},
config=config,
)
print(r1["messages"][-1].content)
# Turn 2 — agent remembers context from Turn 1 via checkpointer
r2 = agent.invoke(
{"messages": [{"role": "user", "content": "Does that apply to digital goods?"}]},
config=config,
)
print(r2["messages"][-1].content)
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
class ReviewState(TypedDict):
draft: str
approved: bool
final: str
def generate_draft(state: ReviewState) -> dict:
return {"draft": "Draft PR comment: consider extracting this into a helper function."}
def post_comment(state: ReviewState) -> dict:
# In production: call GitHub API here
print(f"Posting: {state['draft']}")
return {"final": state["draft"]}
graph = StateGraph(ReviewState)
graph.add_node("generate_draft", generate_draft)
graph.add_node("post_comment", post_comment)
graph.set_entry_point("generate_draft")
graph.add_edge("generate_draft", "post_comment")
graph.add_edge("post_comment", END)
# interrupt_before pauses execution BEFORE post_comment runs
app = graph.compile(checkpointer=MemorySaver(), interrupt_before=["post_comment"])
config = {"configurable": {"thread_id": "review-1"}}
# Step 1: run until the interrupt
app.invoke({"draft": "", "approved": False, "final": ""}, config=config)
# Step 2: human edits state and resumes
app.update_state(config, {"draft": "Edited: please extract this into utils.py"})
app.invoke(None, config=config) # None resumes from checkpoint
from typing import Annotated, TypedDict
import operator
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
class OverallState(TypedDict):
topics: list[str]
summaries: Annotated[list[str], operator.add]
class WorkerState(TypedDict):
topic: str
def fan_out(state: OverallState) -> list[Send]:
# Dispatch one worker node per topic — runs in parallel
return [Send("summarise_topic", {"topic": t}) for t in state["topics"]]
def summarise_topic(state: WorkerState) -> dict:
# In practice: call an LLM here
return {"summaries": [f"Summary of {state['topic']}"]}
def merge(state: OverallState) -> dict:
combined = "\n".join(state["summaries"])
print(f"Merged report:\n{combined}")
return {}
graph = StateGraph(OverallState)
graph.add_node("summarise_topic", summarise_topic)
graph.add_node("merge", merge)
# The entry point is a conditional edge from START: fan_out returns one Send per topic,
# so the number of parallel summarise_topic invocations is decided at runtime
graph.add_conditional_edges(START, fan_out, ["summarise_topic"])
graph.add_edge("summarise_topic", "merge")
graph.add_edge("merge", END)
app = graph.compile()
app.invoke({"topics": ["climate change", "electric vehicles", "battery tech"], "summaries": []})
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import json
app_api = FastAPI()
agent = create_react_agent(ChatOpenAI(model="gpt-4o"), tools=[])
@app_api.post("/chat")
async def chat(body: dict):
async def generate():
async for event in agent.astream_events(
{"messages": [HumanMessage(content=body["message"])]},
version="v2",
):
# Filter for LLM token chunks only
if event["event"] == "on_chat_model_stream":
chunk = event["data"]["chunk"].content
if chunk:
yield f"data: {json.dumps({'token': chunk})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
| Framework | Abstraction Level | State Management | Human-in-the-Loop | Parallelism | Best For |
|---|---|---|---|---|---|
| LangGraph | Low — explicit graph nodes/edges | Typed state dict + reducers + checkpointers | First-class (interrupt_before/after) | Send-based fan-out, async nodes | Production agents requiring fine-grained control, persistence, and complex loops |
| LangChain LCEL | Medium — chain composition via the pipe operator | Passed between runnables (no built-in persistence) | Manual | RunnableParallel | Linear pipelines, RAG chains, structured output extraction |
| CrewAI | High — role-based agents and tasks | Task context passing; no native checkpointer | Limited | Async task execution | Rapid prototyping of multi-persona agent workflows |
| AutoGen | High — conversation-based multi-agent | Conversation history | Human proxy agent | Limited | Research prototypes, code generation via back-and-forth agents |
| Temporal / Prefect | Workflow orchestration (non-LLM native) | Workflow state with retries and schedules | Manual signals/approvals | Native parallel activities | Long-running business workflows where LLMs are one step among many |
The operator.add reducer on a messages list means every turn appends messages. If you accidentally add a full state snapshot to messages instead of a single new message, your context window fills up fast. Always return only the delta from each node, not the full state.
Don't use MemorySaver in production. MemorySaver stores state in a Python dict in-process; any restart, crash, or pod scaling event wipes all thread histories. Use PostgresSaver (or SqliteSaver for single-instance deployments) for anything that needs to survive a process restart.
To cap runaway loops, add a step_count field to your state and a conditional edge that routes to END when step_count > MAX_STEPS. LangGraph's built-in recursion_limit (25 super-steps by default) will eventually raise GraphRecursionError on a runaway graph, but that is a crash rather than a graceful exit; handle the cap explicitly in your routing logic.
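A sketch of that guard, assuming the quick-start AgentState is extended with a step_count: int key (default replace reducer) and MAX_STEPS is tuned to the workload:
MAX_STEPS = 10

def agent_node(state: AgentState) -> dict:
    response = llm.invoke(state["messages"])
    # Return the counter increment alongside the normal message delta
    return {"messages": [response], "step_count": state.get("step_count", 0) + 1}

def should_continue(state: AgentState) -> str:
    if state.get("step_count", 0) >= MAX_STEPS:
        return "end"  # hard stop: too many iterations
    if getattr(state["messages"][-1], "tool_calls", None):
        return "tools"
    return "end"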
ToolNode catches exceptions and puts them as ToolMessage content in the message list, but the LLM will often retry on error. Without explicit error-handling logic in your routing function (e.g., count consecutive errors and bail out), a broken tool causes an expensive retry loop.
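One way to bail out, sketched against the quick-start graph: replace the fixed tools → agent edge with a conditional edge whose router counts recent tool errors (the "Error" prefix check is an assumption about ToolNode's error message format; verify it for your version):
from langchain_core.messages import ToolMessage

def route_after_tools(state: AgentState) -> str:
    # Walk the history newest-first, counting error ToolMessages until the last success
    consecutive_errors = 0
    for msg in reversed(state["messages"]):
        if isinstance(msg, ToolMessage):
            if str(msg.content).startswith("Error"):
                consecutive_errors += 1
            else:
                break  # the most recent completed tool call succeeded
    if consecutive_errors >= 3:
        return "end"  # give up instead of looping on a broken tool
    return "agent"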
Don't try to give state keys default values in the class body (e.g., messages: list = []; TypedDict doesn't support defaults the way dataclasses do). Instead, always pass initial state explicitly when invoking the graph. The dataclasses.field(default_factory=list) pattern does not carry over to TypedDict.
Build a three-node text-to-SQL graph: generate_sql (LLM writes SQL), run_sql (executes against a SQLite DB), and evaluate (checks if the result is non-empty and plausible). Add a conditional edge from evaluate that loops back to generate_sql with the error message in state if the query fails, or routes to END on success. Cap the loop at 3 attempts using a retries counter in state.
Extend the quick-start weather agent with a SqliteSaver checkpointer backed by a file (e.g., weather_agent.db). Run the agent in a loop reading user input from stdin, passing thread_id="session-1" on every turn. Verify that asking "what about in Paris?" after a first question about Tokyo correctly references the prior context.
Build a supervisor graph whose supervisor node (an LLM) classifies each task as "research" or "calculation". Use conditional edges to route to two StateGraph subgraphs: a research subgraph (web search tool) and a calculation subgraph (Python REPL tool). Write each subgraph's results back into the parent state, and have a final respond node synthesise the answer. Test with prompts that exercise both branches.
A reducer is a function that determines how incoming values from a node are merged into the existing state for a given key. The default behaviour (no annotation) is replace — the new value overwrites the old one completely. Annotating a key with Annotated[list[X], operator.add] switches it to append, so each node's returned list is concatenated onto the existing list instead of replacing it. Choosing the wrong reducer is a common bug: using the default on a messages key means each node call erases prior messages.
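Any callable that takes (existing, incoming) and returns the merged value can serve as a reducer. A small illustrative example (ResearchState and merge_dicts are hypothetical names) that shallow-merges dictionaries instead of replacing them:
from typing import Annotated, TypedDict

def merge_dicts(existing: dict, incoming: dict) -> dict:
    # Custom reducer: incoming keys are merged into the existing dict
    return {**existing, **incoming}

class ResearchState(TypedDict):
    # Each node can contribute findings without clobbering other nodes' entries
    findings: Annotated[dict[str, str], merge_dicts]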
Raising an exception aborts execution and discards in-flight state (unless you catch it externally). LangGraph's interrupt_before / interrupt_after mechanism cleanly suspends execution at a deterministic point and writes a checkpoint, preserving the full state. The graph can then be inspected, the state can be edited via update_state(), and execution resumes from exactly where it paused by calling invoke(None, config=config). This makes human review workflows reliable and auditable — the interrupted state can be replayed or rolled back.
Send is used when the number of parallel branches is dynamic — determined at runtime from state (e.g., fan-out over a variable-length list of topics). RunnableParallel has a fixed set of branches defined at graph-construction time. Additionally, Send dispatches each branch as a full node invocation within the LangGraph execution model, so each branch gets its own checkpointed state slot, whereas RunnableParallel runs runnables concurrently within a single LCEL step with no per-branch checkpointing.
"values" emits the complete state snapshot after every node completes — useful for a progress dashboard showing full state. "updates" emits only the partial dict returned by each node — more efficient for large states where most keys don't change. "messages" emits individual LLM token chunks (streaming text) as they are generated, interleaved with node-level events — essential for chat UIs that need word-by-word output. Use astream_events(version="v2") when you need fine-grained event filtering across all three levels simultaneously.
Use a persistent, shared checkpointer backed by an external store — PostgresSaver (pointing to a shared Postgres instance) or a Redis-backed checkpointer. All replicas connect to the same backend using the same thread_id, so any replica can load the full conversation history regardless of which replica handled prior turns. MemorySaver is in-process only and will not work across replicas. You must also ensure the thread_id is stable per user session — typically stored in a session cookie or JWT claim and passed in the config of every invocation.
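A sketch of that wiring, assuming langgraph-checkpoint-postgres is installed and DB_URI points at the shared Postgres instance (check the package docs for the exact constructor in your version):
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI

DB_URI = "postgresql://user:pass@db-host:5432/langgraph"  # shared by every replica

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # create the checkpoint tables on first run
    agent = create_react_agent(ChatOpenAI(model="gpt-4o"), tools=[], checkpointer=checkpointer)
    # thread_id comes from the user's session, so any replica resumes the same history
    config = {"configurable": {"thread_id": "user-42-session-1"}}
    agent.invoke({"messages": [{"role": "user", "content": "Hi"}]}, config=config)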