Hierarchical supervisor-worker topology, not peer-to-peer agents

Status: Accepted · Agentic Data Pipeline · 03: Multi-Agent Pipeline
By AI-DE Engineering Team · Stakeholders: ML engineer, platform owner

Context

Multi-agent systems can be organized in several ways; three topologies were considered:

  • Hierarchical (supervisor-worker). One agent (the supervisor) delegates to specialized workers; workers report back; the supervisor decides the next step. Routing is centralized: every decision comes from one node. Easy to reason about.
  • Peer-to-peer (mesh). All agents see all messages; any agent can decide to act; coordination is emergent. More flexible. Harder to debug.
  • Pipeline (chain). Fixed sequence A → B → C → D; no decisions, just transformation. Simplest. Doesn't handle the "should we route this through validation?" cases this project needs.

The pipeline option fails our requirements (the M02 quality agent explicitly needs to make routing decisions). The choice is hierarchical vs peer-to-peer.

Constraints:

  1. Auditability. Module 04 ships LangSmith tracing + cost attribution per agent. Peer-to-peer makes "who decided this?" expensive — the answer is "all of them, kind of". Hierarchical makes attribution trivial: the supervisor decided.
  2. Replayability. Module 05's TimeTravel needs deterministic replay of the same input → same path. Peer-to-peer mesh has emergent ordering — replay is best-effort, not exact.
  3. Cost predictability. Peer-to-peer LLM calls scale super-linearly with worker count (every worker may call the LLM on every message). Supervisor-worker scales linearly.
  4. Debugging surface. Staff engineers reading the trace need to follow a tree, not a graph.
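
Constraint 3 can be made concrete with a rough call-count model. The sketch below rests on simplifying assumptions that are not taken from the project: in the mesh, each worker emits one message per round and every other worker spends one LLM call deciding whether to act on it; in the supervisor-worker layout, each worker is visited once and a step costs at most one supervisor call plus one worker call.

```python
def mesh_llm_calls(workers: int, rounds: int = 1) -> int:
    """Upper bound for a mesh: every worker emits one message per round and
    each of the other workers makes one LLM call to evaluate it."""
    return rounds * workers * (workers - 1)


def supervisor_llm_calls(workers: int) -> int:
    """Upper bound for supervisor-worker when each worker is visited once:
    one supervisor routing call plus one worker call per step."""
    return 2 * workers


for n in (4, 8, 16):
    print(n, mesh_llm_calls(n), supervisor_llm_calls(n))
```

Under this model, 4, 8 and 16 workers give 12, 56 and 240 mesh calls per round against 8, 16 and 32 supervisor-worker calls: quadratic growth versus linear growth.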

Decision

Adopt supervisor-worker. One supervisor node + N worker nodes (4 in v1: Ingestion, Quality, Transform, Loading). Workers are stateless given input + state; supervisor holds the routing decision logic.

# src/agents/supervisor.py
class SupervisorAgent:
    """Routes between workers based on state + tool availability + budget."""

    async def __call__(self, state: PipelineState) -> PipelineState:
        decision = await self.llm.with_structured_output(RoutingDecision).ainvoke([
            {"role": "system", "content": SUPERVISOR_SYSTEM_PROMPT},
            {"role": "user", "content": format_state(state)},
        ])
        return {**state, "next": decision.next_worker, "confidence": decision.confidence}

# src/agents/workers.py
class IngestionWorker:
    async def __call__(self, state: PipelineState) -> PipelineState:
        result = await self.tool_registry.call("query_database", state["query"])
        return {**state, "ingested": result}
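
The snippets above use `PipelineState` and `RoutingDecision` without showing their definitions. A hypothetical shape, inferred only from the fields these snippets read and write, might look like the following. The route labels come from the conditional edge map in this ADR; the `[0, 1]` range for `confidence` is an assumption. The project's real schema is presumably something `with_structured_output` accepts directly (for example a Pydantic model); a standard-library dataclass is used here only to keep the sketch dependency-free.

```python
from dataclasses import dataclass
from typing import Any, Literal, TypedDict, get_args

# Route labels copied from the conditional edge map in this ADR.
Route = Literal["ingest", "validate", "transform", "load", "end"]


class PipelineState(TypedDict, total=False):
    """Shared state passed between nodes (hypothetical field set)."""
    query: str          # read by IngestionWorker
    ingested: Any       # written by IngestionWorker
    next: Route         # written by the supervisor
    confidence: float   # written by the supervisor


@dataclass(frozen=True)
class RoutingDecision:
    """Structured supervisor output (stand-in for the project's real schema)."""
    next_worker: Route
    confidence: float

    def __post_init__(self) -> None:
        # Reject labels the edge map cannot route, and out-of-range scores.
        if self.next_worker not in get_args(Route):
            raise ValueError(f"unknown route: {self.next_worker!r}")
        if not 0.0 <= self.confidence <= 1.0:
            raise ValueError("confidence must be in [0, 1]")
```

Constraining `next_worker` to a closed set of labels means a malformed supervisor response fails at the schema boundary rather than as a missing key in the edge map.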

LangGraph conditional edges encode the routing:

graph.add_conditional_edges("supervisor", lambda s: s["next"], {
    "ingest": "ingestion", "validate": "quality", "transform": "transform",
    "load": "loading", "end": END,
})

The supervisor is the only node that makes routing decisions: workers execute their task and return, and the supervisor chooses what runs next.
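
The resulting supervisor → worker → supervisor loop can be emulated in plain Python, without LangGraph. This is a stand-in rather than the project's graph: the rule-based `supervisor` function replaces the LLM call, the worker bodies are placeholders, and the `validated`, `transformed` and `loaded` state keys are hypothetical.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]

# Same label -> node mapping as the conditional edge map (minus "end").
ROUTES = {"ingest": "ingestion", "validate": "quality",
          "transform": "transform", "load": "loading"}


def supervisor(state: State) -> State:
    """Rule-based stand-in for the LLM routing call: pick the first unfinished stage."""
    for label, done_key in (("ingest", "ingested"), ("validate", "validated"),
                            ("transform", "transformed"), ("load", "loaded")):
        if done_key not in state:
            return {**state, "next": label}
    return {**state, "next": "end"}


def make_worker(done_key: str) -> Callable[[State], State]:
    """Placeholder worker: records that it ran and hands control back."""
    return lambda state: {**state, done_key: True}


WORKERS = {"ingestion": make_worker("ingested"), "quality": make_worker("validated"),
           "transform": make_worker("transformed"), "loading": make_worker("loaded")}


def run(state: State, max_steps: int = 20) -> List[str]:
    """Alternate supervisor and worker until the supervisor routes to 'end'."""
    trace: List[str] = []
    for _ in range(max_steps):
        state = supervisor(state)
        trace.append("supervisor")
        if state["next"] == "end":
            return trace
        node = ROUTES[state["next"]]
        state = WORKERS[node](state)  # workers never choose the next node
        trace.append(node)
    raise RuntimeError("supervisor did not terminate")


print(run({"query": "SELECT 1"}))
```

The printed trace alternates between `supervisor` and exactly one worker, which is the tree-shaped pattern the auditability and debugging constraints ask for.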

Tradeoffs we accept

| Lever | Alternative | Chosen |
|---|---|---|
| Routing decision quality | Peer voting on every step | Single-supervisor decision per step; accept supervisor as the bottleneck |
| Concurrency | All workers running in parallel | One worker active at a time (per run); accept latency, gain auditability |
| Worker autonomy | Workers can spawn sub-tasks | Workers can only return → supervisor decides next |
| Coordination overhead | Worker-to-worker message passing | All worker output flows through state via supervisor |

Consequences (positive)

  • M04's per-agent cost attribution is trivial — sum tokens per node id from LangSmith traces.
  • M05's TimeTravel replay is exact: same input + same supervisor state → same routing decision.
  • Supervisor system prompt is the single point of routing logic. Updates ship without touching workers.
  • New worker types are additive — add the node, add the routing branch in the conditional edge map. No mesh-rewiring.
  • Debugging is tree traversal: trace shows supervisor → worker → supervisor → worker pattern.
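
The per-agent attribution in the first point reduces to a group-by over trace records. The record shape below (`node`, `prompt_tokens`, `completion_tokens`) is a hypothetical export format, not LangSmith's actual schema, and the sample numbers are made up purely for illustration.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Mapping


def tokens_per_node(runs: Iterable[Mapping[str, Any]]) -> Dict[str, int]:
    """Sum prompt + completion tokens for each node id."""
    totals: Counter = Counter()
    for run in runs:
        totals[str(run["node"])] += int(run["prompt_tokens"]) + int(run["completion_tokens"])
    return dict(totals)


# Illustrative records only; real traces would come from the tracing backend.
sample = [
    {"node": "supervisor", "prompt_tokens": 400, "completion_tokens": 20},
    {"node": "ingestion", "prompt_tokens": 0, "completion_tokens": 0},
    {"node": "supervisor", "prompt_tokens": 450, "completion_tokens": 20},
    {"node": "quality", "prompt_tokens": 900, "completion_tokens": 150},
]
print(tokens_per_node(sample))
```

Because each step has exactly one active node, every record maps to a single node id and no apportioning between agents is needed.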

Consequences (negative)

  • Supervisor is on the critical path of every run. A slow or failed supervisor LLM call stalls every downstream step of that run. Mitigation: M05's ToolCallGuard caps supervisor invocations per run.
  • Concurrency is per-run, not per-step. Workers don't run in parallel within a single run. Mitigation: M04's task queue parallelises across runs.
  • Routing decisions are LLM calls — non-deterministic. M05's failure detection treats supervisor non-determinism as expected; we don't try to make it perfectly deterministic.
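
Capping supervisor invocations amounts to a per-run counter that fails fast. The class below is a hypothetical illustration of that idea only; it is not M05's `ToolCallGuard`, whose interface is not shown in this ADR, and the names `InvocationCap` and `SupervisorBudgetExceeded` are invented for the sketch.

```python
from typing import Dict


class SupervisorBudgetExceeded(RuntimeError):
    """Raised when a run asks for more supervisor calls than allowed."""


class InvocationCap:
    """Counts supervisor invocations per run id and enforces an upper bound."""

    def __init__(self, max_calls: int) -> None:
        self.max_calls = max_calls
        self._calls: Dict[str, int] = {}

    def check(self, run_id: str) -> int:
        """Record one invocation; return the new count, or raise past the cap."""
        count = self._calls.get(run_id, 0) + 1
        if count > self.max_calls:
            raise SupervisorBudgetExceeded(
                f"run {run_id}: more than {self.max_calls} supervisor calls"
            )
        self._calls[run_id] = count
        return count
```

A supervisor node would call `check(run_id)` before each routing call, so a run that loops between the supervisor and one worker is stopped after a bounded number of LLM calls.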

Reversal plan

Peer-to-peer mesh: ~2 engineer-weeks if requirements change. Triggers:

  1. Workers genuinely need to coordinate without supervisor mediation (e.g. real-time streaming use case where supervisor latency dominates).
  2. Run-level concurrency stops being enough — we need within-run parallelism.

Implementation: replace conditional edges with broadcast pub/sub on Redis. Workers subscribe to relevant state slices, decide locally whether to act. Significantly more complex; only justified by a real requirement, not theoretical flexibility.

Hybrid (supervisor + parallel sub-team): ~1 engineer-week. Supervisor delegates to a sub-team (e.g. Ingestion + Quality run in parallel before merging). LangGraph supports this via parallel branches; we just don't need it at v1 load.
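
The hybrid option can be sketched with `asyncio.gather`: two workers run concurrently on the same input state, and their partial updates are merged before control returns to the supervisor. This is a plain-asyncio illustration with placeholder workers, not LangGraph's parallel-branch API; the `validated` key and the reject-on-conflict merge rule are assumptions.

```python
import asyncio
from typing import Any, Dict

State = Dict[str, Any]


async def ingestion_worker(state: State) -> State:
    await asyncio.sleep(0)  # placeholder for the real tool call
    return {"ingested": f"rows for {state['query']}"}


async def quality_worker(state: State) -> State:
    await asyncio.sleep(0)  # placeholder for the real validation
    return {"validated": True}


async def run_sub_team(state: State) -> State:
    """Fan out to both workers, then merge their partial updates into one state."""
    updates = await asyncio.gather(ingestion_worker(state), quality_worker(state))
    merged = dict(state)
    for update in updates:
        overlap = merged.keys() & update.keys()
        if overlap:  # two writers for one key would make the merge ambiguous
            raise ValueError(f"conflicting writes: {sorted(overlap)}")
        merged.update(update)
    return merged


result = asyncio.run(run_sub_team({"query": "orders"}))
print(result)
```

Rejecting overlapping keys keeps the merged state unambiguous, which preserves the single-writer property the supervisor-worker design relies on for attribution.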

References

  • src/agents/supervisor.py — SupervisorAgent implementation
  • src/agents/workers.py — 4 worker implementations
  • src/orchestration/graph.py — conditional edge wiring
  • src/observability/cost_tracking.py — per-agent attribution depends on this topology
  • ADR-001 (LangGraph supports both topologies; this picks one)
  • ADR-002 (Redis checkpointing is per-supervisor-decision granularity)