Agentic Data Engineering Project
Step-by-Step Walkthrough: Build an AI Agent for Data Pipelines
Total Time: ~90 minutes
Difficulty: Advanced
Tools: LangChain, OpenAI, Python
What You'll Build
In this walkthrough, you'll build an agentic data engineering system that uses LLMs to automate data pipeline tasks intelligently:
- Create a ReAct agent that plans and executes data tasks
- Build custom tools for data validation and transformation
- Implement state management for multi-step workflows
- Handle errors and retry logic automatically
- Test agent decision-making and tool usage
Prerequisites
Python 3.9+ installed
OpenAI API key
Understanding of LLM agents and ReAct pattern
Familiarity with data engineering concepts
Step 1: Set Up Agentic Framework (25 min)
1.1 Create Project Structure
# Create project directory
mkdir agentic-de
cd agentic-de
# Create subdirectories
mkdir -p src/agents src/tools src/state data logs
1.2 Install Agent Dependencies
# Create virtual environment
python -m venv venv
source venv/bin/activate
# Install packages
pip install langchain==0.1.0 openai==1.6.1 \
pandas==2.1.3 sqlalchemy==2.0.23 \
python-dotenv==1.0.0 pydantic==2.5.3
# Save requirements
pip freeze > requirements.txt
1.3 Configure Environment
# Create .env file
cat > .env <<EOF
OPENAI_API_KEY=your-api-key-here
AGENT_MODEL=gpt-4
AGENT_TEMPERATURE=0
MAX_ITERATIONS=10
EOF
GPT-4 for Agents
GPT-4 is recommended for agentic tasks due to better reasoning and tool-use capabilities. Use temperature=0 for deterministic behavior.
1.4 Create State Manager
Build a state manager to track agent execution:
# src/state/pipeline_state.py
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from datetime import datetime

class PipelineState(BaseModel):
    """Track agent execution state"""
    task_id: str
    status: str = "pending"  # pending, running, completed, failed
    steps_executed: List[Dict[str, Any]] = []
    current_data: Dict[str, Any] = {}
    errors: List[str] = []
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    def add_step(self, tool_name: str, input_data: Any, output_data: Any):
        """Record a completed step"""
        self.steps_executed.append({
            "tool": tool_name,
            "input": input_data,
            "output": output_data,
            "timestamp": datetime.now().isoformat()
        })

    def mark_complete(self):
        self.status = "completed"
        self.completed_at = datetime.now()
1.5 Test State Manager
# Test state manager
from src.state.pipeline_state import PipelineState
state = PipelineState(task_id="test-001")
state.add_step("validate_data", {"rows": 100}, {"valid": True})
state.mark_complete()
print(state.model_dump_json(indent=2))
Expected Output
{
  "task_id": "test-001",
  "status": "completed",
  "steps_executed": [...],
  ...
}
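Because PipelineState is a pydantic model, it also round-trips cleanly to JSON, which is useful for persisting state between agent runs. A sketch with the model re-declared minimally here so the example is self-contained (in the project you would import it from src/state/pipeline_state.py):

```python
# Persist pipeline state to JSON and restore it
from pathlib import Path
from typing import Any, Dict, List
from pydantic import BaseModel

class PipelineState(BaseModel):  # trimmed copy of src/state/pipeline_state.py
    task_id: str
    status: str = "pending"
    steps_executed: List[Dict[str, Any]] = []

state = PipelineState(task_id="test-001", status="completed")

path = Path("state_test-001.json")
path.write_text(state.model_dump_json(indent=2))  # save

restored = PipelineState.model_validate_json(path.read_text())  # load
assert restored == state
path.unlink()  # clean up the test file
```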
Step 2: Build Agent with Custom Tools (40 min)
2.1 Create Data Validation Tool
# src/tools/validate_data.py
from langchain.tools import BaseTool
import pandas as pd

class ValidateDataTool(BaseTool):
    name: str = "validate_data"
    description: str = """
    Validates a CSV file for data quality issues.
    Input: path to CSV file
    Output: validation report with null counts and duplicates
    """

    def _run(self, file_path: str) -> str:
        """Execute validation"""
        try:
            df = pd.read_csv(file_path)
            # Count nulls
            null_counts = df.isnull().sum()
            total_nulls = null_counts.sum()
            # Find duplicates
            duplicates = df.duplicated().sum()
            report = f"""
Validation Report:
- Total rows: {len(df)}
- Total columns: {len(df.columns)}
- Null values: {total_nulls}
- Duplicate rows: {duplicates}
- Data quality: {'PASS' if total_nulls == 0 and duplicates == 0 else 'FAIL'}
"""
            return report
        except Exception as e:
            return f"Error validating data: {str(e)}"

    async def _arun(self, file_path: str) -> str:
        return self._run(file_path)
Tool Design
LangChain tools need: (1) descriptive name, (2) clear description for the LLM, (3) _run method with the logic, (4) async _arun method. The description guides the agent on when to use this tool.
2.2 Create Data Transform Tool
# src/tools/transform_data.py
from langchain.tools import BaseTool
import pandas as pd

class TransformDataTool(BaseTool):
    name: str = "transform_data"
    description: str = """
    Transforms CSV data: removes nulls, drops duplicates.
    Input: a single string "source_path,output_path"
    Output: transformation summary
    """

    def _run(self, tool_input: str) -> str:
        try:
            # The ReAct agent passes one string; split it into the two paths
            source_path, output_path = [p.strip() for p in tool_input.split(",")]
            df = pd.read_csv(source_path)
            original_rows = len(df)
            # Clean data
            df = df.dropna()
            df = df.drop_duplicates()
            # Save
            df.to_csv(output_path, index=False)
            removed = original_rows - len(df)
            return f"Transformed {original_rows} -> {len(df)} rows (removed {removed}). Saved to {output_path}"
        except Exception as e:
            return f"Error transforming: {str(e)}"

    async def _arun(self, tool_input: str) -> str:
        return self._run(tool_input)
2.3 Create the Agent
# src/agents/data_agent.py
from langchain.agents import initialize_agent, AgentType
from langchain.chat_models import ChatOpenAI
from src.tools.validate_data import ValidateDataTool
from src.tools.transform_data import TransformDataTool
import os

def create_data_agent():
    """Create agent with data tools"""
    llm = ChatOpenAI(
        model=os.getenv("AGENT_MODEL", "gpt-4"),
        temperature=0
    )
    tools = [ValidateDataTool(), TransformDataTool()]
    agent = initialize_agent(
        tools=tools,
        llm=llm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
        max_iterations=10
    )
    return agent
ReAct Agent
ZERO_SHOT_REACT_DESCRIPTION uses Reasoning + Acting pattern: the agent thinks about what to do, acts (calls tools), observes results, and repeats until task is complete.
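Conceptually, the executor runs a loop like the sketch below. This is an illustration of the ReAct pattern only, not LangChain's actual implementation, which also handles output parsing, memory, and error recovery:

```python
# Simplified sketch of the ReAct control loop (illustration only)
def react_loop(llm_decide, tools, task, max_iterations=10):
    """llm_decide(task, history) -> {"action": tool_name_or_final, "input": ...}"""
    history = []
    for _ in range(max_iterations):
        decision = llm_decide(task, history)      # reason: pick next action
        if decision["action"] == "final_answer":
            return decision["input"]              # agent is done
        tool = tools[decision["action"]]          # act: call the named tool
        observation = tool(decision["input"])
        history.append((decision, observation))   # observe, then loop
    return "Stopped: max_iterations reached"

# Toy run with a scripted "LLM" that validates, then finishes
script = iter([
    {"action": "validate_data", "input": "data/raw_data.csv"},
    {"action": "final_answer", "input": "Data validated"},
])
result = react_loop(lambda task, hist: next(script),
                    {"validate_data": lambda p: f"checked {p}"},
                    "clean the file")
print(result)  # -> Data validated
```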
Step 3: Test Agent Execution (25 min)
3.1 Create Test Data
# Create sample CSV with issues
import pandas as pd

data = {
    'id': [1, 2, 3, 3, 4],  # Duplicate id=3
    'name': ['Alice', 'Bob', 'Charlie', 'Charlie', None],  # Null
    'amount': [100, 200, 300, 300, 400]
}
df = pd.DataFrame(data)
df.to_csv('data/raw_data.csv', index=False)
print("Test data created with 1 duplicate and 1 null")
3.2 Run Agent with Task
# Test agent
from src.agents.data_agent import create_data_agent

agent = create_data_agent()
task = """
I have a CSV file at 'data/raw_data.csv' that needs cleaning.
Please:
1. Validate the data first
2. If there are quality issues, transform it
3. Save cleaned data to 'data/clean_data.csv'
"""
result = agent.run(task)
print("\nAgent Result:")
print(result)
Expected Behavior
The agent should:
1. Call validate_data tool first
2. Observe quality issues (1 null, 1 duplicate)
3. Call transform_data tool to clean
4. Report success: 5 → 3 rows cleaned
3.3 Verify Results
# Check cleaned data
clean_df = pd.read_csv('data/clean_data.csv')
print(f"\nCleaned data: {len(clean_df)} rows")
print(clean_df)
# Verify no issues
assert clean_df.isnull().sum().sum() == 0, "Still has nulls!"
assert clean_df.duplicated().sum() == 0, "Still has duplicates!"
print("\n✓ All quality checks passed!")
Agent Benefits
The agent automatically chose the right sequence of tools based on the task description. No hard-coded workflow was needed: it reasoned about what to do!
Troubleshooting
- Agent loops: Increase max_iterations or improve tool descriptions
- Wrong tool selection: Make tool descriptions more specific
- API rate limits: Add retry logic and exponential backoff
See the Agentic AI Troubleshooting Guide for more solutions.
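For rate limits, a generic retry wrapper with exponential backoff can be placed around the agent call. A minimal sketch; the delays and the broad exception filter are placeholders, so in practice you would pass the OpenAI client's rate-limit exception type as retry_on:

```python
import time

def with_retries(fn, max_attempts=4, base_delay=1.0, retry_on=(Exception,)):
    """Call fn(), retrying on failure with exponential backoff: 1s, 2s, 4s, ..."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the error
            time.sleep(base_delay * 2 ** attempt)

# Usage (hypothetical): result = with_retries(lambda: agent.run(task))
```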
Walkthrough Complete!
You've built an agentic data engineering system with ReAct agents, custom tools, and autonomous decision-making. You're ready for Part 2!
What You've Learned:
ReAct agent architecture
Building custom LangChain tools
State management for workflows
Data validation and transformation tools
Autonomous task planning and execution
Agent debugging with verbose mode
Error handling in agentic systems
Production-ready agentic workflows