
How to Build an Agentic Data Pipeline

Use LangGraph to wire a supervisor-worker multi-agent pipeline with typed tools, shared Redis state, and LangSmith observability. The full stack takes about 3 hours to set up end-to-end.

Steps

1. Install dependencies and set up project structure

pip install langgraph langchain-openai langchain-core langgraph-checkpoint-redis sqlalchemy redis

# Project structure
agentic-pipeline/
  agents/      # supervisor.py, ingest.py, validate.py
  tools/       # db.py, api.py, schema.py
  state.py     # shared TypedDict state
  graph.py     # LangGraph wiring
2. Define typed agent tools

Each tool is a Python function decorated with @tool. The docstring becomes the tool description the LLM reads to decide when to call it.

from langchain_core.tools import tool
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://localhost/pipeline")  # example DSN; point at your database

@tool
def run_query(sql: str) -> list[dict]:
    """Run a read-only SQL query. Returns rows as dicts."""
    with engine.connect() as conn:
        # .mappings() yields dict-like rows in SQLAlchemy 2.x
        return [dict(r) for r in conn.execute(text(sql)).mappings()]

@tool
def check_null_rate(table: str, column: str) -> float:
    """Return the null rate (0.0–1.0) for a column."""
    # run_query is a BaseTool after decoration, so call it via .invoke
    rows = run_query.invoke({"sql": (
        f"SELECT COUNT(*) FILTER (WHERE {column} IS NULL) AS nulls, "
        f"COUNT(*) AS total FROM {table}"
    )})
    return rows[0]["nulls"] / rows[0]["total"]
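Because check_null_rate interpolates table and column names straight into the SQL string, it is worth guarding against injection. A minimal sketch, using a hypothetical safe_ident helper that each tool would call on its arguments before interpolating:

```python
import re

def safe_ident(name: str) -> str:
    """Allow only plain SQL identifiers (letters, digits, underscore)
    so a malicious table/column argument can't inject SQL."""
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return name
```

For query text supplied as a value (rather than an identifier), prefer bound parameters via SQLAlchemy's text() instead of string formatting.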
3. Define shared agent state

from typing import TypedDict, Annotated
from langgraph.graph import add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    next_worker: str           # "ingest" | "validate" | "transform" | "FINISH"
    current_task: str          # human-readable current task
    error_count: int           # escalate after threshold
    results: list[dict]        # accumulated output rows
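LangGraph merges each node's partial return into this state using the reducer named in the Annotated type: messages accumulates, plain keys are overwritten. A plain-Python illustration of that merge semantics (simplified; the real add_messages reducer also de-duplicates by message ID):

```python
def merge_update(state: dict, update: dict) -> dict:
    """Illustrative reducer: 'messages' appends, every other key overwrites."""
    merged = dict(state)
    for key, value in update.items():
        if key == "messages":  # Annotated[list, add_messages] -> accumulate
            merged[key] = merged.get("messages", []) + value
        else:                  # un-annotated keys: last write wins
            merged[key] = value
    return merged
```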
4. Build worker agents

Each worker binds an LLM to a set of tools, invokes it with the current state, and returns the result as a state update.

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def validate_agent(state: AgentState) -> AgentState:
    agent = llm.bind_tools([check_null_rate, run_query])
    result = agent.invoke(state["messages"])
    return {"messages": [result]}
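The message the worker returns may contain tool_calls that still need executing; in LangGraph you would normally route those to a langgraph.prebuilt.ToolNode. As a plain-Python sketch of that dispatch step, assuming tool calls arrive as dicts in the {"name", "args"} shape LangChain exposes on AIMessage.tool_calls:

```python
def execute_tool_calls(tool_calls: list[dict], registry: dict) -> list[dict]:
    """Dispatch each tool call the model requested to a Python callable
    and collect the outputs to feed back as tool messages."""
    results = []
    for call in tool_calls:
        fn = registry[call["name"]]    # look up the tool by name
        output = fn(**call["args"])    # run it with the model's arguments
        results.append({"name": call["name"], "output": output})
    return results
```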
5. Add a supervisor agent

The supervisor reads messages from all workers and sets next_worker to route the graph.

from langchain_core.prompts import ChatPromptTemplate

WORKERS = ["ingest", "validate", "transform", "FINISH"]

def supervisor_node(state: AgentState) -> AgentState:
    prompt = ChatPromptTemplate.from_messages([
        ("system", f"Route to one of: {WORKERS}. Reply with just the worker name."),
        ("human", "Task: {task}. Last result: {last}"),
    ])
    messages = prompt.format_messages(
        task=state["current_task"],
        last=state["messages"][-1].content if state["messages"] else "none",
    )
    response = llm.invoke(messages)
    return {"next_worker": response.content.strip()}
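The conditional-edge function used in step 6 can also enforce the error_count guard, so a misbehaving supervisor cannot loop forever. A sketch, with the threshold of 3 chosen arbitrarily:

```python
WORKERS = ["ingest", "validate", "transform", "FINISH"]

def route(state: dict) -> str:
    """Routing function for add_conditional_edges: trust the supervisor's
    choice, but force FINISH after repeated errors or an invalid choice."""
    if state.get("error_count", 0) >= 3:   # example threshold
        return "FINISH"
    choice = state.get("next_worker", "FINISH")
    return choice if choice in WORKERS else "FINISH"
```

Pass route in place of the bare lambda shown below to get both behaviors from one function.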
6. Wire the graph with Redis checkpointing

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.redis import RedisSaver

builder = StateGraph(AgentState)
builder.add_node("supervisor", supervisor_node)
builder.add_node("ingest", ingest_agent)
builder.add_node("validate", validate_agent)
builder.add_node("transform", transform_agent)

builder.set_entry_point("supervisor")
builder.add_conditional_edges(
    "supervisor",
    lambda s: s["next_worker"],
    {"ingest": "ingest", "validate": "validate",
     "transform": "transform", "FINISH": END}
)

# from_conn_string is a context manager in langgraph-checkpoint-redis
with RedisSaver.from_conn_string("redis://localhost:6379") as checkpointer:
    checkpointer.setup()  # create the Redis indices on first use
    app = builder.compile(checkpointer=checkpointer)
    # recursion_limit is a per-run config, not a compile() argument:
    # app.invoke(inputs, config={"recursion_limit": 25,
    #                            "configurable": {"thread_id": "run-1"}})

When to Use This Pattern

  • Pipeline failures require reasoning to fix (not just retry)
  • Routing depends on data content discovered at runtime
  • You need human escalation only when the agent is truly stuck
  • Schema drift is expected and needs autonomous handling

Common Issues

Agent loops infinitely

Pass recursion_limit in the run config when invoking the graph (app.invoke(inputs, config={"recursion_limit": 25})) and add an error_count guard in the supervisor routing function.

LLM picks wrong tool

Improve tool docstrings — the LLM uses them to select tools. Be explicit: "Use this to check null rates, NOT for row counts."

State not persisted after crash

Confirm Redis is running and the checkpointer is passed to compile(). Test by killing the process mid-run and resuming with the same thread_id.

Tool returns too much data

Add a limit parameter to all query tools. The full result set gets serialized into the agent context — large results break token limits.
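One way to cap it: a hypothetical truncate_rows helper that every query tool applies to its result before returning, so the agent still learns how much was cut:

```python
def truncate_rows(rows: list[dict], limit: int = 50) -> list[dict]:
    """Cap a tool's result set before it is serialized into the agent
    context, appending a marker row noting how many rows were dropped."""
    if len(rows) <= limit:
        return rows
    return rows[:limit] + [{"truncated": len(rows) - limit}]
```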

FAQ

What is the minimum setup for a LangGraph agentic pipeline?
Python 3.10+, langgraph, langchain-openai, and redis. Define a TypedDict state, at least one tool, one agent node, and compile the graph. Redis checkpointing is optional locally but required in production.
How do I prevent an agent from looping forever?
Pass recursion_limit in the run config: app.invoke(inputs, config={"recursion_limit": 25}). Also add an error_count field to state and have the supervisor route to FINISH when it exceeds a threshold.
How does Redis checkpointing work?
LangGraph serializes the full agent state to Redis after every node execution. Invoking again with the same thread_id resumes from the latest checkpoint; completed steps are not re-executed.
