
ShopStream Spark Project

Step-by-Step Walkthrough: Build a Production Spark Data Pipeline

Total Time: ~90 minutes
Difficulty: Intermediate
Language: PySpark

What You'll Build

In this walkthrough, you'll build a production-grade Spark pipeline for ShopStream, an e-commerce platform processing millions of transactions daily. You'll learn to:

  • Set up Apache Spark locally with Docker
  • Ingest data from multiple formats (CSV, JSON, Parquet)
  • Transform data using DataFrame operations
  • Perform product analytics and customer segmentation
  • Write optimized Parquet output with partitioning

Prerequisites

Docker Desktop installed and running
Python 3.8+ installed
Basic understanding of DataFrames and SQL
Text editor or IDE (VS Code recommended)
Step 1: Set Up Spark Environment (20 min)

1.1 Create Project Directory

Create a directory structure for your Spark project:

# Create project directory
mkdir -p shopstream-spark
cd shopstream-spark
# Create subdirectories
mkdir -p data/raw data/processed jobs notebooks

1.2 Create docker-compose.yml

Create a Docker Compose file to run Spark with Jupyter:

# docker-compose.yml
version: '3.8'
services:
  spark:
    image: jupyter/pyspark-notebook:spark-3.5.0
    container_name: shopstream-spark
    ports:
      - "8888:8888"   # Jupyter
      - "4040:4040"   # Spark UI
    volumes:
      - ./data:/home/jovyan/data
      - ./jobs:/home/jovyan/jobs
      - ./notebooks:/home/jovyan/notebooks
    environment:
      - JUPYTER_ENABLE_LAB=yes
      - GRANT_SUDO=yes

1.3 Download Sample Data

Create sample e-commerce data files:

# Create products.csv
cat > data/raw/products.csv <<EOF
product_id,name,category,price,stock
1,Laptop Pro,Electronics,1299.99,45
2,Wireless Mouse,Electronics,29.99,200
3,Desk Chair,Furniture,199.99,30
4,Coffee Maker,Appliances,79.99,150
5,Running Shoes,Apparel,89.99,75
EOF
# Create orders.json
cat > data/raw/orders.json <<EOF
{"order_id":1001,"customer_id":501,"product_id":1,"quantity":1,"order_date":"2024-01-15","total":1299.99}
{"order_id":1002,"customer_id":502,"product_id":2,"quantity":2,"order_date":"2024-01-15","total":59.98}
{"order_id":1003,"customer_id":501,"product_id":3,"quantity":1,"order_date":"2024-01-16","total":199.99}
EOF
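Note that orders.json is in JSON Lines format: one complete JSON object per line, which is what Spark's JSON reader expects by default. A quick stdlib check (using the three sample orders above) confirms each line parses on its own:

```python
import json

# The three sample orders from orders.json, one JSON object per line
orders_jsonl = """\
{"order_id":1001,"customer_id":501,"product_id":1,"quantity":1,"order_date":"2024-01-15","total":1299.99}
{"order_id":1002,"customer_id":502,"product_id":2,"quantity":2,"order_date":"2024-01-15","total":59.98}
{"order_id":1003,"customer_id":501,"product_id":3,"quantity":1,"order_date":"2024-01-16","total":199.99}"""

# Each line must be valid JSON on its own for Spark's line-delimited reader
orders = [json.loads(line) for line in orders_jsonl.splitlines()]
print(len(orders))           # 3
print(orders[0]["total"])    # 1299.99
```

If a line fails to parse here, Spark would place that record in a `_corrupt_record` column rather than failing outright.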

1.4 Start Spark Environment

# Start Spark container
docker-compose up -d
# Check container is running
docker ps
What You Should See
CONTAINER ID   IMAGE                                  STATUS
abc123def456   jupyter/pyspark-notebook:spark-3.5.0   Up 10 seconds

1.5 Access Jupyter Notebook

Get the Jupyter token and access the notebook:

# Get Jupyter token
docker logs shopstream-spark 2>&1 | grep token

Open your browser to: http://localhost:8888

What This Does
Jupyter provides an interactive interface for writing and testing Spark code. The Spark UI (port 4040) lets you monitor job execution and performance.

1.6 Verify Spark Installation

Create a new notebook and test Spark:

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("ShopStream") \
    .getOrCreate()

# Test Spark
print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")
Expected Output
Spark version: 3.5.0
Spark UI: http://localhost:4040
Common Issues
  • If the container fails to start, ensure Docker has at least 4GB RAM allocated
  • If port 8888 is busy, change the port mapping in docker-compose.yml
  • On Windows, use PowerShell or WSL2 for commands
Step 2: Build Multi-Format Data Ingestion (30 min)

2.1 Read CSV Data (Products)

Create a Spark job to read product data:

# Read products from CSV
products_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("data/raw/products.csv")

# Display schema and data
products_df.printSchema()
products_df.show(5)
Expected Output
root
 |-- product_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)
 |-- stock: integer (nullable = true)

2.2 Read JSON Data (Orders)

# Read orders from JSON (line-delimited, so multiLine stays false)
orders_df = spark.read \
    .option("multiLine", "false") \
    .json("data/raw/orders.json")

# Display data
orders_df.show()
print(f"Total orders: {orders_df.count()}")

2.3 Data Quality Validation

Add validation checks before processing:

from pyspark.sql.functions import col, isnan, when, count

# Count null values in every column
products_df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in products_df.columns
]).show()

# Filter out invalid records
valid_products = products_df.filter(
    (col("price") > 0) & (col("stock") >= 0)
)
Best Practice
Always validate data quality early in your pipeline. Use inferSchema during development, but define explicit schemas for production to avoid type inference overhead.
Step 3: Create DataFrame Transformations (25 min)

3.1 Join Orders with Products

# Join orders with product details
enriched_orders = orders_df.join(
    products_df,
    orders_df.product_id == products_df.product_id,
    "left"
).select(
    orders_df["*"],
    products_df.name.alias("product_name"),
    products_df.category,
    products_df.price
)
enriched_orders.show()

3.2 Product Analytics Aggregation

Calculate product performance metrics:

from pyspark.sql.functions import sum, avg, count, desc

# Aggregate by category
category_stats = enriched_orders.groupBy("category").agg(
    count("order_id").alias("total_orders"),
    sum("total").alias("revenue"),
    avg("total").alias("avg_order_value")
).orderBy(desc("revenue"))

category_stats.show()
Expected Output
+-------------+------------+---------+---------------+
| category|total_orders| revenue|avg_order_value|
+-------------+------------+---------+---------------+
| Electronics| 2| 1359.97| 679.99|
| Furniture| 1| 199.99| 199.99|
+-------------+------------+---------+---------------+
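You can sanity-check those figures by hand: orders 1001 and 1002 are Electronics, order 1003 is Furniture. In plain Python:

```python
# Order totals grouped by category, from the sample data in step 1.3
electronics = [1299.99, 59.98]   # orders 1001 and 1002
furniture = [199.99]             # order 1003

electronics_revenue = sum(electronics)
print(round(electronics_revenue, 2))                      # 1359.97
# Exact average is 679.985; the table above rounds it
print(round(electronics_revenue / len(electronics), 3))   # 679.985
print(sum(furniture))                                     # 199.99
```

Recomputing aggregates on a tiny sample like this is a cheap way to validate a pipeline before pointing it at the full dataset.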

3.3 Customer Segmentation

# Calculate customer lifetime value
customer_ltv = enriched_orders.groupBy(
"customer_id"
).agg(
count(
"order_id"
).alias(
"order_count"
),
sum(
"total"
).alias(
"lifetime_value"
),
avg(
"total"
).alias(
"avg_order_value"
)
)
# Segment customers
customer_segments = customer_ltv.withColumn(
"segment"
,
when(col(
"lifetime_value"
) >
1000
,
"VIP"
)
.when(col(
"lifetime_value"
) >
500
,
"High-Value"
)
.otherwise(
"Standard"
)
)
customer_segments.show()
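The tiering rule itself is simple; restated in plain Python to make the thresholds explicit (the Spark `when`/`otherwise` chain above is what actually runs in the pipeline):

```python
def segment(lifetime_value: float) -> str:
    """Mirror of the when/otherwise chain in the Spark job."""
    if lifetime_value > 1000:
        return "VIP"
    if lifetime_value > 500:
        return "High-Value"
    return "Standard"

# Customer 501 placed orders 1001 and 1003; customer 502 placed order 1002
print(segment(1299.99 + 199.99))  # VIP
print(segment(59.98))             # Standard
```

Like `otherwise`, the final `return` catches everything the earlier conditions miss, so every customer lands in exactly one segment.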
Step 4: Write Optimized Output & Test (15 min)

4.1 Write Partitioned Parquet

Save processed data with optimizations:

# Write category stats to Parquet
category_stats.write \
    .mode("overwrite") \
    .parquet("data/processed/category_stats")

# Write customer segments, partitioned by segment
customer_segments.write \
    .mode("overwrite") \
    .partitionBy("segment") \
    .parquet("data/processed/customer_segments")
Why Parquet?
Parquet is a columnar storage format that compresses data 5-10x better than CSV and enables efficient queries. Partitioning by segment creates separate directories for each segment, speeding up filtered queries.
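For the three segments defined in step 3.3, the write above produces one subdirectory per distinct segment value, following Spark's key=value naming convention (the part-file names inside each directory are generated by Spark; the layout below is illustrative):

```python
# Directory layout produced by partitionBy("segment") -- one
# subdirectory per distinct value of the partition column
base = "data/processed/customer_segments"
partition_dirs = [
    f"{base}/segment={seg}" for seg in ["VIP", "High-Value", "Standard"]
]
for d in partition_dirs:
    print(d + "/")
```

Because the segment value is encoded in the directory name, a filter on `segment` lets Spark skip whole directories instead of scanning every file.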

4.2 Verify Output

# Read back the data
saved_category_stats = spark.read.parquet("data/processed/category_stats")
saved_category_stats.show()

# Check partition count (in-memory partitions after the read,
# not the number of segment directories on disk)
saved_segments = spark.read.parquet("data/processed/customer_segments")
print(f"Partitions: {saved_segments.rdd.getNumPartitions()}")

4.3 Check Spark UI

Monitor your job execution:

  1. Open http://localhost:4040 in your browser
  2. Click on the "Jobs" tab to see all executed jobs
  3. Click on a job to see the DAG visualization
  4. Check the "Storage" tab to see cached DataFrames
  5. Review the "Executors" tab for resource usage
What to Look For
✓ Jobs should show "Succeeded" status
✓ DAG should show stages: read → transform → write
✓ No tasks should be marked as "Failed"

4.4 Performance Testing

import time

# Test query performance: count VIP customers
start_time = time.time()
result = saved_segments.filter(col("segment") == "VIP").count()
elapsed = time.time() - start_time

print(f"Query took {elapsed:.2f} seconds")
print(f"Found {result} VIP customers")
Performance Tip
Partitioned reads are much faster! Querying for "VIP" only reads the segment=VIP partition, skipping the other data entirely (partition pruning).
Troubleshooting
  • Out of memory errors: Reduce data size or increase Docker memory
  • Slow performance: Check Spark UI for data skew in partitions
  • File not found: Verify paths are relative to Jupyter working directory
See the Spark Troubleshooting Guide for more solutions.

Walkthrough Complete!

You've successfully built a production Spark pipeline with multi-format ingestion, transformations, and optimized output. You're ready for Part 2!

What You've Learned:

Spark environment setup with Docker
Reading CSV, JSON, and Parquet formats
DataFrame joins and aggregations
Data quality validation patterns
Customer segmentation with SQL functions
Optimized Parquet output with partitioning
Spark UI monitoring and debugging
Performance testing and optimization