
Implementing Data Contracts in a dbt Monorepo

Priya Nair · Mar 10, 2026 · 7 min read


The Silent Breakage Problem

In a large dbt monorepo, models depend on each other in ways that aren't always obvious. An upstream engineer renames order_total to order_total_usd. A downstream model silently returns NULL for every row. The BI dashboard shows zeros. The CEO asks why revenue dropped to zero on Tuesday.

This is the silent breakage problem. dbt tests catch data quality issues — they don't catch schema drift before it reaches production.

| Breaking change type | How it breaks downstream | Caught by tests? |
| --- | --- | --- |
| Column renamed | Downstream refs return NULL | No |
| Type changed (int → string) | Cast errors or silent coercion | Sometimes |
| Column dropped | Query fails at runtime | Only if tested |
| Nullability loosened | NULL rows in non-null joins | Only if tested |
| Freshness SLA missed | Stale data in dashboards | Only if monitored |
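
The first four rows of this table can be caught mechanically by diffing an expected column spec against what the warehouse currently reports. A minimal sketch of such a drift classifier (the function and dict shapes here are illustrative, not a real API):

```python
# Sketch: classify schema drift between an expected column spec and the
# warehouse's current columns. Dict shapes are illustrative only.

def classify_drift(expected: dict, actual_cols: dict) -> list:
    """Label each breaking change: dropped/renamed, retyped, or nullability loosened."""
    changes = []
    for name, spec in expected.items():
        if name not in actual_cols:
            changes.append(f"{name}: dropped or renamed")
            continue
        actual = actual_cols[name]
        if actual["type"] != spec["type"]:
            changes.append(f"{name}: type changed {spec['type']} -> {actual['type']}")
        if spec["nullable"] is False and actual["nullable"]:
            changes.append(f"{name}: nullability loosened")
    return changes

expected = {
    "order_total": {"type": "numeric", "nullable": False},
    "order_id": {"type": "bigint", "nullable": False},
}
actual = {
    "order_total_usd": {"type": "numeric", "nullable": False},  # renamed upstream
    "order_id": {"type": "bigint", "nullable": True},           # loosened
}
print(classify_drift(expected, actual))
# -> ['order_total: dropped or renamed', 'order_id: nullability loosened']
```

The last row, freshness, is a property of the data rather than the schema, which is why it needs monitoring instead of a structural diff.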

What a Data Contract Actually Is

Three-Layer Contract Architecture

Layer 1 — Producer: the fct_orders model (dbt), the contract file (contracts/orders_contract.yml), and the dbt config contract: enforced: true. The producer owns and versions the contract.

Layer 2 — Enforcement: a GitHub Actions CI job runs validate_contracts.py, diffs the contract against information_schema, and blocks the merge on violation. Automated, no human needed.

Layer 3 — Consumer: downstream dbt models, BI dashboards and reports, and ML feature pipelines. Consumers depend on the contract, not the implementation.

A data contract is a machine-readable specification a producer commits to: column names, types, nullability guarantees, and freshness SLAs. Consumers depend on the contract — not the implementation. When the contract changes, it must be versioned. When it's violated, the merge is blocked.

The Contract Definition

We colocate contracts with source definitions in dbt. The YAML lives next to the model it describes:

```yaml
# contracts/fct_orders_contract.yml
version: 1
model: fct_orders
owner: data-platform-team
consumers:
  - analytics-dashboards
  - ml-feature-pipeline
  - finance-reporting
guaranteed_columns:
  - name: order_id
    type: bigint
    nullable: false
    description: "Immutable surrogate key — never renamed or retyped"
  - name: customer_id
    type: bigint
    nullable: false
  - name: order_total_usd
    type: numeric(10,2)
    nullable: false
  - name: order_status
    type: varchar
    nullable: false
    allowed_values: [pending, confirmed, shipped, delivered, cancelled]
  - name: created_at
    type: timestamp with time zone
    nullable: false
freshness_sla_minutes: 60
breaking_change_policy: require_major_version_bump
```
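
One guarantee that a plain schema diff cannot cover is allowed_values, since it is about the data, not the columns. A membership check against distinct values pulled from the warehouse is enough; a minimal sketch (the helper name and the idea of feeding it SELECT DISTINCT results are assumptions, not part of the contract spec):

```python
# Sketch: enforce an allowed_values guarantee against distinct values pulled
# from the warehouse (e.g. SELECT DISTINCT order_status FROM fct_orders).
# check_allowed_values is a hypothetical helper, not part of any library.

def check_allowed_values(col: dict, distinct_values: set):
    allowed = set(col.get("allowed_values", []))
    if not allowed:
        return None  # no allow-list declared for this column
    unexpected = distinct_values - allowed
    if unexpected:
        return f"ALLOWED VALUES: '{col['name']}' has undeclared values: {sorted(unexpected)}"
    return None

status_col = {
    "name": "order_status",
    "allowed_values": ["pending", "confirmed", "shipped", "delivered", "cancelled"],
}
print(check_allowed_values(status_col, {"pending", "shipped", "refunded"}))
# -> ALLOWED VALUES: 'order_status' has undeclared values: ['refunded']
```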

dbt 1.5+ also supports native contract enforcement in your model config — this is the first line of defense:

```yaml
# models/marts/fct_orders.yml
models:
  - name: fct_orders
    config:
      contract:
        enforced: true   # dbt will fail the run if column types don't match
    columns:
      - name: order_id
        data_type: bigint
        constraints:
          - type: not_null
          - type: primary_key
      - name: order_total_usd
        data_type: numeric
        constraints:
          - type: not_null
```

The CI Enforcement Script

Contract Enforcement in CI/CD

1. Schema change PR: a developer renames a column or changes a type in an upstream model.
2. CI contract check: a Python script diffs the contract YAML against information_schema on the CI target.
3. On pass, merge and deploy: the contract is satisfied, the merge is unblocked, and the deploy proceeds.
4. On fail, merge blocked: CI fails with diff output showing which contract fields were violated. The PR cannot merge until the contract is updated or the change is reverted.

The contract YAML defines intent. The CI script enforces it. This runs on every PR that touches a contracted model:

```python
# scripts/validate_contracts.py
import os
import sys
from pathlib import Path

import yaml
import snowflake.connector

def load_contract(contract_path: Path) -> dict:
    with open(contract_path) as f:
        return yaml.safe_load(f)

def get_warehouse_columns(conn, model: str) -> dict[str, dict]:
    """Query information_schema for the model's current column definitions."""
    cursor = conn.cursor()
    # Bind both identifiers as parameters; never interpolate them into the SQL string.
    cursor.execute(
        """
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_name = UPPER(%(table)s)
          AND table_schema = UPPER(%(schema)s)
        """,
        {"table": model, "schema": "analytics"},
    )
    return {
        row[0].lower(): {"type": row[1].lower(), "nullable": row[2] == "YES"}
        for row in cursor.fetchall()
    }

def validate_contract(contract: dict, warehouse_cols: dict) -> list[str]:
    violations = []
    for col in contract["guaranteed_columns"]:
        name = col["name"]
        if name not in warehouse_cols:
            violations.append(f"MISSING: column '{name}' was dropped or renamed")
            continue
        actual = warehouse_cols[name]
        # Loose type check: compare the base type, ignoring precision like (10,2).
        if col["type"].split("(")[0] not in actual["type"]:
            violations.append(
                f"TYPE CHANGE: '{name}' expected {col['type']}, got {actual['type']}"
            )
        if not col["nullable"] and actual["nullable"]:
            violations.append(
                f"NULLABILITY: '{name}' was non-nullable, now allows NULLs"
            )
    return violations

def main():
    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_CI_USER"],
        password=os.environ["SNOWFLAKE_CI_PASSWORD"],
    )
    contracts_dir = Path("contracts")
    all_violations = []

    for contract_file in contracts_dir.glob("*.yml"):
        contract = load_contract(contract_file)
        warehouse_cols = get_warehouse_columns(conn, contract["model"])
        violations = validate_contract(contract, warehouse_cols)
        if violations:
            print(f"\nContract violations in {contract_file.name}:")
            for v in violations:
                print(f"  ✗ {v}")
            all_violations.extend(violations)

    if all_violations:
        print(f"\n{len(all_violations)} contract violation(s) found. Merge blocked.")
        sys.exit(1)
    else:
        print("All contracts satisfied. ✓")

if __name__ == "__main__":
    main()
```
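
The script checks columns only; the contract's freshness_sla_minutes guarantee needs a separate check against the data itself. A sketch, assuming max_loaded_at comes from a query such as SELECT MAX(created_at) FROM fct_orders (the helper below is illustrative, not part of the script's actual API):

```python
# Sketch: freshness SLA check. Assumes max_loaded_at is the result of a query
# like SELECT MAX(created_at) FROM fct_orders. Illustrative helper only.
from datetime import datetime, timedelta, timezone

def check_freshness(max_loaded_at: datetime, sla_minutes: int):
    """Return a violation string if the newest row is older than the SLA, else None."""
    age = datetime.now(timezone.utc) - max_loaded_at
    if age > timedelta(minutes=sla_minutes):
        return f"FRESHNESS: newest row is {int(age.total_seconds() // 60)} min old, SLA is {sla_minutes} min"
    return None

stale = datetime.now(timezone.utc) - timedelta(minutes=120)
print(check_freshness(stale, 60))   # prints a FRESHNESS violation
```

Unlike the column checks, this one belongs on a schedule as well as in CI, since freshness can degrade with no code change at all.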

The GitHub Actions Workflow

Wire the script into CI so every PR touching a contracted model runs the check:

```yaml
# .github/workflows/contract-check.yml
name: Data Contract Validation

on:
  pull_request:
    paths:
      - 'models/marts/**'
      - 'models/staging/**'
      - 'contracts/**'

jobs:
  validate-contracts:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install pyyaml snowflake-connector-python dbt-snowflake

      - name: Run dbt compile (CI slim run)
        run: |
          dbt compile --select state:modified+ --defer --state ./prod-manifest
        env:
          DBT_PROFILES_DIR: .

      - name: Validate data contracts
        run: python scripts/validate_contracts.py
        env:
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_CI_USER: ${{ secrets.SNOWFLAKE_CI_USER }}
          SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }}

      - name: Post violation summary to PR
        if: failure()
        uses: actions/github-script@v7
        with:
          script: |
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: '## Contract Violations Detected\n\nThis PR modifies a contracted model. Run `python scripts/validate_contracts.py` locally to see violations before pushing.'
            })
```
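
As written, the script validates every contract on every run. In a large monorepo you may want to scope it to the models a PR actually touches; a sketch of the path-to-model mapping (the helper and its naming conventions, such as model files named after the model, are assumptions):

```python
# Sketch: map changed file paths in a PR to contracted model names, so CI can
# validate only affected contracts. Assumes model files are named after the
# model (models/marts/fct_orders.sql) and contract files end in _contract.yml.
from pathlib import Path

def affected_models(changed_files, contracted):
    hits = set()
    for path in changed_files:
        stem = Path(path).stem                # "fct_orders" or "fct_orders_contract"
        if stem.endswith("_contract"):
            stem = stem[: -len("_contract")]  # strip the contract-file suffix
        if stem in contracted:
            hits.add(stem)
    return hits

changed = ["models/marts/fct_orders.sql", "models/staging/stg_payments.sql"]
print(affected_models(changed, {"fct_orders", "dim_customers"}))
# -> {'fct_orders'}
```

The changed-file list itself can come from the pull request event payload or a `git diff --name-only` against the base branch.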

Handling Breaking Changes the Right Way

Not every schema change is an accident to be reverted. Sometimes you genuinely need to rename a column. The protocol:

```yaml
# contracts/fct_orders_v2_contract.yml — bump the version
version: 2
model: fct_orders
migration_notes: "order_total renamed to order_total_usd in v2. Consumers have 30 days to migrate."
deprecated_columns:
  - name: order_total
    removed_in_version: 3
    replacement: order_total_usd
guaranteed_columns:
  - name: order_total_usd   # new name
    type: numeric(10,2)
    nullable: false
  - name: order_total       # keep old name as alias during migration window
    type: numeric(10,2)
    nullable: true
    deprecated: true
```

During the migration window, both columns exist. Downstream teams migrate at their own pace. The old column is removed only when v3 is cut and all consumers have confirmed migration.
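
The enforcement script can honor this window mechanically: a deprecated column stays required until the contract version that removes it. A minimal sketch (columns_required is a hypothetical helper, not part of the script above):

```python
# Sketch: compute which columns must still exist at the contract's current
# version, honoring the deprecation window. Hypothetical helper.

def columns_required(contract: dict) -> set:
    required = {c["name"] for c in contract["guaranteed_columns"]}
    # Deprecated columns remain required until the version that removes them.
    for col in contract.get("deprecated_columns", []):
        if contract["version"] < col["removed_in_version"]:
            required.add(col["name"])
    return required

v2 = {
    "version": 2,
    "guaranteed_columns": [{"name": "order_total_usd"}],
    "deprecated_columns": [{"name": "order_total", "removed_in_version": 3}],
}
print(sorted(columns_required(v2)))
# -> ['order_total', 'order_total_usd']
```

At version 3 the same function stops requiring order_total, so dropping the alias no longer fails CI.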

Lessons From Production

| What works | What doesn't |
| --- | --- |
| Contracts on your 5–10 most-depended-upon models | Contracting everything upfront |
| CI that blocks merges (not just warns) | Slack notifications with no enforcement |
| Versioned contracts with migration windows | Hard-cutting columns without notice |
| Colocating contracts with model definitions | Storing contracts in a separate repo |
| Starting with fact tables (high fan-out) | Starting with staging models (low impact) |

The technical contract is the easy part. The social contract — getting upstream teams to treat schema changes like API changes — is the real work.

Go Deeper: Governance & Data Contracts

This pattern — contract YAML, CI enforcement, versioned migrations — is one layer of a full data governance framework. The complete picture includes schema registries, data lineage tracking, access control, and compliance controls that scale across teams.

Our Governance & Data Contracts skill covers the full stack: schema evolution patterns, drift detection automation, contract design at scale, lineage and classification, enforcement in production, and enterprise governance frameworks.

Start the Governance & Data Contracts skill
