How to Build an Agentic Data Pipeline
Use LangGraph to wire a supervisor-worker multi-agent pipeline with typed tools, shared Redis state, and LangSmith observability. The full stack takes about 3 hours to set up end-to-end.
Steps
Install dependencies and set up project structure
pip install langgraph langgraph-checkpoint-redis langchain-openai langchain-core redis
# Project structure
agentic-pipeline/
├── agents/      # supervisor.py, ingest.py, validate.py
├── tools/       # db.py, api.py, schema.py
├── state.py     # shared TypedDict state
└── graph.py     # LangGraph wiring
Define typed agent tools
Each tool is a Python function decorated with @tool. The docstring becomes the tool description the LLM reads to decide when to call it.
from langchain_core.tools import tool
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://localhost/pipeline")  # adjust to your database

@tool
def run_query(sql: str) -> list[dict]:
    """Run a read-only SQL query. Returns rows as dicts."""
    with engine.connect() as conn:
        return [dict(r._mapping) for r in conn.execute(text(sql))]
@tool
def check_null_rate(table: str, column: str) -> float:
    """Return the null rate (0.0–1.0) for a column."""
    # @tool-decorated functions are called via .invoke, not directly.
    # table/column come from the LLM — validate against an allowlist in production.
    rows = run_query.invoke({
        "sql": f"SELECT COUNT(*) FILTER (WHERE {column} IS NULL) AS nulls, "
               f"COUNT(*) AS total FROM {table}"
    })
    return rows[0]["nulls"] / rows[0]["total"]
Define shared agent state
from typing import TypedDict, Annotated
from langgraph.graph import add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    next_worker: str      # "ingest" | "validate" | "transform" | "FINISH"
    current_task: str     # human-readable current task
    error_count: int      # escalate after threshold
    results: list[dict]   # accumulated output rows
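Conceptually, every node returns a partial dict and LangGraph merges it into this state; fields annotated with a reducer such as add_messages are appended to rather than overwritten. A minimal pure-Python sketch of that merge rule (the apply_update helper is illustrative, not LangGraph's API):

```python
def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the shared state (illustrative only)."""
    merged = dict(state)
    for key, value in update.items():
        if key == "messages":
            # reducer field: append new messages instead of replacing the list
            merged[key] = state.get("messages", []) + value
        else:
            # plain field: last write wins
            merged[key] = value
    return merged

state = {"messages": ["task received"], "error_count": 0}
state = apply_update(state, {"messages": ["rows ingested"], "error_count": 1})
# state["messages"] == ["task received", "rows ingested"]; state["error_count"] == 1
```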
Build worker agents
Each worker binds an LLM to a set of tools, invokes it with the current state, and returns the result as a state update.
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def validate_agent(state: AgentState) -> dict:
    agent = llm.bind_tools([check_null_rate, run_query])
    result = agent.invoke(state["messages"])
    return {"messages": [result]}  # partial update; add_messages appends it
Add a supervisor agent
The supervisor reads messages from all workers and sets next_worker to route the graph.
from langchain_core.prompts import ChatPromptTemplate

WORKERS = ["ingest", "validate", "transform", "FINISH"]

def supervisor_node(state: AgentState) -> dict:
    prompt = ChatPromptTemplate.from_messages([
        ("system", f"Route to one of: {WORKERS}. Reply with just the worker name."),
        ("human", "Task: {task}. Last result: {last}"),
    ])
    last = state["messages"][-1].content if state["messages"] else ""
    response = llm.invoke(prompt.format_messages(task=state["current_task"], last=last))
    return {"next_worker": response.content.strip()}
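The LLM occasionally pads its reply ("I'll route to validate.") even when told to answer with just the name. A small normalizer before storing next_worker keeps routing robust; normalize_route below is a hypothetical helper, not part of LangGraph:

```python
WORKERS = ["ingest", "validate", "transform", "FINISH"]

def normalize_route(reply: str) -> str:
    """Map a possibly chatty LLM reply onto a known worker name."""
    cleaned = reply.strip().strip(".\"'")
    if cleaned in WORKERS:
        return cleaned
    # fall back to the first worker name mentioned anywhere in the reply
    for worker in WORKERS:
        if worker.lower() in reply.lower():
            return worker
    return "FINISH"  # unknown reply: stop rather than loop
```

Applying it to response.content.strip() before returning turns a malformed reply into a clean stop instead of a KeyError in the routing map.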
Wire the graph with Redis checkpointing
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.redis import RedisSaver

builder = StateGraph(AgentState)
builder.add_node("supervisor", supervisor_node)
builder.add_node("ingest", ingest_agent)
builder.add_node("validate", validate_agent)
builder.add_node("transform", transform_agent)
builder.set_entry_point("supervisor")
builder.add_conditional_edges(
    "supervisor",
    lambda s: s["next_worker"],
    {"ingest": "ingest", "validate": "validate",
     "transform": "transform", "FINISH": END},
)

# from_conn_string is a context manager; keep graph invocations inside the block
with RedisSaver.from_conn_string("redis://localhost:6379") as checkpointer:
    checkpointer.setup()  # create Redis indices on first use
    app = builder.compile(checkpointer=checkpointer)
    # recursion_limit is passed per-run in the invoke config, not to compile()
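With the graph compiled, each run is driven through invoke with a config that carries the thread_id (which keys the Redis checkpoints) and the recursion_limit (LangGraph reads it from the run config). A sketch, assuming the compiled app from above; the thread_id value and task text are arbitrary:

```python
config = {
    "configurable": {"thread_id": "pipeline-run-1"},  # keys checkpoints in Redis
    "recursion_limit": 25,  # hard cap on supervisor/worker hops per run
}

initial_state = {
    "messages": [("human", "Ingest and validate today's orders table")],
    "current_task": "daily orders load",
    "error_count": 0,
    "results": [],
}

# final_state = app.invoke(initial_state, config=config)  # needs Redis + an OpenAI key
```

Re-invoking with the same thread_id later resumes from the last checkpoint instead of starting over.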
When to Use This Pattern
- Pipeline failures require reasoning to fix (not just retry)
- Routing depends on data content discovered at runtime
- You need human escalation only when the agent is truly stuck
- Schema drift is expected and needs autonomous handling
Common Issues
Agent loops infinitely
Pass recursion_limit in the config when invoking the graph (it is not a compile() argument) and add an error_count guard in the supervisor routing function.
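A guard in the routing function enforces this in plain Python; the threshold of 3 is an arbitrary choice for illustration:

```python
MAX_ERRORS = 3  # arbitrary threshold; tune per pipeline

def route_from_supervisor(state: dict) -> str:
    """Routing function for add_conditional_edges with a loop guard."""
    if state.get("error_count", 0) >= MAX_ERRORS:
        return "FINISH"  # stop routing to workers once errors pile up
    return state["next_worker"]
```

Pass route_from_supervisor to add_conditional_edges in place of the bare lambda s: s["next_worker"].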
LLM picks wrong tool
Improve tool docstrings — the LLM uses them to select tools. Be explicit: "Use this to check null rates, NOT for row counts."
State not persisted after crash
Confirm Redis is running and the checkpointer is passed to compile(). Test by killing the process mid-run and resuming with the same thread_id.
Tool returns too much data
Add a limit parameter to all query tools. The full result set gets serialized into the agent context — large results break token limits.
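One way to enforce the cap is to wrap every query in a subselect with a LIMIT before it runs, so the agent can never pull an unbounded result set into its context. A sketch against SQLite's stdlib driver (the stack above uses SQLAlchemy, but the wrapping idea is the same):

```python
import sqlite3

def run_query_limited(conn: sqlite3.Connection, sql: str, limit: int = 50) -> list[dict]:
    """Run a query with a hard row cap applied via a wrapping subselect."""
    sql = sql.rstrip("; ")
    capped = f"SELECT * FROM ({sql}) LIMIT {int(limit)}"
    cur = conn.execute(capped)
    cols = [d[0] for d in cur.description]
    return [dict(zip(cols, row)) for row in cur.fetchall()]
```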
FAQ
- What is the minimum setup for a LangGraph agentic pipeline?
- Python 3.10+, langgraph, langchain-openai, and redis. Define a TypedDict state, at least one tool, one agent node, and compile the graph. Redis checkpointing is optional locally but required in production.
- How do I prevent an agent from looping forever?
Pass recursion_limit in the run config: app.invoke(inputs, config={"recursion_limit": 25}). Also add an error_count field to state and have the supervisor route to FINISH if it exceeds a threshold.
- How does Redis checkpointing work?
- LangGraph serializes the full agent state to Redis after every node execution. Resume from any checkpoint by passing the same thread_id — no steps are re-executed.