How to Design a Modern Data + AI System: Control, Data, and Decision Planes
The Problem With "Just Add AI"
Most data teams get their first AI feature working in a weekend: connect a dbt model to an LLM API, write a Python script in a DAG, ship it. Six months later, that "AI feature" is the least reliable thing in your stack. Costs spike unpredictably. Prompts silently drift. Bad data makes it to the model. And when something breaks, no one knows if it's the data, the prompt, or the model.
The teams that avoid this pattern have one thing in common: they think in planes.
Figure: Modern Data + AI System, three-plane architecture. Control Plane (Apache Airflow) → data ready → Data Plane (dbt + Spark) → features → Decision Plane (LLM / ML inference).

A modern data + AI system has three explicit layers:

- Control Plane (Apache Airflow): what runs, when, and in what order.
- Data Plane (dbt + Spark): what the data looks like before it reaches the model.
- Decision Plane (LLM / ML inference): where intelligence is applied.

Each plane has a single responsibility. Each failure mode is isolated. Each layer is independently testable.
The Control Plane: Airflow as the Nervous System
The Control Plane doesn't transform data. It doesn't call models. It does one thing: declare dependencies and schedule execution.
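```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# run_feature_prep, batch_llm_inference and write_to_warehouse are this
# project's own task callables, imported from its pipeline modules (not shown).

default_args = {
    "owner": "data-platform",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
}

with DAG(
    "ai_recommendation_pipeline",
    default_args=default_args,
    schedule="0 2 * * *",  # 2 AM daily; `schedule` replaced `schedule_interval` in Airflow 2.4
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["ai", "recommendations"],
) as dag:
    # Wait for upstream data to be ready. The sensor matches the dbt_production
    # run with the same logical date (set execution_delta if the schedules differ),
    # and it only gates on dbt tests if dbt_run also runs them (e.g. `dbt build`).
    wait_for_dbt = ExternalTaskSensor(
        task_id="wait_for_dbt_completion",
        external_dag_id="dbt_production",
        external_task_id="dbt_run",
        failed_states=["failed"],  # fail fast instead of waiting out the timeout
        timeout=3600,
        poke_interval=60,
    )

    prepare_features = PythonOperator(
        task_id="prepare_features",
        python_callable=run_feature_prep,
        op_kwargs={"execution_date": "{{ ds }}"},
    )

    # op_kwargs must match batch_llm_inference(execution_date, batch_size)
    run_inference = PythonOperator(
        task_id="run_llm_inference",
        python_callable=batch_llm_inference,
        op_kwargs={"execution_date": "{{ ds }}", "batch_size": 100},
    )

    write_results = PythonOperator(
        task_id="write_results",
        python_callable=write_to_warehouse,
    )

    wait_for_dbt >> prepare_features >> run_inference >> write_results
```

Three things this DAG does correctly:

- It waits on upstream data with `ExternalTaskSensor` instead of assuming dbt has finished by a certain time.
- It handles transient failures through `retries` and `retry_delay` in `default_args`, not through error handling inside task code.
- It passes the logical date (`{{ ds }}`) to the feature and inference steps, so each run is tied to a specific date and can be retried for that date.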
The Control Plane's job is visibility and reliability, not logic. If your DAG contains business logic, data transformations, or prompt engineering, you've mixed planes.
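Here is a toy way to see what "declare dependencies" means. The `>>` chain above amounts to a dependency graph that the scheduler resolves into an execution order. The sketch below models this with Python's standard-library `graphlib` rather than Airflow. The dictionary layout is illustrative and does not reflect how Airflow stores DAGs internally. Note that no task body appears anywhere in it.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on; this mirrors
# wait_for_dbt >> prepare_features >> run_inference >> write_results.
# The Control Plane only knows ordering, never what a task does.
DEPENDENCIES: dict[str, set[str]] = {
    "wait_for_dbt_completion": set(),
    "prepare_features": {"wait_for_dbt_completion"},
    "run_llm_inference": {"prepare_features"},
    "write_results": {"run_llm_inference"},
}


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Resolve declared dependencies into one valid run order."""
    return list(TopologicalSorter(deps).static_order())


if __name__ == "__main__":
    print(execution_order(DEPENDENCIES))
```

`TopologicalSorter` raises `graphlib.CycleError` on circular dependencies. The same toy therefore also shows why a scheduler can reject a malformed DAG before any task runs.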
The Data Plane: dbt as the Foundation
The single most important thing you can do for an AI system is guarantee the quality of its inputs. A model is only as good as what you feed it. The Data Plane's job is to make that guarantee.
The Three-Layer dbt Pattern
```sql
-- 1. Staging: rename + cast raw source fields
-- models/staging/stg_user_events.sql
SELECT
    id AS event_id,
    user_id,
    event_type,
    CAST(created_at AS TIMESTAMP) AS event_at,
    CAST(properties AS JSON) AS properties
FROM {{ source('raw', 'events') }}
WHERE event_type IS NOT NULL
```

```sql
-- 2. Intermediate: business logic lives here
-- models/intermediate/int_user_engagement_rolling.sql
-- (DuckDB-style date arithmetic; adapt CURRENT_DATE - 7 and DATE_DIFF to your warehouse)
SELECT
    user_id,
    COUNT(*) FILTER (WHERE event_at >= CURRENT_DATE - 7) AS event_count_7d,
    COUNT(DISTINCT event_type)
        FILTER (WHERE event_at >= CURRENT_DATE - 7) AS distinct_event_types_7d,
    DATE_DIFF('day', MAX(event_at), CURRENT_DATE) AS days_since_last_event
FROM {{ ref('stg_user_events') }}
GROUP BY 1
```

```sql
-- 3. Mart: the clean, AI-ready feature table
-- models/marts/fct_user_features_for_ai.sql
{{ config(
    materialized='table',
    contract={"enforced": true}
) }}

SELECT
    u.user_id,
    u.plan_tier,
    u.days_since_signup,
    -- Users with no events have no row in the intermediate model (LEFT JOIN -> NULL)
    COALESCE(e.event_count_7d, 0) AS event_count_7d,
    e.days_since_last_event,
    -- Pre-computed context string: inspect this in any SQL client
    CONCAT(
        'User has been on the ', u.plan_tier, ' plan for ',
        u.days_since_signup, ' days. ',
        CASE
            WHEN e.days_since_last_event IS NULL THEN 'No recorded activity. '
            ELSE CONCAT('Last active ', e.days_since_last_event, ' days ago. ')
        END,
        'Performed ', COALESCE(e.event_count_7d, 0), ' events in the last week.'
    ) AS user_summary_for_llm
FROM {{ ref('dim_users') }} u
LEFT JOIN {{ ref('int_user_engagement_rolling') }} e USING (user_id)
```

The `user_summary_for_llm` column is the key pattern: pre-compute the context string in SQL, not in Python at inference time. This keeps the token count per row predictable and gives the LLM clean, consistent context. It also lets you inspect exactly what the model will see from any SQL client.
dbt Contracts as AI Quality Gates
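```yaml
# models/marts/fct_user_features_for_ai.yml
version: 2

models:
  - name: fct_user_features_for_ai
    config:
      contract:
        enforced: true
    # With an enforced contract, every column the model selects must be declared
    # with a data_type. The types below assume DuckDB; use your warehouse's names.
    columns:
      - name: user_id
        data_type: varchar
        constraints:
          - type: not_null
          - type: unique
        tests:
          - unique  # many warehouses record UNIQUE as metadata only, so test it too
      - name: plan_tier
        data_type: varchar
      - name: days_since_signup
        data_type: integer
      - name: event_count_7d
        data_type: bigint
      - name: days_since_last_event
        data_type: bigint
      - name: user_summary_for_llm
        data_type: varchar
        constraints:
          - type: not_null
        tests:
          - not_null
          - dbt_expectations.expect_column_value_lengths_to_be_between:
              min_value: 50
              max_value: 500  # guard against runaway context strings
```

If a contract or test fails, the dbt task fails. The ExternalTaskSensor in the Control Plane then never sees a successful upstream task, and inference does not start. That only holds if the task the sensor watches actually runs the tests. One example is a task that runs `dbt build`, which builds models and runs their tests in a single command. This is the Data Plane's quality gate, and it requires zero custom code in the AI pipeline itself: bad data fails loudly before a single API call is made.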
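The gating behaviour fits in a few lines. With its default settings, Airflow's ExternalTaskSensor treats a failed upstream task as "not yet successful." It keeps poking and only gives up at `timeout`. Passing its `failed_states` argument makes it fail immediately instead. The sketch below is a simplified, hypothetical model of both behaviours, with a `fail_fast` flag standing in for `failed_states`. It is not Airflow's implementation. `get_upstream_state` stands in for a metadata-database lookup, and `sleep`/`clock` are injectable so the logic can be tested without waiting.

```python
import time
from collections.abc import Callable


class UpstreamFailed(RuntimeError):
    """The upstream (dbt) task reached a failure state."""


class SensorTimeout(RuntimeError):
    """The upstream task did not succeed before the timeout."""


def wait_for_upstream(
    get_upstream_state: Callable[[], str],
    timeout: float = 3600,
    poke_interval: float = 60,
    fail_fast: bool = True,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Return only once upstream has succeeded; otherwise raise, so downstream never starts."""
    deadline = clock() + timeout
    while True:
        state = get_upstream_state()
        if state == "success":
            return  # inference may now run
        if fail_fast and state == "failed":
            raise UpstreamFailed("upstream task failed; inference will not run")
        if clock() >= deadline:
            raise SensorTimeout(f"upstream not successful after {timeout}s")
        sleep(poke_interval)
```

Either way, the downstream inference task never runs on data that did not pass the upstream step. The only difference is how quickly the failure surfaces.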
The Decision Plane: LLM Inference at Batch Scale
The Decision Plane takes clean, structured input from the Data Plane and produces structured, storable output. It does not transform data. It does not write to production databases directly. It returns results to the Control Plane's write step.
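"Structured, storable output" is easiest to guarantee at the plane boundary. Below is a minimal, standard-library sketch. It parses the model's raw text, checks it against the same four-field schema the system prompt asks for, and rejects anything non-conforming before it reaches a table. The names `parse_recommendation` and `ValidatedRecommendation`, and the choice to raise `ValueError`, are illustrative assumptions, not part of any SDK.

```python
import json
from dataclasses import dataclass

ALLOWED_TYPES = {"upgrade_prompt", "re_engagement", "feature_discovery", "churn_risk"}


@dataclass(frozen=True)
class ValidatedRecommendation:
    user_id: str
    recommendation_type: str
    recommendation_text: str
    confidence_score: float
    reasoning: str


def parse_recommendation(user_id: str, raw_text: str) -> ValidatedRecommendation:
    """Turn raw model text into a typed record, or raise ValueError."""
    try:
        data = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"not valid JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")

    rec_type = data.get("recommendation_type")
    if rec_type not in ALLOWED_TYPES:
        raise ValueError(f"unknown recommendation_type: {rec_type!r}")

    text, reasoning = data.get("recommendation_text"), data.get("reasoning")
    if not isinstance(text, str) or not text.strip():
        raise ValueError("recommendation_text must be a non-empty string")
    if not isinstance(reasoning, str):
        raise ValueError("reasoning must be a string")

    score = data.get("confidence_score")
    # bool is a subclass of int, so exclude it explicitly; NaN fails the range check
    if isinstance(score, bool) or not isinstance(score, (int, float)) or not 0.0 <= score <= 1.0:
        raise ValueError(f"confidence_score out of range: {score!r}")

    return ValidatedRecommendation(user_id, rec_type, text, float(score), reasoning)
```

In a batch loop, a `ValueError` from this function can be caught per user like any other failure. Malformed responses are then reported instead of stored.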
Structured Batch Inference with the Anthropic API
```python
import concurrent.futures
import json
import logging
from dataclasses import dataclass

import anthropic

logger = logging.getLogger(__name__)

# Reads ANTHROPIC_API_KEY from the environment.
client = anthropic.Anthropic()


@dataclass
class UserRecommendation:
    user_id: str
    recommendation_type: str
    recommendation_text: str
    confidence_score: float
    reasoning: str


SYSTEM_PROMPT = """You are a data-driven product advisor. Given a user engagement summary,
recommend the single most impactful action for this user.
Respond in JSON with this exact structure:
{
  "recommendation_type": "upgrade_prompt|re_engagement|feature_discovery|churn_risk",
  "recommendation_text": "specific, actionable text",
  "confidence_score": 0.0-1.0,
  "reasoning": "one sentence explaining why"
}"""


def get_recommendation(user_id: str, user_summary: str) -> UserRecommendation:
    message = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=256,
        system=SYSTEM_PROMPT,
        messages=[{"role": "user", "content": user_summary}],
    )
    # Raises json.JSONDecodeError / KeyError on malformed output; the caller
    # catches these per user, so one bad response doesn't fail the whole batch.
    result = json.loads(message.content[0].text)
    return UserRecommendation(
        user_id=user_id,
        recommendation_type=result["recommendation_type"],
        recommendation_text=result["recommendation_text"],
        confidence_score=float(result["confidence_score"]),
        reasoning=result["reasoning"],
    )


def batch_llm_inference(execution_date: str, batch_size: int = 100) -> None:
    """
    Airflow callable: reads features from the DWH, runs inference, writes back.
    Idempotent: safe to retry on partial failure.

    read_pending_rows and write_recommendations_to_staging are project-specific
    warehouse helpers (not shown).
    """
    rows = read_pending_rows(execution_date, batch_size)
    if not rows:
        logger.info("No rows to process for %s", execution_date)
        return

    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(get_recommendation, row["user_id"], row["user_summary_for_llm"]): row["user_id"]
            for row in rows
        }
        for future in concurrent.futures.as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                # Failed users are not written, so a rerun for the same
                # execution_date picks them up again via read_pending_rows.
                logger.warning("Inference failed for user %s: %s", futures[future], e)

    write_recommendations_to_staging(results, execution_date)
```

Two patterns worth noting:
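Idempotent reads: the `read_pending_rows` query excludes users who already have a result for that execution date, so retrying after a partial failure does not re-bill completed rows. Results are only persisted when `write_recommendations_to_staging` runs at the end of the batch. Users whose calls fail are skipped and picked up on a rerun, but a crash mid-batch loses the results held in memory.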
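Structured output: defining the exact JSON schema in the system prompt and mapping each response onto a typed dataclass keeps the Decision Plane's output structured, storable, and auditable. A prompt alone does not guarantee valid JSON. A response that fails to parse raises inside `get_recommendation` and is reported as a per-user failure rather than written.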
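The idempotent-read pattern can be sketched with SQLite from the standard library. The table names (`fct_user_features_for_ai`, `ai_recommendations`), the extra `conn` parameter, and the `NOT EXISTS` anti-join are assumptions about how a `read_pending_rows` helper might be written. The article does not show its query. Rows already stored for the execution date are excluded, so a rerun only processes, and bills for, what is left.

```python
import sqlite3

# Hypothetical query behind read_pending_rows: an anti-join against the
# results table, scoped to one execution date.
PENDING_SQL = """
SELECT f.user_id, f.user_summary_for_llm
FROM fct_user_features_for_ai AS f
WHERE NOT EXISTS (
    SELECT 1
    FROM ai_recommendations AS r
    WHERE r.user_id = f.user_id
      AND r.execution_date = ?
)
ORDER BY f.user_id
LIMIT ?
"""


def read_pending_rows(conn: sqlite3.Connection, execution_date: str, batch_size: int) -> list[dict]:
    """Return up to batch_size users with no stored result for execution_date."""
    cur = conn.execute(PENDING_SQL, (execution_date, batch_size))
    columns = [col[0] for col in cur.description]
    return [dict(zip(columns, row)) for row in cur.fetchall()]


def demo() -> tuple[list[str], list[str]]:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE fct_user_features_for_ai (
            user_id TEXT PRIMARY KEY,
            user_summary_for_llm TEXT NOT NULL
        );
        -- One result per user per run date keeps the write side idempotent too.
        CREATE TABLE ai_recommendations (
            user_id TEXT NOT NULL,
            execution_date TEXT NOT NULL,
            recommendation_type TEXT NOT NULL,
            PRIMARY KEY (user_id, execution_date)
        );
        """
    )
    conn.executemany(
        "INSERT INTO fct_user_features_for_ai VALUES (?, ?)",
        [("u1", "summary 1"), ("u2", "summary 2"), ("u3", "summary 3")],
    )
    first_run = [r["user_id"] for r in read_pending_rows(conn, "2026-01-01", 100)]
    # Simulate a partial failure: only u1's result was stored before the retry.
    conn.execute("INSERT INTO ai_recommendations VALUES ('u1', '2026-01-01', 'churn_risk')")
    retry_run = [r["user_id"] for r in read_pending_rows(conn, "2026-01-01", 100)]
    conn.close()
    return first_run, retry_run


if __name__ == "__main__":
    print(demo())  # (['u1', 'u2', 'u3'], ['u2', 'u3'])
```

The composite primary key on `(user_id, execution_date)` protects the write side as well. A duplicate insert for the same date raises an integrity error instead of silently storing a second result.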
Cost-Tiered Model Selection
```python
def select_model(task_type: str) -> str:
    """Choose the cheapest model that meets the quality bar for this task type."""
    high_stakes = {"churn_risk", "upgrade_prompt"}
    if task_type in high_stakes:
        return "claude-sonnet-4-6"  # more capable, several times the per-token cost
    return "claude-haiku-4-5-20251001"  # fast, cheap, sufficient for simple tasks
```

Haiku costs several times less per token than Sonnet (check Anthropic's current pricing page for exact rates) and is fast enough for classification and short-form generation. Reserve Sonnet for tasks where output quality directly drives revenue decisions.
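To see why tiering matters at batch scale, here is a back-of-the-envelope cost estimator. Prices are passed in as parameters rather than hard-coded, because list prices change. The numbers in the usage block are illustrative placeholders, not Anthropic's rates. The 4-characters-per-token ratio is a rough heuristic for English text, not the provider's tokenizer, and the 150-token system-prompt size is an assumption.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class ModelPrice:
    """Per-token list prices for one model tier (supply current values yourself)."""

    input_per_mtok: float   # USD per million input tokens
    output_per_mtok: float  # USD per million output tokens


def estimate_tokens(text: str, chars_per_token: float = 4.0) -> int:
    """Rough token count from character length; a heuristic, not the real tokenizer."""
    return math.ceil(len(text) / chars_per_token)


def estimate_batch_cost(
    rows: int,
    input_tokens_per_row: int,
    output_tokens_per_row: int,
    price: ModelPrice,
) -> float:
    """Estimated USD cost of one batch; cost scales linearly with row count."""
    input_cost = rows * input_tokens_per_row * price.input_per_mtok / 1_000_000
    output_cost = rows * output_tokens_per_row * price.output_per_mtok / 1_000_000
    return input_cost + output_cost


if __name__ == "__main__":
    # Placeholder prices for two tiers: NOT real rates, substitute current ones.
    cheap_tier = ModelPrice(input_per_mtok=1.0, output_per_mtok=5.0)
    premium_tier = ModelPrice(input_per_mtok=3.0, output_per_mtok=15.0)

    # Worst case per row: a 500-char summary (the dbt length cap), an assumed
    # ~150-token system prompt, and the full max_tokens=256 of output.
    input_tokens = estimate_tokens("x" * 500) + 150
    for name, price in [("cheap", cheap_tier), ("premium", premium_tier)]:
        print(name, round(estimate_batch_cost(100_000, input_tokens, 256, price), 2))
```

Because the summary length is capped in the Data Plane and `max_tokens` is fixed in the Decision Plane, the worst-case cost per row is bounded before the batch starts. The tier choice is then the main cost lever.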
How the Planes Connect
```text
Control Plane (Airflow)
│
├── ExternalTaskSensor ──→ blocks until dbt tests pass
│
├── prepare_features ────→ reads from mart, adds execution_date column
│
├── run_llm_inference ───→ Decision Plane: reads features, calls LLM, stages output
│
└── write_results ───────→ writes staged results to final mart table
                           └── triggers: alerts, emails, dashboard refreshes
```

The interfaces between planes are deliberately simple: no direct function calls across plane boundaries, no shared state, and no implicit coupling. The Data Plane doesn't know the Decision Plane exists.
What This Architecture Buys You
| Concern | Without Planes | With Three Planes |
|---|---|---|
| Debugging | "Something broke in the AI thing" | Pinpoint: Control, Data, or Decision? |
| Testing | Can't unit-test LLM calls | Data Plane fully testable with dbt tests |
| Cost spikes | Unpredictable API costs | dbt tests gate bad data before any API call |
| Model swaps | Rewrite everything | Swap Decision Plane implementation only |
| Data quality | LLM masks bad data with plausible output | Bad data fails tests before inference runs |
| Observability | Black box | Airflow history + dbt docs + LLM output table |
The test that tells you if you've built this correctly: can you swap the LLM provider without touching the Data Plane? Can you add a new dbt model without touching the Control Plane? If yes — you have planes. If not — you have a script.
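One way to make the provider-swap test mechanical is to put the Decision Plane behind a small interface. The sketch below uses `typing.Protocol`. `RecommendationModel`, `RuleBasedModel`, `FixedModel`, and `run_decision_plane` are hypothetical names, and the keyword rule is a toy. A real implementation would wrap an LLM client behind the same `recommend` method. The Data Plane only hands over `(user_id, user_summary_for_llm)` pairs, so nothing upstream changes when the implementation does.

```python
from collections.abc import Iterable
from typing import Protocol


class RecommendationModel(Protocol):
    """The only thing the rest of the system knows about the Decision Plane."""

    def recommend(self, user_summary: str) -> str: ...


class RuleBasedModel:
    """Toy stand-in; an LLM-backed class would expose the same method."""

    def recommend(self, user_summary: str) -> str:
        if "Performed 0 events" in user_summary:
            return "re_engagement"
        return "feature_discovery"


class FixedModel:
    """Another interchangeable implementation, handy in tests."""

    def __init__(self, answer: str) -> None:
        self.answer = answer

    def recommend(self, user_summary: str) -> str:
        return self.answer


def run_decision_plane(
    model: RecommendationModel, rows: Iterable[tuple[str, str]]
) -> dict[str, str]:
    """Map (user_id, user_summary_for_llm) rows to recommendation types."""
    return {user_id: model.recommend(summary) for user_id, summary in rows}
```

Swapping providers then means writing one new class with a `recommend` method. The dbt models and the DAG stay untouched.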
Getting Started Without Rebuilding Everything
If you have an existing pipeline, three concrete steps:
Step 1 — Identify your Decision Plane boundary. Find every place you call an LLM, ML endpoint, or AI model. These are your Decision Plane. Draw a box around them.
Step 2 — Formalize your Data Plane. Move all feature computation into dbt models. Add contracts and `not_null` tests to every column your AI consumes. Pre-compute context strings in SQL, not in Python.
Step 3 — Explicit Control Plane dependencies. Make every AI DAG use ExternalTaskSensor or an equivalent quality gate. No AI pipeline should run on data that hasn't passed dbt tests.
You can do Step 2 in a single sprint without touching the AI code at all. The payoff: the next time the LLM returns garbage, you'll be able to trace it to a specific dbt model, a specific column, and a specific row — not spend three hours reading Airflow logs.