background-shape
Production Multi Agent Systems with LangGraph 0.2, A Hands On Tutorial
March 5, 2025 · 10 min read · by Muhammad Amal programming

TL;DR — A production LangGraph deployment needs three things the docs gloss over, a Postgres checkpointer with proper migrations, per-node retry policies with deterministic IDs, and streaming wired through to your HTTP layer. Here’s how I do all three.

LangGraph 0.2 is the version I’d actually run in production. The 0.1 to 0.2 transition cleaned up the checkpointer interface, added first-class interrupt/Command primitives, and made the langgraph-checkpoint-postgres package stable. The default MemorySaver example you see in every tutorial is fine for a notebook and useless for anything real.

This is the build I do for clients when they ask for a multi-agent system that survives restarts, retries failed nodes without re-running successful ones, streams partial results to the UI, and supports human approvals at sensitive steps. I’ll build a research-and-draft agent system in this post. Same architecture I’ve shipped twice.

You’ll need Python 3.12, Postgres 16 or 17, and an OpenAI or Anthropic key. Code below is tested against langgraph==0.2.74, langgraph-checkpoint-postgres==2.0.13, psycopg[binary]==3.2.4, langchain-openai==0.2.14, and openai==1.59.4. If you’re picking topologies, my multi-agent architecture patterns post lays out the choices.

1. Setting up the project

Get the boring parts done first. We’ll set up the venv, Postgres, and the package list.

python3.12 -m venv .venv && source .venv/bin/activate
pip install \
  "langgraph==0.2.74" \
  "langgraph-checkpoint-postgres==2.0.13" \
  "psycopg[binary,pool]==3.2.4" \
  "langchain-openai==0.2.14" \
  "openai==1.59.4" \
  "tenacity==9.0.0" \
  "fastapi==0.115.6" \
  "uvicorn[standard]==0.34.0"

Spin up Postgres locally if you don’t have one. Docker is fine.

docker run -d --name lg-pg \
  -e POSTGRES_PASSWORD=lg -e POSTGRES_DB=langgraph \
  -p 5432:5432 postgres:17-alpine

Set the connection string in your environment.

export DATABASE_URL="postgresql://postgres:lg@localhost:5432/langgraph"
export OPENAI_API_KEY="sk-..."

2. State, nodes, and the graph

Define the state shape first. Be specific. Loose dict[str, Any] state is a debugging tax you pay later.

# graph.py
from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage

class ResearchState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    topic: str
    research_notes: str
    draft: str
    review_feedback: str
    iteration: int
    approved: bool

Now the nodes. Each is a plain function. I keep nodes small and pure where possible, side effects belong in their own nodes so you can retry them independently.

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def research_node(state: ResearchState) -> dict:
    prompt = (
        f"Research the topic '{state['topic']}'. "
        "Return 6 to 10 bullet facts with source hints."
    )
    notes = llm.invoke([HumanMessage(content=prompt)]).content
    return {
        "research_notes": notes,
        "messages": [AIMessage(content=f"[research] {len(notes.split())} words of notes")]
    }

def draft_node(state: ResearchState) -> dict:
    sys = SystemMessage(content="You are a technical writer. Crisp, no fluff.")
    prompt = HumanMessage(content=(
        f"Topic: {state['topic']}\n\nNotes:\n{state['research_notes']}\n\n"
        f"Prior feedback (if any):\n{state.get('review_feedback', '')}\n\n"
        "Write a 400-word draft."
    ))
    draft = llm.invoke([sys, prompt]).content
    return {
        "draft": draft,
        "iteration": state.get("iteration", 0) + 1,
        "messages": [AIMessage(content=f"[draft v{state.get('iteration', 0) + 1}]")]
    }

def review_node(state: ResearchState) -> dict:
    prompt = HumanMessage(content=(
        f"Review this draft for accuracy and clarity. If it's good, respond "
        f"with 'APPROVED'. Otherwise give specific edits.\n\n{state['draft']}"
    ))
    fb = llm.invoke([prompt]).content
    return {
        "review_feedback": fb,
        "approved": "APPROVED" in fb.upper(),
        "messages": [AIMessage(content=f"[review] {'pass' if 'APPROVED' in fb.upper() else 'changes requested'}")]
    }

Wire them up. The conditional edge is the only place real routing happens.

from langgraph.graph import StateGraph, END

def route_after_review(state: ResearchState) -> str:
    if state["approved"]:
        return END
    if state["iteration"] >= 3:
        return END  # circuit breaker
    return "draft"

builder = StateGraph(ResearchState)
builder.add_node("research", research_node)
builder.add_node("draft", draft_node)
builder.add_node("review", review_node)
builder.set_entry_point("research")
builder.add_edge("research", "draft")
builder.add_edge("draft", "review")
builder.add_conditional_edges("review", route_after_review)

The hard limit of three iterations is a habit I’d recommend you copy. Cycles plus LLMs equals runaway bills.

3. Postgres checkpointer, the right way

This is the part the docs minimize. MemorySaver is useless once you have more than one process or any restart at all. PostgresSaver is the answer, but you need to handle migrations explicitly.

# checkpointer.py
import os
from contextlib import contextmanager
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver

DB_URL = os.environ["DATABASE_URL"]

pool = ConnectionPool(
    conninfo=DB_URL,
    max_size=20,
    kwargs={"autocommit": True, "prepare_threshold": 0},
)

def get_checkpointer() -> PostgresSaver:
    return PostgresSaver(pool)

def migrate_once() -> None:
    """Run the LangGraph checkpoint schema migrations. Idempotent."""
    saver = PostgresSaver(pool)
    saver.setup()

Run migrate_once() once at deploy time, not on every process start. The schema is small but the migrations include index creation that you don’t want racing across pods.

# bootstrap.py
from checkpointer import migrate_once
migrate_once()
print("checkpointer schema ready")

Now compile the graph with the checkpointer attached.

# app.py
from graph import builder
from checkpointer import get_checkpointer

graph = builder.compile(checkpointer=get_checkpointer())

A run is identified by a thread_id. Same thread_id resumes the same conversation, different thread_id is a fresh run. This is your durability primitive.

config = {"configurable": {"thread_id": "research-2025-03-05-001"}}
result = graph.invoke({
    "topic": "Postgres 17 streaming I/O",
    "messages": [],
    "research_notes": "",
    "draft": "",
    "review_feedback": "",
    "iteration": 0,
    "approved": False,
}, config=config)

Kill the process mid-run, restart, call graph.invoke({}, config=config) again, and it picks up from the last completed node. That’s the magic, but only if your checkpointer is real and your nodes are idempotent.

4. Retries with deterministic IDs

LangGraph 0.2 supports node-level retry policies via the RetryPolicy dataclass. I always set them explicitly per node, the defaults are too aggressive for paid API calls.

from langgraph.pregel.types import RetryPolicy

retry = RetryPolicy(
    max_attempts=3,
    initial_interval=2.0,
    backoff_factor=2.0,
    jitter=True,
    retry_on=(TimeoutError, ConnectionError),
)

builder.add_node("research", research_node, retry=retry)
builder.add_node("draft", draft_node, retry=retry)
builder.add_node("review", review_node, retry=retry)

The non-obvious part is that retry_on should NOT include broad exceptions like Exception. If your node code has a bug that raises KeyError, you want to fail fast, not retry three times and accumulate $3 in API charges first.

For the LLM call itself, pass an idempotency-style request ID so OpenAI or Anthropic dedupes if the same call lands twice in a network hiccup window. OpenAI’s SDK takes an extra_headers for this.

import uuid

def research_node(state: ResearchState) -> dict:
    req_id = f"research-{state['topic'][:20]}-{uuid.uuid4().hex[:8]}"
    notes = llm.invoke(
        [HumanMessage(content=f"Research: {state['topic']}")],
        config={"configurable": {"openai_api_key_request_id": req_id}}
    ).content
    # ...

OpenAI SDK 1.59+ honors Idempotency-Key headers natively on completions, see their idempotency docs.

5. Streaming partial results to the client

A multi-agent system that makes the user wait 30 seconds with no feedback feels broken even when it isn’t. LangGraph 0.2 supports four stream modes, updates, values, messages, and custom. I default to messages for token-level streaming and updates for node-completion events.

# server.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from app import graph
import json

api = FastAPI()

@api.post("/research")
async def research(payload: dict):
    config = {"configurable": {"thread_id": payload["thread_id"]}}
    initial = {
        "topic": payload["topic"],
        "messages": [],
        "research_notes": "",
        "draft": "",
        "review_feedback": "",
        "iteration": 0,
        "approved": False,
    }

    async def event_stream():
        async for mode, chunk in graph.astream(
            initial, config=config, stream_mode=["updates", "messages"]
        ):
            if mode == "updates":
                for node_name, node_update in chunk.items():
                    yield f"data: {json.dumps({'type': 'node', 'node': node_name})}\n\n"
            elif mode == "messages":
                msg, meta = chunk
                if hasattr(msg, "content") and msg.content:
                    yield f"data: {json.dumps({'type': 'token', 'text': msg.content})}\n\n"
        yield "data: {\"type\": \"done\"}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")

Test with curl, you’ll see SSE events stream as nodes complete.

curl -N -X POST http://localhost:8000/research \
  -H "content-type: application/json" \
  -d '{"thread_id":"t1","topic":"vector indexes"}'

The boundary I always enforce, a frontend should only need thread_id and a prompt. Everything else, retry state, iteration count, intermediate drafts, lives server-side. Don’t expose checkpoint internals.

6. Human in the loop with interrupts

LangGraph 0.2 added the interrupt function as a first-class primitive. It pauses the graph mid-node, persists state, and waits for a Command(resume=value) to continue. This is how you do approval steps without hacking around the framework.

from langgraph.types import interrupt, Command

def approval_node(state: ResearchState) -> dict:
    decision = interrupt({
        "draft": state["draft"],
        "iteration": state["iteration"],
        "prompt": "Approve, request edits, or reject?",
    })
    return {
        "approved": decision == "approve",
        "review_feedback": decision if decision != "approve" else "",
    }

builder.add_node("approval", approval_node)
builder.add_edge("draft", "approval")
builder.add_conditional_edges("approval", lambda s: END if s["approved"] else "draft")

The first invoke pauses at approval. The graph state is now persisted with status interrupted. Whenever your reviewer clicks a button, you resume.

# resume side
graph.invoke(Command(resume="approve"), config={"configurable": {"thread_id": "t1"}})

This is the cleanest way I’ve found to do human approvals. No polling, no separate state machine, durability comes free with the checkpointer.

7. Subgraphs and reusable pieces

LangGraph 0.2 treats a compiled graph as a callable, so you can use one as a node inside another. This is the building block for hierarchical patterns and for shipping reusable agent components across projects.

# a research subgraph
research_builder = StateGraph(ResearchState)
research_builder.add_node("research", research_node)
research_builder.add_node("draft", draft_node)
research_builder.set_entry_point("research")
research_builder.add_edge("research", "draft")
research_builder.add_edge("draft", END)
research_subgraph = research_builder.compile()

# use it inside a bigger graph
outer = StateGraph(OuterState)
outer.add_node("research_pipeline", research_subgraph)
outer.add_node("approval", approval_node)
outer.set_entry_point("research_pipeline")
outer.add_edge("research_pipeline", "approval")

The catch is state shape. The subgraph expects ResearchState, the outer graph has OuterState. LangGraph maps the matching keys automatically and ignores extras. If your subgraph needs a field that isn’t in the outer state, pass it via the input argument when defining the node, or transform with a wrapper function. I prefer wrapper functions because they make the contract explicit.

8. Versioning graphs in production

The bug I hit on my second deploy was a graph schema change that left in-flight runs unable to resume. Old checkpoints expected the old schema, new code expected the new one, the saver loaded happily and crashed when it hit a missing field.

The discipline that worked, version the graph in the thread ID. When I make a breaking change, the new graph compiles under a new version, and the API includes the version in the thread ID it issues. Old in-flight runs finish on the old graph, new runs use the new one. After the longest-running in-flight workflow’s deadline passes, I drop the old graph code.

GRAPH_VERSION = "v3"  # bump on schema breaking changes

def make_thread_id(user_id: str) -> str:
    return f"{GRAPH_VERSION}:{user_id}:{uuid.uuid4()}"

This is mundane plumbing that saves a class of incident. Add it from day one.

Common Pitfalls

I’ve hit every one of these on a real deploy.

  1. Forgetting to call saver.setup() in CI. The schema migrations fail silently if you only run them in dev. Add migrate_once() to your deploy pipeline as a separate step, gated before app rollout.
  2. Pickling non-serializable state. Postgres checkpointer uses msgpack via ormsgpack. If you stash a requests.Session or a boto3 client in state, it’ll explode at the first checkpoint. Keep state to JSON-safe types or use langgraph’s Send API to pass objects out of band.
  3. Concurrent writes to the same thread_id. Two requests with the same thread ID will race and you’ll see “checkpoint conflict” errors. Lock at the API layer with a Redis lock or use unique thread IDs per request. I generate f"{user_id}:{uuid4()}" and let the user resume by passing it back.
  4. Streaming and retries together. A retried node will re-emit its message tokens, so the client sees duplicates. Wrap your SSE handler to dedupe by node name + sequence, or only stream from non-retryable display nodes.

Troubleshooting

Three things you’ll actually see.

“connection pool exhausted” on burst traffic. Your psycopg_pool.ConnectionPool size is too small or you’re holding connections too long. Bump max_size to 2x your worker count and set timeout=5.0. Also confirm autocommit is on, long-held transactions are the usual culprit.

checkpoint with thread_id X already at step Y after a deploy. You changed the graph structure between deploys, and an in-flight run can’t replay. The fix is to version your graphs. I append a _v2 to the thread ID when I make breaking changes, old runs finish on the old graph, new runs go to the new one.

Tokens stream but UI shows nothing. Nine times out of ten the buffer between FastAPI and the client. Add X-Accel-Buffering: no if you’re behind nginx, and make sure your proxy isn’t gzip-buffering SSE. The other one time, you forgot stream=True somewhere down the LLM chain.

Wrapping Up

LangGraph 0.2 with PostgresSaver is the closest thing the ecosystem has to a “boring” foundation for multi-agent work. The graph DSL stays readable, the checkpointer gives you durability for free once you wire it correctly, and interrupts plus streaming cover the production UX requirements without leaving the framework.

The recipe I’d give you again, define state with TypedDicts, keep nodes small and idempotent, run migrations explicitly, attach per-node retries with narrow retry_on, and stream updates plus messages to your client. If you do those five things, the rest is application logic.

For the canonical reference, the LangGraph 0.2 release notes document every checkpointer and interrupt change since the 0.1 line. Worth a read before you upgrade an existing system.