How to Design a Modern Data + AI System: Control, Data, and Decision Planes

AI-DE · Apr 15, 2026 · 12 min read

The Problem With "Just Add AI"

Most data teams get their first AI feature working in a weekend: connect a dbt model to an LLM API, write a Python script in a DAG, ship it. Six months later, that "AI feature" is the least reliable thing in your stack. Costs spike unpredictably. Prompts silently drift. Bad data makes it to the model. And when something breaks, no one knows if it's the data, the prompt, or the model.

The teams that avoid this pattern have one thing in common: they think in planes.

[Figure: Modern Data + AI System, Three-Plane Architecture. Control Plane (Apache Airflow: DAG scheduling, ExternalTaskSensor, retry + SLA, backfill) decides what runs, when, and in what order, and waits for data to be ready. Data Plane (dbt + Spark: staging → intermediate → mart, column contracts, dbt tests as the quality gate, AI-ready feature tables) defines what data looks like before the model and hands clean features onward. Decision Plane (LLM / ML inference: batch inference via the Anthropic API, structured JSON output, cost-tiered model selection, idempotent writes back to the DWH) is where intelligence is applied. Each plane has one responsibility; failures are isolated and layers are independently testable.]

A modern data + AI system has three explicit layers:

  • Control Plane: *What runs, when, and in what order* (Airflow)
  • Data Plane: *What the data looks like before it reaches the model* (dbt + Spark)
  • Decision Plane: *Where intelligence is applied* (LLM / ML inference)

Each plane has a single responsibility. Each failure mode is isolated. Each layer is independently testable.

    The Control Plane: Airflow as the Nervous System

    The Control Plane doesn't transform data. It doesn't call models. It does one thing: declare dependencies and schedule execution.

    python
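    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.sensors.external_task import ExternalTaskSensor

    # Task callables live in ordinary Python modules, not in the DAG file.
    # `pipelines.recommendations` is a hypothetical module path; use your own.
    from pipelines.recommendations import (
        batch_llm_inference,
        run_feature_prep,
        write_to_warehouse,
    )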
    
    default_args = {
        "owner": "data-platform",
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "email_on_failure": True,
    }
    
    with DAG(
        "ai_recommendation_pipeline",
        default_args=default_args,
        schedule_interval="0 2 * * *",  # 2 AM daily; Airflow 2.4+ prefers the name `schedule`
        start_date=datetime(2026, 1, 1),
        catchup=False,
        tags=["ai", "recommendations"],
    ) as dag:
    
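        # Wait for upstream data to be ready. By default the sensor looks for a
        # dbt_production run with the same logical date, so the two DAGs need
        # matching schedules (or an execution_delta on the sensor).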
        wait_for_dbt = ExternalTaskSensor(
            task_id="wait_for_dbt_completion",
            external_dag_id="dbt_production",
            external_task_id="dbt_run",
            timeout=3600,
            poke_interval=60,
        )
    
        prepare_features = PythonOperator(
            task_id="prepare_features",
            python_callable=run_feature_prep,
            op_kwargs={"execution_date": "{{ ds }}"},
        )
    
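        run_inference = PythonOperator(
            task_id="run_llm_inference",
            python_callable=batch_llm_inference,
            # Keyword arguments must match batch_llm_inference(execution_date, batch_size);
            # the model choice stays inside the Decision Plane.
            op_kwargs={"execution_date": "{{ ds }}", "batch_size": 100},
        )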
    
        write_results = PythonOperator(
            task_id="write_results",
            python_callable=write_to_warehouse,
        )
    
        wait_for_dbt >> prepare_features >> run_inference >> write_results

    Three things this DAG does correctly:

  • It waits for data to be ready before starting (ExternalTaskSensor)
  • It separates preparation from inference: two distinct tasks that fail and retry independently, and that can be given different retry settings
  • It writes results back to the warehouse, not to an ad-hoc S3 bucket

    The Control Plane's job is visibility and reliability, not logic. If your DAG contains business logic, data transformations, or prompt engineering, you've mixed planes.

    The Data Plane: dbt as the Foundation

    The single most important thing you can do for an AI system is guarantee the quality of its inputs. A model is only as good as what you feed it. The Data Plane's job is to make that guarantee.

    The Three-Layer dbt Pattern

    sql
    -- 1. Staging: rename + cast raw source fields
    -- models/staging/stg_user_events.sql
    SELECT
        id                                    AS event_id,
        user_id,
        event_type,
        CAST(created_at AS TIMESTAMP)         AS event_at,
        CAST(properties AS JSON)              AS properties
    FROM {{ source('raw', 'events') }}
    WHERE event_type IS NOT NULL

    sql
    -- 2. Intermediate: business logic lives here
    -- models/intermediate/int_user_engagement_rolling.sql
    SELECT
        user_id,
        COUNT(*) FILTER (WHERE event_at >= CURRENT_DATE - 7)   AS event_count_7d,
        COUNT(DISTINCT event_type)
          FILTER (WHERE event_at >= CURRENT_DATE - 7)          AS distinct_event_types_7d,
        DATE_DIFF('day', MAX(event_at), CURRENT_DATE)          AS days_since_last_event
    FROM {{ ref('stg_user_events') }}
    GROUP BY 1

    sql
    -- 3. Mart: the clean, AI-ready feature table
    -- models/marts/fct_user_features_for_ai.sql
    {{ config(
        materialized='table',
        contract={"enforced": true}
    ) }}
    
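    SELECT
        u.user_id,
        u.plan_tier,
        u.days_since_signup,
        -- Users with no events have no row in the rolling model; count them as 0
        COALESCE(e.event_count_7d, 0) AS event_count_7d,
        e.days_since_last_event,
        -- Pre-computed context string: inspect this in any SQL client.
        -- The casts and NULL handling keep it non-null after the LEFT JOIN.
        CONCAT(
            'User has been on the ', u.plan_tier, ' plan for ',
            CAST(u.days_since_signup AS VARCHAR), ' days. ',
            CASE
                WHEN e.days_since_last_event IS NULL THEN 'No recorded activity. '
                ELSE CONCAT('Last active ', CAST(e.days_since_last_event AS VARCHAR), ' days ago. ')
            END,
            'Performed ', CAST(COALESCE(e.event_count_7d, 0) AS VARCHAR), ' events in the last week.'
        ) AS user_summary_for_llm
    FROM {{ ref('dim_users') }} u
    LEFT JOIN {{ ref('int_user_engagement_rolling') }} e USING (user_id)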

    The `user_summary_for_llm` column is the key pattern: pre-compute the context string in SQL, not in Python at inference time. This means token count is predictable per row, the LLM always gets clean consistent context, and you can inspect it in any SQL client.
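Because the context string is materialized as a column, its size can be budgeted before any API call is made. The sketch below is a rough illustration only: it assumes about 4 characters per token, which is a common rule of thumb for English text and not a real tokenizer count. `estimate_tokens` and `batch_input_tokens` are hypothetical helper names, not part of the pipeline above.

```python
import math

CHARS_PER_TOKEN = 4  # assumption: rough heuristic for English text, not a tokenizer


def estimate_tokens(text: str) -> int:
    """Approximate the token count of one context string."""
    return math.ceil(len(text) / CHARS_PER_TOKEN)


def batch_input_tokens(summaries: list[str], system_prompt_tokens: int) -> int:
    """Approximate input tokens for a batch.

    Every call pays for the system prompt plus one summary.
    """
    return sum(system_prompt_tokens + estimate_tokens(s) for s in summaries)


summaries = [
    "User has been on the pro plan for 120 days. Last active 2 days ago. "
    "Performed 14 events in the last week.",
]
print(batch_input_tokens(summaries, system_prompt_tokens=120))
```

Under this heuristic, a length cap enforced in dbt (such as the 500-character maximum in the test file later in this article) bounds each summary at roughly 125 tokens, which is what makes per-run cost forecastable.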

    dbt Contracts as AI Quality Gates

    yaml
    # models/marts/fct_user_features_for_ai.yml
    version: 2
    
    models:
      - name: fct_user_features_for_ai
        contract:
          enforced: true
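        # An enforced contract must declare every column the model returns.
        # The data types below are assumptions; match them to your warehouse.
        columns:
          - name: user_id
            data_type: varchar
            constraints:
              - type: not_null
              - type: unique
          - name: plan_tier
            data_type: varchar
          - name: days_since_signup
            data_type: integer
          - name: event_count_7d
            data_type: bigint
          - name: days_since_last_event
            data_type: bigint
          - name: user_summary_for_llm
            data_type: varchar
            constraints:
              - type: not_null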
        tests:
          - not_null:
              column_name: user_summary_for_llm
          - dbt_expectations.expect_column_value_lengths_to_be_between:
              column_name: user_summary_for_llm
              min_value: 50
              max_value: 500  # guard against runaway context strings

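    If dbt tests fail, the upstream task never succeeds, so the Airflow ExternalTaskSensor times out and inference does not run. This only holds when the task the sensor watches actually includes the tests (for example a `dbt build` step, or a separate `dbt test` task that the sensor targets); a sensor on a bare `dbt run` is released as soon as the models build. Wired that way, this is the Data Plane's quality gate, and it requires zero custom code in the AI pipeline itself. Bad data fails loudly before a single API call is made.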

    The Decision Plane: LLM Inference at Batch Scale

    The Decision Plane takes clean, structured input from the Data Plane and produces structured, storable output. It does not transform data. It does not write to production tables directly: it writes to a staging table, and the Control Plane's write step promotes those rows.

    Structured Batch Inference with the Anthropic API

    python
    import anthropic
    import json
    import concurrent.futures
    from dataclasses import dataclass
    
    client = anthropic.Anthropic()
    
    @dataclass
    class UserRecommendation:
        user_id: str
        recommendation_type: str
        recommendation_text: str
        confidence_score: float
        reasoning: str
    
    SYSTEM_PROMPT = """You are a data-driven product advisor. Given a user engagement summary,
    recommend the single most impactful action for this user.
    
    Respond in JSON with this exact structure:
    {
      "recommendation_type": "upgrade_prompt|re_engagement|feature_discovery|churn_risk",
      "recommendation_text": "specific, actionable text",
      "confidence_score": 0.0-1.0,
      "reasoning": "one sentence explaining why"
    }"""
    
    def get_recommendation(user_id: str, user_summary: str) -> UserRecommendation:
        message = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=256,
            system=SYSTEM_PROMPT,
            messages=[{"role": "user", "content": user_summary}],
        )
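        # The prompt asks for bare JSON. If the reply is anything else, json.loads
        # raises and the batch loop logs the failure for this user.
        result = json.loads(message.content[0].text)
        return UserRecommendation(
            user_id=user_id,
            recommendation_type=result["recommendation_type"],
            recommendation_text=result["recommendation_text"],
            confidence_score=float(result["confidence_score"]),
            reasoning=result["reasoning"],
        )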
    
    def batch_llm_inference(execution_date: str, batch_size: int = 100) -> None:
        """
        Airflow callable: reads features from DWH, runs inference, writes back.
        Idempotent — safe to retry on partial failure.
        """
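        # read_pending_rows and write_recommendations_to_staging are
        # warehouse-specific helpers that are not shown here.
        rows = read_pending_rows(execution_date, batch_size)
    
        if not rows:
            print(f"No rows to process for {execution_date}")
            return
    
        results = []
        failed_user_ids = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = {
                executor.submit(get_recommendation, row["user_id"], row["user_summary_for_llm"]): row["user_id"]
                for row in rows
            }
            for future in concurrent.futures.as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    failed_user_ids.append(futures[future])
                    print(f"Inference failed for user {futures[future]}: {e}")
    
        # Persist successes first so a retry only re-reads the users that failed.
        write_recommendations_to_staging(results, execution_date)
    
        if failed_user_ids:
            # Fail the task so Airflow retries it; completed users are skipped on the next read.
            raise RuntimeError(f"{len(failed_user_ids)} of {len(rows)} inference calls failed")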

    Two patterns worth noting:

    Idempotent reads: The `read_pending_rows` query excludes users already in the results table for that execution date. Safe to retry on partial failures without re-billing for completed rows.
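The article does not show `read_pending_rows`, so the following is only a minimal sketch of the anti-join it describes, using an in-memory SQLite database so it runs anywhere. The staging table name `stg_recommendations` and the extra `conn` parameter are assumptions made for the example; a real version would query your warehouse.

```python
import sqlite3


def read_pending_rows(conn: sqlite3.Connection, execution_date: str, batch_size: int) -> list[dict]:
    """Return feature rows that have no recommendation yet for this execution date."""
    cursor = conn.execute(
        """
        SELECT f.user_id, f.user_summary_for_llm
        FROM fct_user_features_for_ai AS f
        LEFT JOIN stg_recommendations AS r
               ON r.user_id = f.user_id
              AND r.execution_date = ?
        WHERE r.user_id IS NULL          -- keep only users with no result yet
        ORDER BY f.user_id
        LIMIT ?
        """,
        (execution_date, batch_size),
    )
    return [{"user_id": u, "user_summary_for_llm": s} for u, s in cursor.fetchall()]


# Tiny demo: u1 already has a result for 2026-04-15, u2 does not.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE fct_user_features_for_ai (user_id TEXT PRIMARY KEY, user_summary_for_llm TEXT);
    CREATE TABLE stg_recommendations (user_id TEXT, execution_date TEXT);
    INSERT INTO fct_user_features_for_ai VALUES ('u1', 'summary 1'), ('u2', 'summary 2');
    INSERT INTO stg_recommendations VALUES ('u1', '2026-04-15');
    """
)
print(read_pending_rows(conn, "2026-04-15", batch_size=100))
```

On a retry, only `u2` is returned, so the completed call for `u1` is not paid for twice.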

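    Structured output: Defining the exact JSON schema in the system prompt and loading the reply into a typed dataclass means every result that reaches the warehouse is structured, storable, and auditable. A reply that is not valid JSON, or that lacks a field, raises inside `get_recommendation` and is logged as a failure instead of being stored.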
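A stricter parsing step can make the schema check explicit. This is one possible sketch, not the article's implementation: `parse_recommendation` and `ALLOWED_TYPES` are hypothetical names, and the fence-stripping branch assumes that some replies arrive wrapped in a Markdown code fence.

```python
import json
from dataclasses import dataclass

ALLOWED_TYPES = {"upgrade_prompt", "re_engagement", "feature_discovery", "churn_risk"}


@dataclass
class UserRecommendation:
    user_id: str
    recommendation_type: str
    recommendation_text: str
    confidence_score: float
    reasoning: str


def parse_recommendation(user_id: str, raw_text: str) -> UserRecommendation:
    """Parse a model reply, raising ValueError on anything off-schema."""
    text = raw_text.strip()
    # Assumption: some replies wrap the JSON in a Markdown code fence.
    if text.startswith("`"):
        text = text.strip("`").strip()
        text = text.removeprefix("json").strip()
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"reply for {user_id} is not valid JSON") from exc
    if not isinstance(data, dict):
        raise ValueError(f"reply for {user_id} is not a JSON object")
    if data.get("recommendation_type") not in ALLOWED_TYPES:
        raise ValueError(f"reply for {user_id} has an unknown recommendation_type")
    try:
        score = float(data["confidence_score"])
        rec_text = str(data["recommendation_text"])
        reasoning = str(data["reasoning"])
    except (KeyError, TypeError, ValueError) as exc:
        raise ValueError(f"reply for {user_id} has missing or malformed fields") from exc
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"reply for {user_id} has confidence_score outside 0.0-1.0")
    return UserRecommendation(user_id, data["recommendation_type"], rec_text, score, reasoning)


reply = (
    '{"recommendation_type": "churn_risk", "recommendation_text": "Send a check-in email", '
    '"confidence_score": 0.8, "reasoning": "No activity in 30 days"}'
)
print(parse_recommendation("u1", reply))
```

Raising a single exception type keeps the batch loop simple: anything that fails validation is logged and left for the next retry.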

    Cost-Tiered Model Selection

    python
    def select_model(task_type: str) -> str:
        """Choose the cheapest model that meets the quality bar for this task type."""
        high_stakes = {"churn_risk", "upgrade_prompt"}
        if task_type in high_stakes:
            return "claude-sonnet-4-6"      # more capable, several times the cost
        return "claude-haiku-4-5-20251001"  # fast, cheap, sufficient for simple tasks

    Haiku costs several times less than Sonnet per token (check current pricing for the exact ratio) and is fast enough for classification and short-form generation. Reserve Sonnet for tasks where output quality directly drives revenue decisions.
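To see what tiering is worth, the blended cost can be worked out from two inputs: the share of calls routed to the expensive model and the price ratio between the tiers. The sketch below uses a ratio of 4 and a 20% high-stakes share purely as illustrative inputs; both are assumptions to replace with your own traffic mix and current pricing, and the function names are hypothetical.

```python
def blended_cost_ratio(high_stakes_fraction: float, cost_ratio: float) -> float:
    """Average per-call cost, relative to sending every call to the cheap tier.

    cost_ratio is the expensive tier's cost divided by the cheap tier's cost.
    """
    if not 0.0 <= high_stakes_fraction <= 1.0:
        raise ValueError("high_stakes_fraction must be between 0 and 1")
    if cost_ratio <= 0:
        raise ValueError("cost_ratio must be positive")
    return (1.0 - high_stakes_fraction) + high_stakes_fraction * cost_ratio


def savings_vs_all_expensive(high_stakes_fraction: float, cost_ratio: float) -> float:
    """Fraction of spend saved compared with sending every call to the expensive tier."""
    return 1.0 - blended_cost_ratio(high_stakes_fraction, cost_ratio) / cost_ratio


# Illustrative inputs only: 20% of calls are high stakes, expensive tier costs 4x.
print(round(blended_cost_ratio(0.2, 4.0), 2))
print(round(savings_vs_all_expensive(0.2, 4.0), 2))
```

With those inputs, the blend costs about 1.6 times the cheap tier, a saving of roughly 60% against routing everything to the expensive model.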

    How the Planes Connect

    Control Plane (Airflow)
    │
    ├── ExternalTaskSensor ──→ blocks until dbt tests pass
    │
    ├── prepare_features ────→ reads from mart, adds execution_date column
    │
    ├── run_llm_inference ───→ Decision Plane: reads features, calls LLM, stages output
    │
    └── write_results ───────→ writes staged results to final mart table
                                  └── triggers: alerts, emails, dashboard refreshes

    The interfaces between planes are deliberately simple:

  • Control → Data: ExternalTaskSensor polls for completion of the upstream dbt task
  • Data → Decision: A SQL table with a dbt contract (column names, types, not-null guarantees)
  • Decision → Control: A staging table write + success/failure signal back to Airflow

    No direct function calls across plane boundaries. No shared state. No implicit coupling. The Data Plane doesn't know the Decision Plane exists.
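The Decision → Control handoff can be sketched as a promote step that is safe to rerun. The article does not show `write_to_warehouse`, so this is an assumption about one reasonable shape for it: delete the final table's rows for the execution date, then insert the staged rows, all in one transaction. The table names and `promote_staged_results` are hypothetical, and SQLite stands in for the warehouse.

```python
import sqlite3


def promote_staged_results(conn: sqlite3.Connection, execution_date: str) -> int:
    """Replace the final table's rows for one execution date with the staged rows.

    Delete-then-insert inside one transaction makes the step safe to rerun.
    """
    with conn:  # commits on success, rolls back if a statement raises
        conn.execute(
            "DELETE FROM fct_user_recommendations WHERE execution_date = ?",
            (execution_date,),
        )
        conn.execute(
            """
            INSERT INTO fct_user_recommendations (user_id, execution_date, recommendation_type)
            SELECT user_id, execution_date, recommendation_type
            FROM stg_recommendations
            WHERE execution_date = ?
            """,
            (execution_date,),
        )
    (count,) = conn.execute(
        "SELECT COUNT(*) FROM fct_user_recommendations WHERE execution_date = ?",
        (execution_date,),
    ).fetchone()
    return count


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE stg_recommendations (user_id TEXT, execution_date TEXT, recommendation_type TEXT);
    CREATE TABLE fct_user_recommendations (user_id TEXT, execution_date TEXT, recommendation_type TEXT);
    INSERT INTO stg_recommendations VALUES
        ('u1', '2026-04-15', 'churn_risk'),
        ('u2', '2026-04-15', 'feature_discovery');
    """
)
promote_staged_results(conn, "2026-04-15")
promote_staged_results(conn, "2026-04-15")  # rerun: same final state, no duplicates
print(conn.execute("SELECT COUNT(*) FROM fct_user_recommendations").fetchone()[0])
```

The only thing the two planes share here is the staging table, which keeps the interface as narrow as the list above describes.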

    What This Architecture Buys You

    Concern       | Without Planes                           | With Three Planes
    Debugging     | "Something broke in the AI thing"        | Pinpoint: Control, Data, or Decision?
    Testing       | Can't unit-test LLM calls                | Data Plane fully testable with dbt tests
    Cost spikes   | Unpredictable API costs                  | dbt tests gate bad data before any API call
    Model swaps   | Rewrite everything                       | Swap Decision Plane implementation only
    Data quality  | LLM masks bad data with plausible output | Bad data fails tests before inference runs
    Observability | Black box                                | Airflow history + dbt docs + LLM output table

    The test that tells you if you've built this correctly: can you swap the LLM provider without touching the Data Plane? Can you add a new dbt model without touching the Control Plane? If yes — you have planes. If not — you have a script.
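One way to make the provider swap cheap is to have the Decision Plane depend on a small interface instead of a specific SDK. The sketch below is an assumption about how that could look; `InferenceBackend`, `CannedBackend`, and `recommend` are hypothetical names, and the canned backend stands in for a real API client so the example runs without network access.

```python
from typing import Protocol


class InferenceBackend(Protocol):
    """Anything that turns a system prompt and a user message into reply text."""

    def complete(self, system: str, user: str) -> str: ...


class CannedBackend:
    """Stand-in backend for tests: returns a fixed reply and makes no network calls."""

    def __init__(self, reply: str) -> None:
        self.reply = reply
        self.calls: list[tuple[str, str]] = []

    def complete(self, system: str, user: str) -> str:
        self.calls.append((system, user))
        return self.reply


def recommend(backend: InferenceBackend, system_prompt: str, user_summary: str) -> str:
    """Decision Plane entry point: depends on the interface, not on a provider SDK."""
    return backend.complete(system_prompt, user_summary)


backend = CannedBackend('{"recommendation_type": "re_engagement"}')
print(recommend(backend, "You are a product advisor.", "Last active 21 days ago."))
```

A production backend would wrap the provider call (such as `client.messages.create`) behind the same `complete` method, so changing providers means adding a class while the Data Plane and the DAG stay untouched.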

    Getting Started Without Rebuilding Everything

    If you have an existing pipeline, three concrete steps:

    Step 1: Identify your Decision Plane boundary. Find every place you call an LLM, ML endpoint, or AI model. These are your Decision Plane. Draw a box around them.

    Step 2: Formalize your Data Plane. Move all feature computation into dbt models. Add contracts and `not_null` tests to every column your AI consumes. Pre-compute context strings in SQL, not in Python.

    Step 3: Make Control Plane dependencies explicit. Have every AI DAG use ExternalTaskSensor or an equivalent quality gate. No AI pipeline should run on data that hasn't passed dbt tests.

    You can do Step 2 in a single sprint without touching the AI code at all. The payoff: the next time the LLM returns garbage, you'll be able to trace it to a specific dbt model, a specific column, and a specific row — not spend three hours reading Airflow logs.
