
How to Ingest Data from an API: Step-by-Step Guide

Build a production API ingestion pipeline in six steps: read the docs → authenticate with token refresh → implement cursor pagination → add exponential backoff for rate limits → validate schemas with Pydantic → write idempotently and track watermarks. Each step addresses a specific failure mode; skip one and that failure mode will eventually hit you in production.

Step 1: Read the API documentation

Before writing code, read the docs. Understand: authentication method (API key vs OAuth 2.0), pagination strategy (cursor vs offset), rate limits (requests/minute, daily quota), available filters (since, updated_after), and whether webhooks are available.

Step 2: Implement authentication

Build an authenticated HTTP client. An API key goes in a request header (commonly Authorization); OAuth 2.0 requires a token refresh loop, since access tokens typically expire in about 60 minutes.

import httpx
from datetime import datetime, timedelta, timezone

class APIClient:
    def __init__(self, client_id: str, client_secret: str):
        self._client_id = client_id
        self._client_secret = client_secret
        self._token: str | None = None
        self._expires_at: datetime | None = None

    def _ensure_token(self):
        # Refresh when there is no token yet or the cached one has expired
        now = datetime.now(timezone.utc)
        if not self._token or now >= self._expires_at:
            r = httpx.post(
                "https://api.example.com/oauth/token",
                data={"grant_type": "client_credentials",
                      "client_id": self._client_id,
                      "client_secret": self._client_secret},
            )
            r.raise_for_status()
            body = r.json()
            self._token = body["access_token"]
            # Subtract a 60s buffer so we refresh before the actual expiry
            self._expires_at = now + timedelta(
                seconds=body["expires_in"] - 60
            )

    def get(self, path: str, **params) -> dict:
        self._ensure_token()
        r = httpx.get(
            f"https://api.example.com{path}",
            params=params,
            headers={"Authorization": f"Bearer {self._token}"},
        )
        r.raise_for_status()
        return r.json()

Step 3: Implement cursor pagination

Fetch pages in a loop, passing the cursor returned by each response as the parameter for the next. Stop when the API returns no cursor (end of results).

from typing import Generator

def paginate(client: APIClient, path: str, since: str) -> Generator[list, None, None]:
    cursor = None
    while True:
        params = {"since": since, "limit": 100}
        if cursor:
            params["cursor"] = cursor

        response = client.get(path, **params)
        records = response["data"]

        if not records:
            break

        yield records

        cursor = response.get("next_cursor")
        if not cursor:
            break   # no more pages

Step 4: Add rate limit handling with backoff

Wrap every API call in a retry loop. On 429 (rate limit), wait with exponential backoff and random jitter before retrying. Respect the Retry-After header if the API provides it.

import time, random, httpx

def get_with_backoff(client: APIClient, path: str, **params) -> dict:
    max_retries = 5
    for attempt in range(max_retries):
        try:
            return client.get(path, **params)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                retry_after = e.response.headers.get("Retry-After")
                if retry_after:
                    wait = float(retry_after)
                else:
                    wait = (2 ** attempt) + random.uniform(0, 1)
                print(f"Rate limited. Waiting {wait:.1f}s (attempt {attempt+1})")
                time.sleep(wait)
            else:
                raise
    raise RuntimeError(f"Max retries exceeded for {path}")

Step 5: Validate response schemas with Pydantic

Define a Pydantic model for the expected response shape. Parse every record through it. On validation failure, route to a dead-letter location rather than crashing the full run.

from pydantic import BaseModel, ValidationError
import json

class OrderRecord(BaseModel):
    order_id: str
    customer_id: str
    amount_usd: float
    status: str
    created_at: str
    updated_at: str

def parse_records(raw_records: list[dict]) -> tuple[list[OrderRecord], list[dict]]:
    valid, dead_letter = [], []
    for record in raw_records:
        try:
            valid.append(OrderRecord(**record))
        except ValidationError as e:
            # Route bad records to dead-letter rather than crashing
            dead_letter.append({"raw": record, "error": str(e)})
    return valid, dead_letter

Step 6: Write idempotently and track watermarks

UPSERT records on primary key so replays are safe. After each successful run, save the max updated_at as the watermark for the next incremental run.

# UPSERT to Snowflake (merge on primary key)
UPSERT_SQL = """
MERGE INTO orders AS target
USING (SELECT * FROM staging.orders_staging) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
    amount_usd = source.amount_usd,
    status = source.status,
    updated_at = source.updated_at
WHEN NOT MATCHED THEN INSERT (order_id, customer_id, amount_usd, status, updated_at)
    VALUES (source.order_id, source.customer_id, source.amount_usd,
            source.status, source.updated_at);
"""

# Save watermark for next incremental run
# (INSERT OR REPLACE is SQLite syntax; use MERGE or your warehouse's
#  equivalent upsert if the state table lives in the warehouse)
def update_watermark(conn, max_updated_at: str):
    conn.execute(
        "INSERT OR REPLACE INTO ingestion_state (source, watermark) VALUES (?, ?)",
        ("orders_api", max_updated_at),
    )

Common Issues

Pipeline stops mid-run on token expiry

Build token refresh into the client, not the pipeline. The client checks expiry before every request. A 60-second safety margin before the actual expiry prevents edge cases during slow network requests.

Missing records when using offset pagination

Offset pagination is unstable when records are inserted during a paginated scan — records shift positions and some get skipped. Switch to cursor or keyset pagination, or lock the result set with a consistent snapshot timestamp.
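The keyset pattern can be sketched against a local table as follows (SQLite is used here purely for illustration; the `events` table and column names are made up, but the `WHERE id > last_seen` loop is the same against any SQL source):

```python
import sqlite3

def keyset_paginate(conn, page_size: int = 2):
    """Yield pages ordered by id, resuming from the last id seen.

    Stable under concurrent inserts: new rows get higher ids and can
    never shift rows out of pages that were already fetched.
    """
    last_id = 0
    while True:
        rows = conn.execute(
            "SELECT id, payload FROM events WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, page_size),
        ).fetchall()
        if not rows:
            break
        yield rows
        last_id = rows[-1][0]  # advance the keyset cursor

# Demo against an in-memory table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany("INSERT INTO events (payload) VALUES (?)",
                 [("a",), ("b",), ("c",), ("d",), ("e",)])
seen = [row[0] for page in keyset_paginate(conn) for row in page]
```

Contrast this with LIMIT/OFFSET, where an insert between page fetches shifts every subsequent row by one position and silently skips a record.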

Duplicate rows after pipeline retry

Network failures cause pipelines to replay. If your write strategy is INSERT rather than UPSERT, every replay creates duplicates. Always use UPSERT (MERGE) on a stable primary key.
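The replay-safety property is easy to demonstrate with SQLite's ON CONFLICT clause, which plays the same role here as Snowflake's MERGE (the table and columns are illustrative):

```python
import sqlite3

UPSERT = """
INSERT INTO orders (order_id, status) VALUES (?, ?)
ON CONFLICT(order_id) DO UPDATE SET status = excluded.status
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, status TEXT)")

batch = [("o-1", "shipped"), ("o-2", "pending")]
for _ in range(3):              # simulate a pipeline replaying the same batch
    conn.executemany(UPSERT, batch)

row_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```

Three replays of the same batch still leave exactly two rows; a plain INSERT would have left six (or failed on the primary key, aborting the run).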

FAQ

What is the best Python library for API ingestion?
httpx for custom pipelines — async support, HTTP/2, connection pooling. Airbyte or Singer taps when a connector already exists for your source — they handle state, pagination, and auth automatically.
How do you handle pagination in Python?
Implement as a generator yielding pages until next_cursor is None. For offset pagination use keyset (WHERE id > last_seen_id) rather than LIMIT/OFFSET to stay stable under concurrent inserts.
How do you store watermarks for incremental ingestion?
A dedicated metadata table in your warehouse (source, watermark, updated_at) or a key-value store like Redis. Read on pipeline start, update on successful completion with max(updated_at) from the new batch.
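A round-trip against such a metadata table can be sketched like this (SQLite syntax and illustrative names; the default value stands in for a full-history backfill on first run):

```python
import sqlite3

def read_watermark(conn, source: str, default: str) -> str:
    """On pipeline start: resume from the saved watermark, or backfill."""
    row = conn.execute(
        "SELECT watermark FROM ingestion_state WHERE source = ?", (source,)
    ).fetchone()
    return row[0] if row else default

def save_watermark(conn, source: str, watermark: str) -> None:
    """On successful completion: persist max(updated_at) from the new batch."""
    conn.execute(
        "INSERT OR REPLACE INTO ingestion_state (source, watermark) VALUES (?, ?)",
        (source, watermark),
    )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ingestion_state (source TEXT PRIMARY KEY, watermark TEXT)")
save_watermark(conn, "orders_api", "2024-06-01T12:00:00Z")
```

Only save the watermark after the batch is fully written; saving it first means a mid-run crash silently drops the records between the old and new watermark.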
