
LangGraph Adapter

The LangGraph adapter wraps LangGraph state graphs for deployment through FastAgentic. Deploy complex stateful workflows with full durability and streaming.

TL;DR

Wrap any compiled LangGraph StateGraph and get REST + MCP + node-level streaming + checkpoint durability.

Before FastAgentic

from fastapi import FastAPI, Depends
from fastapi.security import HTTPBearer
from sse_starlette import EventSourceResponse
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict
from uuid import uuid4
import json

# research_node, write_node, and verify_token are defined elsewhere

app = FastAPI()
security = HTTPBearer()

# Define graph
class AgentState(TypedDict):
    messages: list
    next_step: str

workflow = StateGraph(AgentState)
workflow.add_node("research", research_node)
workflow.add_node("write", write_node)
workflow.add_edge("research", "write")
workflow.add_edge("write", END)
workflow.set_entry_point("research")

checkpointer = MemorySaver()  # Not durable!
graph = workflow.compile(checkpointer=checkpointer)

@app.post("/workflow")
async def run_workflow(input: dict, user = Depends(verify_token)):
    config = {"configurable": {"thread_id": str(uuid4())}}
    result = await graph.ainvoke(input, config)
    return result

@app.post("/workflow/stream")
async def stream_workflow(input: dict, user = Depends(verify_token)):
    async def generate():
        config = {"configurable": {"thread_id": str(uuid4())}}
        async for event in graph.astream_events(input, config, version="v2"):
            yield {"event": event["event"], "data": json.dumps(event["data"])}
    return EventSourceResponse(generate())

# Missing: durable checkpoints, resume, MCP, auth, cost tracking, etc.

After FastAgentic

from fastagentic import App, agent_endpoint
from fastagentic.adapters.langgraph import LangGraphAdapter
from langgraph.graph import StateGraph, END
from typing import TypedDict
from pydantic import BaseModel

class AgentState(TypedDict):
    messages: list
    next_step: str

# Pydantic models for the endpoint contract
class WorkflowInput(BaseModel):
    messages: list

class WorkflowOutput(BaseModel):
    messages: list
    next_step: str

workflow = StateGraph(AgentState)
workflow.add_node("research", research_node)
workflow.add_node("write", write_node)
workflow.add_edge("research", "write")
workflow.add_edge("write", END)
workflow.set_entry_point("research")

graph = workflow.compile()

app = App(
    title="Research Workflow",
    oidc_issuer="https://auth.company.com",
    durable_store="postgres://...",
)

@agent_endpoint(
    path="/workflow",
    runnable=LangGraphAdapter(graph),
    input_model=WorkflowInput,
    output_model=WorkflowOutput,
    stream=True,
    durable=True,
)
async def run_workflow(input: WorkflowInput) -> WorkflowOutput:
    pass

What You Get

Automatic Endpoints

Method Endpoint Description
POST /workflow Run graph synchronously
POST /workflow/stream Run with node-level streaming
GET /workflow/{run_id} Get run status and result
GET /workflow/{run_id}/events Replay event stream
POST /workflow/{run_id}/resume Resume from checkpoint
GET /workflow/{run_id}/state Get current graph state

LangGraph-Specific Features

Node-Level Streaming

Events emitted for each node:

node_start: {node: "research", state: {...}}
token: {content: "Researching..."}
tool_call: {tool: "search", args: {...}}
tool_result: {tool: "search", output: [...]}
node_end: {node: "research", result: {...}}
checkpoint: {checkpoint_id: "...", node: "research"}
node_start: {node: "write", state: {...}}
...
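These events arrive as a server-sent event stream. The following stdlib-only sketch parses that stream into Python dicts on the client side; the exact SSE framing (an `event:` line followed by a `data:` line per event) is an assumption about the wire format, not something the table above guarantees:

```python
import json

def parse_sse(raw: str) -> list[dict]:
    """Parse a raw SSE stream into {"event": ..., "data": ...} dicts.

    Assumes each event is an `event:` line followed by a `data:` line,
    with events separated by blank lines (standard SSE framing).
    """
    events = []
    current = {}
    for line in raw.splitlines():
        if line.startswith("event:"):
            current["event"] = line[len("event:"):].strip()
        elif line.startswith("data:"):
            current["data"] = json.loads(line[len("data:"):].strip())
        elif not line and current:
            events.append(current)
            current = {}
    if current:
        events.append(current)
    return events

stream = (
    'event: node_start\n'
    'data: {"node": "research"}\n'
    '\n'
    'event: node_end\n'
    'data: {"node": "research", "duration": 1.2}\n'
)
for evt in parse_sse(stream):
    print(evt["event"], evt["data"]["node"])
```

In a real client you would feed chunks from the `/workflow/stream` response into this parser instead of a fixed string.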

State Checkpointing

Every node transition creates a durable checkpoint:

# Resume from last checkpoint after failure
curl -X POST /workflow/run-123/resume

# Or resume from specific checkpoint
curl -X POST /workflow/run-123/resume?checkpoint=chk-456

Human-in-the-Loop

Use LangGraph interrupts with FastAgentic:

from langgraph.graph import StateGraph
from langgraph.prebuilt import ToolNode

workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", ToolNode(tools))
workflow.add_node("human", human_approval_node)

# Interrupt before the human node; interrupt points are set at compile time
graph = workflow.compile(interrupt_before=["human"])

When the interrupt is hit:

1. The checkpoint is saved
2. An interrupt event is emitted
3. The run pauses, waiting for resume
4. Call /workflow/{run_id}/resume with the approval
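The checkpoint/interrupt/resume lifecycle can be simulated with a toy in-memory executor. This is purely illustrative (node names, checkpoint shape, and the approval payload are made up, and this is not FastAgentic's internals):

```python
# Toy in-memory model of the interrupt lifecycle: checkpoint before each
# node, pause at an interrupt node, resume from the last checkpoint.

def run(nodes, state, interrupt_before, checkpoints, start=0):
    """Execute nodes in order, checkpointing before each one.

    Returns ("interrupted", index) when the next node needs approval,
    or ("completed", state) when all nodes have run.
    """
    for i in range(start, len(nodes)):
        name, fn = nodes[i]
        checkpoints.append({"next_node": name, "state": dict(state)})  # checkpoint saved
        if name in interrupt_before and not state.get("approved"):
            return "interrupted", i  # interrupt emitted, run pauses
        state = fn(state)
    return "completed", state

nodes = [
    ("draft", lambda s: {**s, "draft": "v1"}),
    ("human", lambda s: s),  # approval gate
    ("publish", lambda s: {**s, "published": True}),
]
checkpoints = []
status, at = run(nodes, {}, {"human"}, checkpoints)
print(status)  # interrupted

# Resume from the saved checkpoint with approval attached
resumed = {**checkpoints[-1]["state"], "approved": True}
status, final = run(nodes, resumed, {"human"}, checkpoints, start=at)
print(status, final.get("published"))  # completed True
```

The real flow is the same shape, except the checkpoint lives in the durable store and the resume arrives via `POST /workflow/{run_id}/resume`.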

Conditional Branching

def route(state):
    if state["needs_review"]:
        return "review"
    return "publish"

workflow.add_conditional_edges("analyze", route, {
    "review": "human_review",
    "publish": "auto_publish"
})

FastAgentic streams the decision:

node_end: {node: "analyze", result: {...}}
edge: {from: "analyze", to: "human_review", condition: "needs_review=True"}
node_start: {node: "human_review", ...}

Configuration Options

LangGraphAdapter Constructor

LangGraphAdapter(
    graph: CompiledStateGraph,
    state_schema: type[BaseModel] | None = None,
    checkpoint_every_node: bool = True,
    include_state_in_events: bool = False,
    interrupt_handler: Callable | None = None,
)
Parameter Type Default Description
graph CompiledStateGraph required Compiled LangGraph
state_schema type[BaseModel] None Pydantic model for state validation
checkpoint_every_node bool True Checkpoint after each node
include_state_in_events bool False Include full state in events
interrupt_handler Callable None Custom interrupt handling
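The table above does not pin down the interrupt_handler signature. One plausible shape, shown here purely as an assumption, is a callable that receives the interrupting node and current state and returns a decision, letting trivial cases resume automatically:

```python
# Hypothetical interrupt_handler sketch. The (node, state) -> decision
# contract is an assumption, not a documented FastAgentic API.

def auto_approve_small_changes(node: str, state: dict) -> dict:
    """Auto-approve review interrupts for trivial drafts;
    otherwise leave the run paused for a human."""
    if node == "review" and len(state.get("draft", "")) < 100:
        return {"action": "resume", "payload": {"approved": True}}
    return {"action": "pause"}

print(auto_approve_small_changes("review", {"draft": "short fix"}))
print(auto_approve_small_changes("review", {"draft": "x" * 500}))
```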

Event Types

Event Description Payload
node_start Node begins {node, state, timestamp}
node_end Node completes {node, result, duration}
edge Transition occurs {from, to, condition}
token LLM output {content, node}
tool_call Tool invoked {tool, args, node}
tool_result Tool returns {tool, output, node}
checkpoint State saved {checkpoint_id, node}
interrupt Human input needed {node, state, reason}

Checkpoint State

The adapter persists:

- Full graph state at each node
- Node execution history
- Tool call results
- Token and cost counters
- Interrupt status
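A checkpoint record covering those fields might look like the dataclass below. The field names are illustrative, derived from the list above; they are not FastAgentic's actual storage schema:

```python
from dataclasses import dataclass, field

# Illustrative shape of a persisted checkpoint; field names are
# assumptions based on the persisted-data list, not a real schema.

@dataclass
class Checkpoint:
    checkpoint_id: str
    node: str                                     # node just completed
    state: dict                                   # full graph state
    history: list = field(default_factory=list)   # node execution history
    tool_results: dict = field(default_factory=dict)
    tokens_used: int = 0
    cost_usd: float = 0.0
    interrupted: bool = False

chk = Checkpoint(
    checkpoint_id="chk-456",
    node="research",
    state={"messages": [], "next_step": "write"},
    tokens_used=1520,
    cost_usd=0.012,
)
print(chk.node, chk.interrupted)
```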

Migration Guide

From LangGraph with MemorySaver

Replace:

from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

With:

# No checkpointer needed - FastAgentic handles it
graph = workflow.compile()
adapter = LangGraphAdapter(graph)

From LangGraph with PostgresSaver

Replace:

from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(conn_string)
graph = workflow.compile(checkpointer=checkpointer)

With:

# FastAgentic manages the connection
graph = workflow.compile()
app = App(..., durable_store="postgres://...")

@agent_endpoint(path="/workflow", runnable=LangGraphAdapter(graph), durable=True)

Common Patterns

Multi-Step Research

class ResearchState(TypedDict):
    query: str
    sources: list[str]
    findings: list[dict]
    report: str

workflow = StateGraph(ResearchState)
workflow.add_node("search", search_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("synthesize", synthesize_node)
workflow.set_entry_point("search")
workflow.add_edge("search", "analyze")
workflow.add_edge("analyze", "synthesize")
workflow.add_edge("synthesize", END)

@agent_endpoint(
    path="/research",
    runnable=LangGraphAdapter(workflow.compile()),
    stream=True,
    durable=True,
)
async def research(query: str) -> ResearchReport:
    pass
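LangGraph nodes are plain functions that take the current state and return a partial update, which the graph merges back into the state. Placeholder implementations of the three nodes above might look like this (the bodies are stubs; a real search_node would call an actual search tool):

```python
# Placeholder node implementations for the research graph above.
# Each node returns a partial state update that gets merged in.

def search_node(state: dict) -> dict:
    return {"sources": [f"https://example.com/?q={state['query']}"]}

def analyze_node(state: dict) -> dict:
    return {"findings": [{"source": s, "summary": "..."} for s in state["sources"]]}

def synthesize_node(state: dict) -> dict:
    return {"report": f"{len(state['findings'])} findings on {state['query']}"}

# Manually chaining the nodes mimics the merge the graph performs:
state = {"query": "AI trends", "sources": [], "findings": [], "report": ""}
for node in (search_node, analyze_node, synthesize_node):
    state.update(node(state))
print(state["report"])  # 1 findings on AI trends
```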

Approval Workflows

workflow.add_node("draft", draft_node)
workflow.add_node("review", human_review_node)
workflow.add_node("publish", publish_node)
workflow.set_entry_point("draft")
workflow.add_edge("draft", "review")
workflow.add_edge("review", "publish")
workflow.add_edge("publish", END)

@agent_endpoint(
    path="/content",
    runnable=LangGraphAdapter(workflow.compile(interrupt_before=["review"])),
    durable=True,
)
async def create_content(topic: str) -> Content:
    pass

Usage:

# Start workflow
curl -X POST /content -d '{"topic": "AI trends"}'
# Returns: {run_id: "run-123", status: "interrupted", node: "review"}

# Approve and continue
curl -X POST /content/run-123/resume -d '{"approved": true}'

Next Steps