
StreamCart Airflow Project

Step-by-step walkthrough: Build an e-commerce data pipeline with Apache Airflow

~2 hours
Python, Docker
Beginner Friendly

What You'll Build

Two complete, scheduled Airflow DAGs for StreamCart, an e-commerce platform:

1. Web Events Ingestion

Ingest clickstream data every 15 minutes from web analytics API

2. Daily Product Summary

Aggregate product metrics daily for business reporting

Prerequisites

  • ✓ Docker Desktop installed and running
  • ✓ Basic Python knowledge (variables, functions, loops)
  • ✓ Terminal/command line familiarity
  • ✓ 8GB+ RAM available
Step 1: Set Up Airflow Environment

~20 min

We'll use Docker Compose to run Airflow locally with all necessary components.

1.1 Create Project Directory
mkdir streamcart-airflow && cd streamcart-airflow
mkdir -p dags logs plugins data
1.2 Download Docker Compose File
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.0/docker-compose.yaml'

ℹ️ This downloads the official Airflow docker-compose configuration

1.3 Set Environment Variables
echo -e "AIRFLOW_UID=$(id -u)" > .env
1.4 Initialize Airflow Database
docker-compose up airflow-init

Expected Output:

airflow-init_1 exited with code 0

1.5 Start Airflow
docker-compose up -d

⏱️ This takes 2-3 minutes to start all services

1.6 Verify Airflow is Running

Open browser to: http://localhost:8080

Login: airflow / airflow

What You Should See:

Airflow UI with "DAGs" tab showing example DAGs

⚠️ Common Issues

  • Port 8080 already in use → change the host port mapping in docker-compose.yaml
  • "Permission denied" → run: chmod -R 777 logs dags plugins
  • Container fails to start → ensure Docker has 4GB+ RAM allocated
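If you are unsure whether the port conflict is the culprit, a quick local check (plain Python, not part of the project files) tells you whether anything is already listening on 8080:

```python
import socket

# Diagnostic sketch: returns True if something is already listening on
# the given port. 8080 is where the Airflow webserver binds by default.
def port_in_use(port, host="127.0.0.1"):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(1)
        return s.connect_ex((host, port)) == 0

print(port_in_use(8080))
```

If this prints True before you start Airflow, something else owns the port and you should remap it.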
Step 2: Build Web Events Ingestion DAG

~30 min

Create a DAG that ingests clickstream data every 15 minutes.

2.1 Create DAG File
touch dags/ingest_web_events.py
2.2 Add Imports and DAG Definition
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import json
import random

default_args = {
    'owner': 'streamcart',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ingest_web_events',
    default_args=default_args,
    description='Ingest web clickstream data every 15 minutes',
    schedule='*/15 * * * *',  # Every 15 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['streamcart', 'ingestion', 'realtime'],
)
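The cron expression `*/15 * * * *` fires at every quarter hour. A quick sketch with plain `datetime` arithmetic (no Airflow required) shows the run times it produces within one hour:

```python
from datetime import datetime, timedelta

# '*/15 * * * *' matches minutes 0, 15, 30, 45 of every hour.
start = datetime(2024, 1, 1, 9, 0)
runs = [start + timedelta(minutes=15 * i) for i in range(4)]
print([t.strftime("%H:%M") for t in runs])  # ['09:00', '09:15', '09:30', '09:45']
```

Note that with Airflow's interval-based scheduling, the run stamped 09:00 actually kicks off once its 15-minute data interval closes, around 09:15.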
2.3 Add Task Functions
def fetch_events(**context):
    """Simulate fetching events from web analytics API"""
    events = []
    for i in range(random.randint(10, 50)):
        event = {
            'event_id': f'evt_{i}_{datetime.now().timestamp()}',
            'user_id': f'user_{random.randint(1, 1000)}',
            'event_type': random.choice(['page_view', 'add_to_cart', 'purchase']),
            'product_id': f'prod_{random.randint(1, 100)}',
            'timestamp': datetime.now().isoformat(),
        }
        events.append(event)
    # Save to temp file
    with open('/tmp/web_events.json', 'w') as f:
        json.dump(events, f)
    print(f"Fetched {len(events)} events")
    return len(events)

def validate_events(**context):
    """Validate fetched events"""
    with open('/tmp/web_events.json', 'r') as f:
        events = json.load(f)
    assert len(events) > 0, "No events fetched"
    assert all('event_id' in e for e in events), "Missing event_id"
    print(f"Validated {len(events)} events")
    return True

def load_to_warehouse(**context):
    """Load events to data warehouse (simulated)"""
    with open('/tmp/web_events.json', 'r') as f:
        events = json.load(f)
    # Simulate loading to warehouse
    print(f"Loading {len(events)} events to warehouse...")
    # In a real scenario: INSERT INTO events_table VALUES ...
    print(f"✓ Loaded {len(events)} events successfully")
    return len(events)
2.4 Define Task Dependencies
# Create tasks
fetch_task = PythonOperator(
    task_id='fetch_web_events',
    python_callable=fetch_events,
    dag=dag,
)

validate_task = PythonOperator(
    task_id='validate_events',
    python_callable=validate_events,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_to_warehouse',
    python_callable=load_to_warehouse,
    dag=dag,
)

cleanup_task = BashOperator(
    task_id='cleanup_temp_files',
    bash_command='rm -f /tmp/web_events.json',
    dag=dag,
)

# Set dependencies
fetch_task >> validate_task >> load_task >> cleanup_task
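The `>>` chaining works because Airflow operators overload Python's right-shift operator to record downstream relationships. A toy stand-in (not Airflow's actual `BaseOperator`) illustrates the idea:

```python
# Toy illustration of what '>>' does in a DAG file: Airflow operators
# implement __rshift__ so that a >> b records "b runs after a".
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other)
        return other  # returning 'other' is what makes a >> b >> c chain

fetch = Task("fetch_web_events")
validate = Task("validate_events")
load = Task("load_to_warehouse")

fetch >> validate >> load
print([t.task_id for t in fetch.downstream])  # ['validate_events']
```

Because each `>>` returns its right-hand operand, a single chained expression wires up the whole linear pipeline.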

💡 What This DAG Does

  1. Fetches web events from API (simulated)
  2. Validates events have required fields
  3. Loads events to data warehouse
  4. Cleans up temporary files
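Before enabling the schedule, the fetch → validate contract can be exercised outside Airflow entirely. This sketch mirrors the event shape from `fetch_events` and the checks from `validate_events`, using a temp file instead of the hard-coded `/tmp/web_events.json`:

```python
import json
import random
import tempfile
from datetime import datetime

# Build events with the same shape fetch_events produces.
def make_event(i):
    return {
        'event_id': f'evt_{i}_{datetime.now().timestamp()}',
        'user_id': f'user_{random.randint(1, 1000)}',
        'event_type': random.choice(['page_view', 'add_to_cart', 'purchase']),
        'product_id': f'prod_{random.randint(1, 100)}',
        'timestamp': datetime.now().isoformat(),
    }

events = [make_event(i) for i in range(5)]

# Round-trip through JSON, as the DAG does between tasks.
with tempfile.NamedTemporaryFile('w', suffix='.json', delete=False) as f:
    json.dump(events, f)
    path = f.name

with open(path) as f:
    loaded = json.load(f)

# Same checks validate_events applies.
assert len(loaded) > 0, "No events fetched"
assert all('event_id' in e for e in loaded), "Missing event_id"
print(f"Validated {len(loaded)} events")
```

If these assertions pass locally, the task functions should behave the same way inside the DAG run.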
Step 3: Build Daily Product Summary DAG

~25 min

Create a DAG that aggregates product metrics daily at 2 AM.

3.1 Create Second DAG File
touch dags/daily_product_summary.py
3.2 Complete DAG Code
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
import random

default_args = {
    'owner': 'streamcart',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-team@streamcart.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG(
    'daily_product_summary',
    default_args=default_args,
    description='Daily aggregation of product metrics',
    schedule='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['streamcart', 'analytics', 'daily'],
)

def extract_daily_events(**context):
    """Extract yesterday's events"""
    execution_date = context['execution_date']
    print(f"Extracting events for {execution_date.date()}")
    # Simulate extraction
    return {'events_count': random.randint(1000, 5000)}

def calculate_metrics(**context):
    """Calculate product metrics"""
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract_events')
    metrics = {
        'total_views': data['events_count'],
        'total_purchases': int(data['events_count'] * 0.05),
        'conversion_rate': 0.05,
        'top_products': [f'prod_{i}' for i in range(1, 11)],
    }
    print(f"Calculated metrics: {metrics}")
    return metrics

def generate_report(**context):
    """Generate business report"""
    ti = context['ti']
    metrics = ti.xcom_pull(task_ids='calculate_metrics')
    report = f"""
    Daily Product Summary
    ====================
    Date: {context['execution_date'].date()}
    Total Views: {metrics['total_views']:,}
    Total Purchases: {metrics['total_purchases']:,}
    Conversion Rate: {metrics['conversion_rate']:.1%}
    Top Products: {', '.join(metrics['top_products'][:5])}
    """
    print(report)
    return report

# Create tasks
extract = PythonOperator(
    task_id='extract_events',
    python_callable=extract_daily_events,
    dag=dag,
)

calculate = PythonOperator(
    task_id='calculate_metrics',
    python_callable=calculate_metrics,
    dag=dag,
)

report = PythonOperator(
    task_id='generate_report',
    python_callable=generate_report,
    dag=dag,
)

extract >> calculate >> report

✨ Key Features

  • Uses XCom to pass data between tasks
  • Scheduled for daily execution at 2 AM
  • Sends email alerts to the data team on failure
  • Retries once automatically after a 10-minute delay
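The XCom hand-off can be seen without running Airflow at all: each task's return value is stored under its `task_id`, and the next task pulls it back. `FakeTI` below is a simple stand-in for Airflow's `TaskInstance`, not the real API:

```python
# Sketch of how XCom chains these tasks: a return value stored under one
# task_id becomes the next task's xcom_pull() result.
class FakeTI:
    def __init__(self):
        self.store = {}

    def push_result(self, task_id, value):  # Airflow does this implicitly
        self.store[task_id] = value

    def xcom_pull(self, task_ids):
        return self.store[task_ids]

def extract():
    return {'events_count': 2000}  # fixed count for a deterministic demo

def calculate(ti):
    data = ti.xcom_pull(task_ids='extract_events')
    return {
        'total_views': data['events_count'],
        'total_purchases': int(data['events_count'] * 0.05),
    }

ti = FakeTI()
ti.push_result('extract_events', extract())
ti.push_result('calculate_metrics', calculate(ti))
print(ti.xcom_pull(task_ids='calculate_metrics'))
# {'total_views': 2000, 'total_purchases': 100}
```

In the real DAG, Airflow performs the push step automatically whenever a PythonOperator callable returns a value.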
Step 4: Test Your DAGs

~15 min
4.1 Refresh Airflow UI

Go to http://localhost:8080

What You Should See:

Two new DAGs: "ingest_web_events" and "daily_product_summary"

4.2 Enable DAGs

Click the toggle switch next to each DAG name to enable them

4.3 Trigger Manual Run
  1. Click on "ingest_web_events" DAG name
  2. Click the "Play" button (▶) in top right
  3. Click "Trigger DAG"
  4. Watch the Graph view as tasks execute

Success!

All tasks should turn green (success) within 1-2 minutes

4.4 View Task Logs
  1. Click on any green task box
  2. Click "Log" button
  3. Review the output to see what happened

💡 Logs show print statements from your Python functions

4.5 Test Second DAG

Repeat steps 4.3-4.4 for "daily_product_summary" DAG

Congratulations! 🎉

You've successfully built, scheduled, and tested two complete Airflow DAGs!

What's Next?

Part 2: Advanced Scheduling

Learn cron expressions, dynamic DAGs, and branching logic

Part 3: Monitoring & Alerting

Set up SLAs, email alerts, and Slack notifications

Part 4: Production Deployment

Deploy to Kubernetes with proper secrets management
