Every data team has a CI/CD story. Usually it goes like this: "We adapted our software team's CI/CD pipeline. It runs dbt test on every PR. It works... most of the time." The "most of the time" is where production breaks hide.
Software CI/CD assumes stateless deployments: ship a new version of the service, verify health checks, roll back if something fails. The state lives in the database, not the deploy.
Data CI/CD doesn't have that luxury. When you deploy a new dbt model or change a pipeline transformation, the output is data — and data that reaches the warehouse is immediately consumed by dashboards, reports, ML models, and downstream pipelines. A bad deploy doesn't just serve errors. It serves wrong numbers. Quietly. For hours or days.
That's why copying your software team's CI/CD template and adding dbt test doesn't work. Data CI/CD needs schema validation, data quality gates, environment parity for stateful systems, and deployment strategies that validate output before consumers see it.
This guide covers the full production CI/CD lifecycle for data pipelines — from lint to post-deploy validation — with working examples using the tools data engineers actually use: GitHub Actions, dbt, Great Expectations, Airflow, and Snowflake.
TL;DR — the 6 things to take away
- Software CI/CD is stateless — deploy, verify, rollback. Data CI/CD is stateful — bad data propagates downstream and may take days to detect.
- The three stages most data teams skip: schema backward compatibility, data quality gates in CI, and post-deploy validation.
- Environment parity is the hardest problem. Testing against clean fixtures misses the production edge cases that actually break pipelines.
- Use sampled production data (anonymized) for CI testing — not synthetic data. Synthetic data doesn't reproduce real-world distributions, nulls, or schema drift.
- The full CI/CD lifecycle: lint → schema check → unit test → integration test → data quality gate → staging deploy → production deploy → post-deploy validation.
- Platform engineers should build this as a shared template. Individual DEs shouldn't reinvent CI/CD for every pipeline.
Why Software CI/CD Patterns Break for Data
Before building, understand why the patterns you know from software don't transfer cleanly.
The statefulness problem. Software deployment: ship v2.1, health check passes, traffic routed, done. Or: ship v2.1, health check fails, rollback to v2.0, done.
Data deployment has no health check that catches the worst failures. The pipeline runs, data gets written, dashboards refresh, analysts make decisions — and two days later someone asks "why did revenue drop 40%?" The answer turns out to be a schema change in the new model that silently nullified a join. You can't roll back the decisions analysts made based on wrong data. By the time someone notices, the damage is done.
The environment problem. Software teams solve environment parity with containers — same image runs in dev, staging, prod. Data teams can't containerize their data. A pipeline that passes tests against 1,000 clean rows may fail against 10 million rows with nulls in unexpected columns, Unicode in name fields, and timestamps in three different timezones.
The schema problem. Software APIs have versioned schemas (OpenAPI, protobuf). Data schemas evolve continuously — new columns added, types changed, columns renamed. A dbt model that removes a column might pass all of its own tests while breaking 5 downstream models and 3 dashboards.
The Full CI/CD Lifecycle for Data Pipelines
Seven stages. The first four run on every PR; the last three run after merge. Each stage prevents a specific class of production failure.
Data Pipeline CI/CD Lifecycle
=============================
PR Created
|
v
[ 1. LINT ............... SQL + Python + YAML ]
|
v
[ 2. SCHEMA CHECK ....... dbt manifest backward compat ]
|
v
[ 3. TEST ............... unit + integration on sampled prod data ]
|
v
[ 4. DATA QUALITY GATE .. Great Expectations / Soda assertions ]
|
v
--- Merge to main ---
|
v
[ 5. STAGING DEPLOY ..... validate output before promote ]
|
v
[ 6. PRODUCTION DEPLOY .. blue/green schema swap, atomic ]
|
v
[ 7. POST-DEPLOY CHECK .. freshness + volume + null rates vs baseline ]Stage 1 — Lint
Linting catches style and syntax issues before they waste reviewer time. The platform engineer owns the .sqlfluff config and distributes it via a shared repo template — every team uses the same config, not each squad picking their own style.
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: SQL Lint
run: |
pip install sqlfluff
sqlfluff lint models/ --dialect snowflake --config .sqlfluff
- name: Python Lint
run: |
pip install ruff
ruff check dags/ scripts/ tests/
- name: YAML Validation
run: |
pip install yamllint
yamllint .github/workflows/ dbt_project.ymlStage 2 — Schema Backward Compatibility
This is the stage most data teams skip entirely. It's also where the worst production failures originate. A DE removes a column from stg_orders that fct_revenue depends on. Without this check, the PR passes all dbt tests (because stg_orders tests still pass), merges, and breaks the revenue dashboard the next morning. With this check, the PR is blocked and the downstream impact is surfaced in the PR comment.
"""
Schema backward compatibility checker.
Compares dbt model schemas between the PR branch and the base branch.
Flags breaking changes before they reach production.
"""
import json
from dataclasses import dataclass, field
from enum import Enum
class Severity(Enum):
SAFE = "safe" # Additive — new columns, new models
WARNING = "warning" # Type changes that might break consumers
BREAKING = "breaking" # Removed columns, renamed models
@dataclass
class SchemaChange:
model: str
severity: Severity
description: str
downstream_models: list[str] = field(default_factory=list)
def get_downstream(manifest: dict, model_name: str) -> list[str]:
"""Find all models that depend on a given model."""
target_id = None
for node_id, node in manifest["nodes"].items():
if node["name"] == model_name:
target_id = node_id
break
if not target_id:
return []
return [
node["name"]
for node in manifest["nodes"].values()
if target_id in node.get("depends_on", {}).get("nodes", [])
]
def check_compatibility(base_manifest: dict, head_manifest: dict) -> list[SchemaChange]:
"""Compare two dbt manifests and return schema changes."""
changes = []
base_models = {
node["name"]: node
for node in base_manifest["nodes"].values()
if node["resource_type"] == "model"
}
head_models = {
node["name"]: node
for node in head_manifest["nodes"].values()
if node["resource_type"] == "model"
}
for model_name, base_node in base_models.items():
if model_name not in head_models:
changes.append(SchemaChange(
model=model_name,
severity=Severity.BREAKING,
description="Model removed",
downstream_models=get_downstream(base_manifest, model_name),
))
continue
base_cols = set(base_node.get("columns", {}).keys())
head_cols = set(head_models[model_name].get("columns", {}).keys())
removed_cols = base_cols - head_cols
if removed_cols:
changes.append(SchemaChange(
model=model_name,
severity=Severity.BREAKING,
description=f"Columns removed: {', '.join(sorted(removed_cols))}",
downstream_models=get_downstream(head_manifest, model_name),
))
# Check type changes
for col_name in base_cols & head_cols:
base_type = base_node["columns"].get(col_name, {}).get("data_type", "")
head_type = head_models[model_name]["columns"].get(col_name, {}).get("data_type", "")
if base_type and head_type and base_type != head_type:
changes.append(SchemaChange(
model=model_name,
severity=Severity.WARNING,
description=f"Column '{col_name}' type changed: {base_type} → {head_type}",
downstream_models=get_downstream(head_manifest, model_name),
))
return changesschema-check:
runs-on: ubuntu-latest
needs: [lint]
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Compile base branch manifest
run: |
git checkout ${{ github.base_ref }}
dbt compile --target ci
cp target/manifest.json /tmp/base_manifest.json
- name: Compile PR branch manifest
run: |
git checkout ${{ github.head_ref }}
dbt compile --target ci
cp target/manifest.json /tmp/head_manifest.json
- name: Check schema compatibility
run: |
python platform_tools/schema_check.py \
--base /tmp/base_manifest.json \
--head /tmp/head_manifest.json \
--fail-on breakingStage 3 — Testing (Unit + Integration)
Unit tests cover model-level logic — not-null, unique, accepted values, relationships. They're cheap, fast, and table-stakes for any data CI pipeline.
models:
- name: fct_revenue
columns:
- name: order_id
tests:
- not_null
- unique
- name: revenue_usd
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
max_value: 1000000
- name: currency
tests:
- accepted_values:
values: ['USD', 'EUR', 'GBP', 'JPY', 'CAD']Integration tests verify the pipeline end-to-end using real (sampled) data — not clean test fixtures. The key decision is the test data strategy. Use sampled, anonymized production data, not synthetic fixtures. Synthetic data doesn't reproduce the distribution patterns, edge cases, and schema drift that cause production failures. Sample 1–5% of production data, anonymize PII, and refresh weekly.
"""
Integration test for the revenue pipeline.
Runs against a CI Snowflake schema with sampled production data.
"""
import os
from datetime import datetime, timedelta
import pytest
import snowflake.connector
@pytest.fixture
def snowflake_conn():
conn = snowflake.connector.connect(
account=os.environ["SNOWFLAKE_CI_ACCOUNT"],
user=os.environ["SNOWFLAKE_CI_USER"],
password=os.environ["SNOWFLAKE_CI_PASSWORD"],
warehouse="CI_WH_XS",
database="CI_DATABASE",
schema=f"CI_{os.environ['GITHUB_RUN_ID']}", # Isolated per CI run
)
yield conn
conn.close()
class TestRevenuePipeline:
"""Verify the revenue pipeline produces correct, complete output."""
def test_no_orphaned_orders(self, snowflake_conn):
"""Every order in fct_revenue has a matching source record."""
result = snowflake_conn.cursor().execute("""
SELECT COUNT(*)
FROM fct_revenue r
LEFT JOIN stg_orders o ON r.order_id = o.order_id
WHERE o.order_id IS NULL
""").fetchone()
assert result[0] == 0, f"Found {result[0]} orphaned orders in fct_revenue"
def test_revenue_not_negative(self, snowflake_conn):
"""No negative revenue values after currency conversion."""
result = snowflake_conn.cursor().execute("""
SELECT COUNT(*), MIN(revenue_usd)
FROM fct_revenue
WHERE revenue_usd < 0
""").fetchone()
assert result[0] == 0, f"Found {result[0]} negative revenue rows (min: {result[1]})"
def test_freshness(self, snowflake_conn):
"""Most recent data should be within 24 hours."""
result = snowflake_conn.cursor().execute(
"SELECT MAX(event_timestamp) FROM fct_revenue"
).fetchone()
latest = result[0]
assert latest > datetime.utcnow() - timedelta(hours=24), \
f"Stale data: latest record is {latest}"
def test_row_count_within_bounds(self, snowflake_conn):
"""Row count should be within 20% of the previous run."""
current = snowflake_conn.cursor().execute(
"SELECT COUNT(*) FROM fct_revenue"
).fetchone()[0]
previous = snowflake_conn.cursor().execute(
"SELECT row_count FROM pipeline_metadata.run_history "
"WHERE model = 'fct_revenue' ORDER BY run_at DESC LIMIT 1"
).fetchone()[0]
pct_change = abs(current - previous) / previous * 100
assert pct_change < 20, \
f"Row count changed {pct_change:.1f}% ({previous} → {current})"Stage 4 — Data Quality Gate
The quality gate runs after tests pass and before merge is allowed. This is where Great Expectations or Soda catches issues that dbt tests miss — distribution shifts, null-rate regressions, cardinality changes that aren't expressible as simple not_null assertions.
"""
CI data quality gate using Great Expectations.
Runs expectations against the CI schema for all modified models.
"""
import great_expectations as gx
from great_expectations.core.batch import BatchRequest
def run_quality_gate(modified_models: list[str], ci_schema: str) -> dict:
"""Run GX expectations against modified models in CI."""
context = gx.get_context()
results = {}
for model in modified_models:
suite_name = f"ci_gate.{model}"
# Each model has a CI expectation suite — stricter than prod monitoring.
batch_request = BatchRequest(
datasource_name="snowflake_ci",
data_connector_name="default_inferred",
data_asset_name=f"{ci_schema}.{model}",
)
result = context.run_checkpoint(
checkpoint_name="ci_quality_gate",
batch_request=batch_request,
expectation_suite_name=suite_name,
)
results[model] = {
"success": result.success,
"statistics": result.statistics,
"failed_expectations": [
r.expectation_config.expectation_type
for r in result.results
if not r.success
],
}
return resultsexpectations:
- expectation_type: expect_column_values_to_not_be_null
kwargs:
column: order_id
- expectation_type: expect_column_values_to_be_between
kwargs:
column: revenue_usd
min_value: 0
max_value: 1000000
mostly: 0.999 # Allow 0.1% outliers
- expectation_type: expect_column_distinct_count_to_be_between
kwargs:
column: currency
min_value: 3
max_value: 10
- expectation_type: expect_table_row_count_to_be_between
kwargs:
min_value: 10000 # CI sample should have at least 10K rowsStages 5–6 — Staging and Production Deploy
For schema changes, blue-green deployment gives zero-downtime swaps. Build into a shadow schema, validate the output matches production patterns, then swap atomically with ALTER SCHEMA RENAME. If post-swap checks fail, swap back. The full cycle is ~30 seconds and downstream consumers never see a partial state.
"""
Blue-green deployment for dbt models.
Builds into a shadow schema, validates, then swaps atomically.
"""
class BlueGreenDeployer:
"""Deploy dbt models using blue-green schema swap."""
def __init__(self, conn, production_schema: str):
self.conn = conn
self.production_schema = production_schema
self.shadow_schema = f"{production_schema}_shadow"
def deploy(self, dbt_target: str = "production") -> bool:
# Step 1: Build into shadow schema
self._run_dbt_build(target=dbt_target, schema=self.shadow_schema)
# Step 2: Validate shadow output
validation = self._validate_shadow()
if not validation["passed"]:
print(f"Shadow validation failed: {validation['errors']}")
self._cleanup_shadow()
return False
# Step 3: Swap schemas atomically
self._swap_schemas()
# Step 4: Post-swap verification
post_check = self._post_swap_check()
if not post_check["passed"]:
print(f"Post-swap check failed, rolling back: {post_check['errors']}")
self._swap_schemas() # Swap back
return False
# Step 5: Clean up old schema
self._cleanup_shadow()
return True
def _swap_schemas(self):
"""Atomic schema swap — zero downtime."""
self.conn.cursor().execute(f"""
ALTER SCHEMA {self.production_schema}
RENAME TO {self.production_schema}_old;
ALTER SCHEMA {self.shadow_schema}
RENAME TO {self.production_schema};
ALTER SCHEMA {self.production_schema}_old
RENAME TO {self.shadow_schema};
""")
def _validate_shadow(self) -> dict:
"""Validate shadow schema matches production patterns."""
checks = []
# Row count comparison — >25% drift fails
for table in self._get_tables(self.shadow_schema):
shadow_count = self._count(self.shadow_schema, table)
prod_count = self._count(self.production_schema, table)
if prod_count > 0:
pct_change = abs(shadow_count - prod_count) / prod_count * 100
if pct_change > 25:
checks.append(
f"{table}: row count changed {pct_change:.1f}% "
f"({prod_count} → {shadow_count})"
)
return {"passed": len(checks) == 0, "errors": checks}Stage 7 — Post-Deploy Validation
CI validates against a CI environment. Production has different data volumes, different concurrent loads, different schema states. Always run post-deploy validation that checks freshness, volume, and null rates against historical baselines — within 30 minutes of every production deploy. Critical-severity failures page the deploying team; warnings get logged for review.
"""
Post-deploy validation checks.
Runs within 30 minutes of production deploy.
Alerts the deploying team if any check fails.
"""
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class ValidationResult:
check_name: str
passed: bool
message: str
severity: str # "critical" or "warning"
class PostDeployValidator:
"""Validate production output after deployment."""
def __init__(self, conn, deployed_models: list[str]):
self.conn = conn
self.deployed_models = deployed_models
def run_all(self) -> list[ValidationResult]:
results = []
for model in self.deployed_models:
results.extend(self._validate_model(model))
return results
def _validate_model(self, model: str) -> list[ValidationResult]:
checks = []
# 1. Freshness check
latest = self.conn.cursor().execute(
f"SELECT MAX(_loaded_at) FROM {model}"
).fetchone()[0]
if latest and latest < datetime.utcnow() - timedelta(hours=2):
checks.append(ValidationResult(
check_name=f"{model}.freshness",
passed=False,
message=f"Data is stale: latest record at {latest}",
severity="critical",
))
# 2. Volume check — compare to same day last week
current_count, historical_count = self.conn.cursor().execute(f"""
SELECT
(SELECT COUNT(*) FROM {model}
WHERE DATE(_loaded_at) = CURRENT_DATE),
(SELECT COUNT(*) FROM {model}
WHERE DATE(_loaded_at) = DATEADD(week, -1, CURRENT_DATE))
""").fetchone()
if historical_count and historical_count > 0:
pct_change = (current_count - historical_count) / historical_count * 100
if abs(pct_change) > 30:
checks.append(ValidationResult(
check_name=f"{model}.volume",
passed=False,
message=f"Volume changed {pct_change:+.1f}% vs last week "
f"({historical_count} → {current_count})",
severity="warning",
))
# 3. Null rate check for critical columns
for col, rate in self._check_null_rates(model).items():
if rate > 0.05: # >5% nulls in critical columns
checks.append(ValidationResult(
check_name=f"{model}.nulls.{col}",
passed=False,
message=f"Column '{col}' has {rate:.1%} null rate",
severity="warning",
))
if not checks:
checks.append(ValidationResult(
check_name=f"{model}.all",
passed=True,
message="All post-deploy checks passed",
severity="info",
))
return checksThe Complete CI/CD YAML
Tying all 4 PR-gated stages together into a single GitHub Actions workflow. Stages 5–7 run on merge-to-main via a separate workflow not shown here.
name: Data Pipeline CI/CD
on:
pull_request:
paths:
- 'models/**'
- 'dags/**'
- 'tests/**'
- 'macros/**'
- 'expectations/**'
env:
SNOWFLAKE_CI_ACCOUNT: ${{ secrets.SNOWFLAKE_CI_ACCOUNT }}
SNOWFLAKE_CI_USER: ${{ secrets.SNOWFLAKE_CI_USER }}
SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }}
CI_SCHEMA: CI_${{ github.run_id }}
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: pip install sqlfluff ruff yamllint
- run: sqlfluff lint models/ --dialect snowflake
- run: ruff check dags/ scripts/ tests/
schema-check:
runs-on: ubuntu-latest
needs: [lint]
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- run: pip install dbt-snowflake
- run: |
python platform_tools/schema_check.py \
--base-ref ${{ github.base_ref }} \
--head-ref ${{ github.head_ref }} \
--fail-on breaking
test:
runs-on: ubuntu-latest
needs: [schema-check]
steps:
- uses: actions/checkout@v4
- run: pip install dbt-snowflake
- run: |
dbt build --target ci --schema $CI_SCHEMA --select state:modified+
dbt test --target ci --schema $CI_SCHEMA --select state:modified+
- run: pytest tests/integration/ -v --schema $CI_SCHEMA
quality-gate:
runs-on: ubuntu-latest
needs: [test]
steps:
- uses: actions/checkout@v4
- run: pip install great-expectations
- run: |
python platform_tools/quality_gate.py \
--models $(dbt ls --select state:modified --output name) \
--schema $CI_SCHEMA \
--fail-on-errorCommon Mistakes
- Testing against synthetic data. Synthetic test data is clean, well-distributed, schema-compliant. Production data isn't. The pipeline that passes against 1,000 synthetic rows breaks against 10 million production rows with nulls in "not null" columns, mixed-timezone timestamps, and Unicode in address fields. Use sampled, anonymized production data instead.
- Running dbt test without schema backward compatibility. dbt tests validate the model's own output. They don't check whether the model's changes break downstream consumers. A column rename passes
not_nullanduniquetests on the model itself while breaking every model that references the old column name. Schema compatibility is a separate, mandatory stage. - No post-deploy validation. "The CI passed, so it's fine." CI validates against a CI environment. Production has different volumes, different loads, different schema states. Always run post-deploy validation against historical baselines.
- Every team building their own CI/CD. If four squads each build their own CI pipeline, you get four different quality standards, four different deployment processes, and four times the maintenance burden. CI/CD for data is a platform concern — build it once as a shared template that all teams use.
- Treating rollback as trivial. In software, rollback means deploying the previous version. In data, rollback means: which tables were affected? Were they consumed by downstream models? Did reports already pull the bad data? Did any ML models retrain on it? Build lineage tracking and blast-radius analysis into your deployment system before you need it.
FAQ — Questions Data Teams Actually Ask
Can I use my software team's CI/CD pipeline for data?
As a starting point, yes — but you'll need significant additions. Software CI/CD tests code behavior. Data CI/CD tests code behavior plus data behavior (schema compatibility, data quality, output validation). Plan to add at least three stages that don't exist in software pipelines: schema backward compatibility, data quality gates, and post-deploy validation.
How do you handle environment parity for data pipelines?
The most effective approach: sample 1–5% of production data, anonymize PII, and load it into a CI-specific schema. Refresh weekly. This gives CI tests realistic data distributions without the cost of full production clones. Avoid synthetic data — it doesn't reproduce the edge cases that cause production failures.
Which tools work best for data CI/CD in 2026?
Most common production stack: GitHub Actions (CI orchestration) + dbt (model testing) + Great Expectations or Soda (data quality gates) + Snowflake or BigQuery (CI warehouse) + custom Python (schema checks, impact analysis, deployment). Airflow handles production orchestration but doesn't run in CI — CI is about validating changes before they reach Airflow.
Should CI/CD run on every PR or only for production models?
Run lint and schema checks on every PR. Run integration tests and data quality gates on PRs that modify production-path models (staging, marts). Skip heavy integration tests for documentation-only changes or development-branch models. Use path filters in GitHub Actions to control which stages run.
How do you test Airflow DAGs in CI?
Test DAG validity (syntax, import errors) by importing the DAG file in a pytest fixture — this catches most issues. For logic testing, extract business logic into testable Python functions and unit test those separately. Don't try to run full Airflow tasks in CI — it's slow and fragile. Test the logic, validate the DAG structure, and rely on staging deploys for end-to-end verification.
Conclusion
Data CI/CD isn't software CI/CD with dbt test bolted on. It's a fundamentally different problem because data pipelines are stateful, schemas evolve continuously, and bad data can't be rolled back with a git revert.
The full lifecycle — lint, schema compatibility, testing, data quality gates, staging, production deploy, post-deploy validation — looks complex, but each stage prevents a specific class of production failure. Build it incrementally: start with schema checks and quality gates, then add deployment automation and post-deploy validation.
And if you're on a team with more than 5 engineers, build this as a shared platform — not something each team reinvents.
Build the CI/CD Data Platform project
The hardest part of this guide isn't reading it — it's wiring all 7 stages together against a real warehouse with real schema drift. The CI/CD Data Platform project on AI-DE walks through the complete build, from the shared CI template to the blue-green deployment to the post-deploy validation layer.
It ships as a portfolio-grade implementation specifically designed to prove platform thinking to hiring managers — exactly the kind of artifact that gets data platform / staff engineer interviews.