
Orchestration is Ray + Airflow, not Spark / Dask / single-node cron

✓ Accepted · LLM Ingestion Pipeline · 05 — Production LLMOps (sub-part: pipeline orchestration)
By AI-DE Engineering Team · Stakeholders: platform engineer, data engineer, on-call

Context

The pipeline has two distinct orchestration concerns:

  1. DAG-shape scheduling — daily eval runs, weekly ingest re-crawls, model retrain triggers. Needs cron / interval scheduling, retries, alerting, dependencies between tasks.
  2. Fan-out compute — embedding generation across 100K+ docs, distributed dedup at scale, parallel inference for batch eval. Needs distributed task execution with GPU placement and shared state.

The classic options:

  1. Spark / pyspark — distributed compute on JVM. Strong for batch data processing. Heavy footprint; JVM dependency in a Python pipeline.
  2. Dask — Python-native distributed compute. Lighter than Spark. Strong DataFrame API but weaker GPU story.
  3. Ray — Python-native distributed compute with first-class GPU support and @ray.remote(num_gpus=N) placement.
  4. single-node cron — run everything on one beefy machine. Simplest but doesn't scale past ~10K docs/day.

For DAG scheduling separately:

  1. Airflow — the de-facto industry default. DAGs as Python.
  2. Prefect — lighter, modern. Smaller ecosystem.
  3. Dagster — asset-first model. Different mental model.

Decision

Adopt Ray for fan-out compute + Airflow for DAG scheduling, with the boundary at "Airflow operators call Ray remote functions".

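# dags/ingest_dag.py — Airflow DAG
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="weekly_ingest",
    schedule="@weekly",  # Airflow >= 2.4; older releases call this schedule_interval
    start_date=datetime(2026, 5, 1),
    default_args={"retries": 3, "retry_delay": timedelta(minutes=15)},
) as dag:
    @task
    def crawl_seeds(): ...

    @task
    def dedup_corpus(crawl_path): ...

    @task
    def embed_docs(dedup_path):
        # Ray fan-out for the embedding step
        import ray
        ray.init(address="auto", ignore_reinit_error=True)  # attach to the running Ray cluster

        @ray.remote(num_gpus=0.5)  # each task reserves half a GPU, so two run per GPU
        def embed_chunk(chunk): ...

        chunks = load_chunks(dedup_path)  # hypothetical helper: split the deduped corpus into chunks
        results = ray.get([embed_chunk.remote(c) for c in chunks])
        # XCom is meant for small values; a production task would write
        # `results` to storage and return the path instead.
        return results

    # TaskFlow: passing each task's return value wires both the data and the dependency
    embed_docs(dedup_corpus(crawl_seeds()))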
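
# dags/eval_dag.py — daily regression eval
from airflow.decorators import task

@task
def run_eval_suite(eval_dataset_path):
    import ray
    ray.init(address="auto", ignore_reinit_error=True)  # attach to the running Ray cluster

    @ray.remote
    def judge_question(q, context): ...  # one judge call per (question, context) pair

    pairs = load_eval_pairs(eval_dataset_path)  # hypothetical loader -> [(question, context), ...]
    scores = ray.get([judge_question.remote(q, c) for q, c in pairs])
    return aggregate(scores)  # project aggregation helper, not shown in this ADR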

Tradeoffs we accept

| Lever | Spark | Dask | Ray (chosen) | Single-node cron |
|---|---|---|---|---|
| Python-native | No (JVM) | Yes | Yes | Yes |
| GPU placement | Manual (Spark RAPIDS) | Awkward | First-class (num_gpus=N) | None |
| DataFrame API | Strong | Strong | Weaker (Modin/Datasets) | pandas |
| Local-dev parity | JVM container | pip install | pip install | Native |
| Operational complexity | Spark cluster ops | Dask scheduler ops | Ray head + workers | None |
| Fault tolerance | Native (RDD lineage) | Task retries | Task retries + actor restarts | None |
| Memory model | Off-heap JVM | Distributed pandas | Object store | Process memory |
| Cost (small cluster, 100K docs/day) | $400+/mo (cluster) | $200/mo | $200/mo | $50/mo |

For DAG scheduling:

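| Lever | Airflow (chosen) | Prefect | Dagster |
|---|---|---|---|
| Industry adoption | Highest | Growing | Growing |
| Operator ecosystem | Largest | Medium | Medium |
| Hosted options | Amazon MWAA, Astronomer, Google Cloud Composer | Prefect Cloud | Dagster+ (formerly Dagster Cloud) |
| Mental model | DAGs of tasks | Flow state machines | Asset-centric |
| Tutorial reproducibility | Local docker-compose works | Easy | Easy |
| Backfill story | First-class | First-class | First-class |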

We optimize for GPU-first fan-out (Ray) plus an industry-default scheduler (Airflow). The pipeline's bottleneck is GPU embedding generation at the 100K-doc step, and Ray's fractional GPU placement (num_gpus=0.5 lets a task claim half a GPU) is the deciding capability. Spark would work, but the JVM dependency and operational overhead aren't worth it for a predominantly Python pipeline.
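To make the fractional claim concrete, here is a toy first-fit model of the bookkeeping behind a fractional GPU request. It is not Ray's scheduler, only an illustration under the assumption that each GPU is treated as a logical capacity of 1.0: two `num_gpus=0.5` tasks fit on one physical GPU and a third has to wait. The function name `place_tasks` is hypothetical.

```python
from typing import List, Optional


def place_tasks(num_physical_gpus: int, task_fractions: List[float]) -> List[Optional[int]]:
    """Toy first-fit placement: the GPU index each task lands on, or None if it must wait.

    Each GPU has a logical capacity of 1.0; a task requesting 0.5 reserves half of it.
    This models the accounting behind Ray's num_gpus=0.5, not Ray's actual scheduler,
    and (like Ray) it does nothing to partition GPU memory.
    """
    free = [1.0] * num_physical_gpus  # remaining logical capacity per GPU
    placements: List[Optional[int]] = []
    for fraction in task_fractions:
        for gpu, capacity in enumerate(free):
            if capacity + 1e-9 >= fraction:  # small epsilon guards against float rounding
                free[gpu] = capacity - fraction
                placements.append(gpu)
                break
        else:
            placements.append(None)  # no GPU has room; the task queues until one frees up
    return placements


# One g5.xlarge has a single GPU: two half-GPU embedding tasks run, the third queues.
print(place_tasks(1, [0.5, 0.5, 0.5]))  # → [0, 0, None]
```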

Spark IS used in one optional Tier-2 path (distributed_dedup.py), because pyspark's DataFrame API is genuinely the cleanest fit for the specific case of distributed MinHash sharding. That exception is deliberate: ADR-002 documents the dedup decision, and this ADR (ADR-004) documents the overall orchestration decision.
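For readers unfamiliar with the technique, MinHash sharding works roughly like this: each document gets a MinHash signature, the signature is cut into bands, and each (band index, band values) pair becomes a bucket key. In a distributed job those keys are what the data is partitioned on, so likely near-duplicates meet in the same partition. The pure-Python sketch below illustrates the idea only; it is not the pyspark code in distributed_dedup.py, and the function names, 5-character shingles, 64 hash functions, and 16 bands are illustrative assumptions.

```python
import hashlib
from typing import Dict, List, Set, Tuple

BucketKey = Tuple[int, Tuple[int, ...]]


def shingles(text: str, k: int = 5) -> Set[str]:
    """Character k-shingles of a lower-cased, whitespace-normalised document."""
    norm = " ".join(text.lower().split())
    return {norm[i:i + k] for i in range(max(1, len(norm) - k + 1))}


def minhash_signature(text: str, num_perm: int = 64) -> List[int]:
    """For each of num_perm seeded hash functions, keep the minimum hash over all shingles."""
    sh = shingles(text)
    return [
        min(
            int.from_bytes(hashlib.blake2b(f"{seed}:{s}".encode(), digest_size=8).digest(), "big")
            for s in sh
        )
        for seed in range(num_perm)
    ]


def band_keys(sig: List[int], bands: int = 16) -> List[BucketKey]:
    """Split the signature into bands; each band becomes one bucket (shard) key."""
    rows = len(sig) // bands
    return [(b, tuple(sig[b * rows:(b + 1) * rows])) for b in range(bands)]


def candidate_pairs(docs: Dict[str, str]) -> Set[Tuple[str, str]]:
    """Documents sharing at least one bucket key become near-duplicate candidates."""
    buckets: Dict[BucketKey, List[str]] = {}
    for doc_id, text in docs.items():
        for key in band_keys(minhash_signature(text)):
            buckets.setdefault(key, []).append(doc_id)
    pairs: Set[Tuple[str, str]] = set()
    for ids in buckets.values():
        for i in range(len(ids)):
            for j in range(i + 1, len(ids)):
                a, b = sorted((ids[i], ids[j]))
                pairs.add((a, b))
    return pairs


docs = {
    "d1": "The quick brown fox jumps over the lazy dog.",
    "d2": "the quick  brown fox jumps over the lazy dog.",
    "d3": "An unrelated document about GPUs.",
}
print(candidate_pairs(docs))  # d1 and d2 normalise to the same text, so they share every bucket
```

In a Spark job, the bucket key plays the role of the partitioning column; only documents within the same bucket are ever compared.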

Consequences (positive)

  • Module 05 ships runnable Airflow DAGs that start with airflow standalone on a laptop. The dags/ingest_dag.py + dags/eval_dag.py files are the production shape — no toy code.
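  • Ray's @ray.remote(num_gpus=0.5) lets a single g5.xlarge (one NVIDIA A10G GPU) run two embedding tasks concurrently. The fraction is a scheduling reservation, not memory isolation, so each task must fit in half the GPU's memory to avoid contention.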
  • The Airflow + Ray boundary is clean: Airflow knows about retries and scheduling, Ray knows about GPU placement and fan-out. Neither needs to know about the other's concerns.
  • The CI gate (.github/workflows/eval.yml) calls the same run_eval_suite function the daily DAG calls — no parallel implementation.
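The "no parallel implementation" rule comes down to keeping the scoring logic in one plain function that both the DAG task and the CI gate import. A minimal sketch, assuming a hypothetical `score_pairs` helper, a pluggable judge callable, and an illustrative 0.8 pass threshold; none of these names or values come from the repository.

```python
from statistics import mean
from typing import Callable, Iterable, Tuple

Judge = Callable[[str, str], float]  # (question, context) -> score in [0, 1]


def score_pairs(pairs: Iterable[Tuple[str, str]], judge: Judge) -> float:
    """Single source of truth for eval scoring; the DAG task and the CI job both call this."""
    scores = [judge(q, c) for q, c in pairs]
    if not scores:
        raise ValueError("empty eval set")
    return mean(scores)


def ci_gate(pairs: Iterable[Tuple[str, str]], judge: Judge, threshold: float = 0.8) -> int:
    """CI entry point: returns a process exit code (0 = pass, 1 = regression)."""
    score = score_pairs(pairs, judge)
    print(f"eval score {score:.3f} (threshold {threshold})")
    return 0 if score >= threshold else 1
```

A CI step would call `sys.exit(ci_gate(pairs, judge))`. In the DAG, the per-pair judge calls would go through Ray as in run_eval_suite; the sketch keeps them sequential so it runs without a cluster.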

Consequences (negative)

  • Two systems to operate: the Airflow scheduler, a Ray head node, and Ray workers. The Airflow scheduler alone costs ~$48/mo on EC2, versus $0 for cron. Mitigation: the cost-model CSV documents this. Below 10K docs/day, single-node cron is the right answer; this ADR covers the regime above that threshold.
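  • Ray learning curve. Ray's remote-call model (.remote() returns a future that you resolve with ray.get, and stateful work lives in actors) is unfamiliar to teams used to plain function calls. Mitigation: Module 05 walks through @ray.remote step by step; the Tier-1 path uses synchronous stubs.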
  • Airflow cluster setup is non-trivial. Production Airflow needs a metadata DB, a scheduler, workers, and a webserver. Mitigation: the tutorial uses airflow standalone (a single command, SQLite metadata DB) for local runs; Tier-2 documents MWAA/Astronomer for production.
  • No native data-quality gate primitives. Great Expectations or Soda would slot in here but aren't part of the orchestration ADR. Future ADR could add them.
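A synchronous stub for the Tier-1 path can be as small as an object that mimics the `.remote()` / `get` call shape but runs the function inline, so the same call sites work without a Ray cluster. A minimal sketch; `local_remote`, `local_get`, and the classes below are hypothetical names, not part of Ray's API, and the stub ignores resource arguments such as `num_gpus`.

```python
from typing import Any, Callable, Generic, List, TypeVar, Union

T = TypeVar("T")


class LocalRef(Generic[T]):
    """Stand-in for a Ray ObjectRef: holds a value that is already computed."""

    def __init__(self, value: T) -> None:
        self._value = value


class LocalRemoteFunction(Generic[T]):
    """Mimics a @ray.remote function: .remote(...) runs inline and returns a LocalRef."""

    def __init__(self, fn: Callable[..., T]) -> None:
        self._fn = fn

    def remote(self, *args: Any, **kwargs: Any) -> LocalRef[T]:
        return LocalRef(self._fn(*args, **kwargs))  # executes synchronously, right now


def local_remote(fn: Callable[..., T]) -> LocalRemoteFunction[T]:
    """Decorator with the same call shape as a bare @ray.remote."""
    return LocalRemoteFunction(fn)


def local_get(refs: Union[LocalRef[T], List[LocalRef[T]]]) -> Union[T, List[T]]:
    """Same shape as ray.get: accepts a single ref or a list of refs."""
    if isinstance(refs, list):
        return [r._value for r in refs]
    return refs._value


@local_remote
def embed_chunk(chunk: str) -> int:
    return len(chunk)  # placeholder "embedding" so the example runs


print(local_get([embed_chunk.remote(c) for c in ["a", "bb", "ccc"]]))  # → [1, 2, 3]
```

Because the call sites keep the `fn.remote(...)` / `get(...)` shape, moving a module from Tier-1 to the Ray path is a matter of swapping the decorator and the getter.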

Reversal plan

The orchestration interface is "task functions called by a scheduler". Replacement is bounded:

  1. Spark for the entire compute layer — replace Ray's @ray.remote with Spark UDFs. Forces a JVM dep and SparkSession bootstrapping. ~2 engineer-weeks. Documented but not recommended.
  2. Dask — replace @ray.remote with dask.delayed. ~3 engineer-days. Loses Ray's first-class GPU placement; gains a DataFrame API. Net negative for this project.
  3. Prefect for the scheduler — replace Airflow DAGs with Prefect flows. ~1 engineer-week. Lighter footprint; smaller ecosystem.
  4. Argo Workflows — k8s-native; replace Airflow if the team's already on k8s. ~1-2 engineer-weeks.

Estimated effort: 3 engineer-days (Dask) to 2 engineer-weeks (full Spark migration). All reversible.
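The bounded-replacement claim depends on keeping fan-out behind one narrow seam. A minimal sketch, assuming a hypothetical `FanOut` protocol: pipeline code calls `fan_out.map(fn, items)`, and only the backend knows whether that means Ray, Dask, or a local pool. Only standard-library backends are implemented so the sketch runs anywhere; the Ray and Dask equivalents appear as comments.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Protocol, Sequence, TypeVar

A = TypeVar("A")
B = TypeVar("B")


class FanOut(Protocol):
    def map(self, fn: Callable[[A], B], items: Sequence[A]) -> List[B]: ...


class SerialFanOut:
    """Runs items one by one: the single-node / Tier-1 backend."""

    def map(self, fn, items):
        return [fn(x) for x in items]


class ThreadFanOut:
    """Local parallel backend from the standard library; results keep input order."""

    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers

    def map(self, fn, items):
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            return list(pool.map(fn, items))


# A Ray backend would implement the same method, roughly:
#     remote_fn = ray.remote(num_gpus=0.5)(fn)
#     return ray.get([remote_fn.remote(x) for x in items])
# and a Dask backend would build dask.delayed(fn)(x) per item and call dask.compute(...).


def embed_all(chunks: Sequence[str], fan_out: FanOut) -> List[int]:
    """Pipeline code depends only on the FanOut seam, not on Ray or Dask."""
    return fan_out.map(len, chunks)  # len stands in for the real embedding call


print(embed_all(["a", "bb", "ccc"], ThreadFanOut()))  # → [1, 2, 3]
```

With this seam in place, each reversal option above mostly means writing one new backend class rather than touching every task.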

References

  • dags/ingest_dag.py (weekly ingest DAG with Ray fan-out for embed)
  • dags/eval_dag.py (daily eval DAG with Ray fan-out for judge calls)
  • pipeline/cost_tracker.py (per-task cost tracking the DAG calls)
  • app/metrics.py (Prometheus metrics emitted by Airflow + Ray)
  • .github/workflows/eval.yml (CI gate that calls the same eval function)
  • ADR-002 (Spark IS used optionally in distributed_dedup.py — documented exception)
Built into the project

This decision shipped as part of LLM Ingestion Pipeline — see the full architecture, starter kit, and 4 more ADRs.
