How to Ingest Data from an API: Step-by-Step Guide
Build a production API ingestion pipeline in 6 steps: read the docs → authenticate with token refresh → implement cursor pagination → add exponential backoff for rate limits → validate schemas with Pydantic → write idempotently and track watermarks. Each step addresses a specific production failure mode; skip one and that failure mode will eventually hit you.
Read the API documentation
Before writing code, read the docs. Understand: authentication method (API key vs OAuth 2.0), pagination strategy (cursor vs offset), rate limits (requests/minute, daily quota), available filters (since, updated_after), and whether webhooks are available.
Implement authentication
Build an authenticated HTTP client. API key goes in the Authorization header. OAuth 2.0 requires a token refresh loop — access tokens typically expire in 60 minutes.
```python
import httpx
from datetime import datetime, timedelta

class APIClient:
    def __init__(self, client_id: str, client_secret: str):
        self._client_id = client_id
        self._client_secret = client_secret
        self._token: str | None = None
        self._expires_at: datetime | None = None

    def _ensure_token(self):
        if not self._token or datetime.utcnow() >= self._expires_at:
            r = httpx.post(
                "https://api.example.com/oauth/token",
                data={"grant_type": "client_credentials",
                      "client_id": self._client_id,
                      "client_secret": self._client_secret},
            )
            r.raise_for_status()
            body = r.json()
            self._token = body["access_token"]
            # Subtract 60s buffer before actual expiry
            self._expires_at = datetime.utcnow() + timedelta(
                seconds=body["expires_in"] - 60
            )

    def get(self, path: str, **params) -> dict:
        self._ensure_token()
        r = httpx.get(
            f"https://api.example.com{path}",
            params=params,
            headers={"Authorization": f"Bearer {self._token}"},
        )
        r.raise_for_status()
        return r.json()
```
Implement cursor pagination
Fetch pages in a loop, passing the cursor returned by each response as the parameter for the next. Stop when the API returns no cursor (end of results).
```python
from typing import Generator

def paginate(client: APIClient, path: str, since: str) -> Generator[list, None, None]:
    cursor = None
    while True:
        params = {"since": since, "limit": 100}
        if cursor:
            params["cursor"] = cursor
        response = client.get(path, **params)
        records = response["data"]
        if not records:
            break
        yield records
        cursor = response.get("next_cursor")
        if not cursor:
            break  # no more pages
```
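A quick way to sanity-check the loop is to drive it with a stub client serving canned pages. The sketch below repeats the generator so it runs standalone; `StubClient` and its pages are hypothetical test data:

```python
from typing import Generator

def paginate(client, path: str, since: str) -> Generator[list, None, None]:
    cursor = None
    while True:
        params = {"since": since, "limit": 100}
        if cursor:
            params["cursor"] = cursor
        response = client.get(path, **params)
        records = response["data"]
        if not records:
            break
        yield records
        cursor = response.get("next_cursor")
        if not cursor:
            break  # no more pages

class StubClient:
    """Serves three canned pages keyed by cursor, mimicking the API."""
    PAGES = {
        None: {"data": [1, 2], "next_cursor": "c1"},
        "c1": {"data": [3, 4], "next_cursor": "c2"},
        "c2": {"data": [5], "next_cursor": None},  # final page: no cursor
    }
    def get(self, path, **params):
        return self.PAGES[params.get("cursor")]

all_records = [r for page in paginate(StubClient(), "/orders", "2024-01-01") for r in page]
# all_records == [1, 2, 3, 4, 5]
```

Three requests, one per page, and the loop terminates on the missing `next_cursor` — exactly the behavior the real client should exhibit.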
Add rate limit handling with backoff
Wrap every API call in a retry loop. On 429 (rate limit), wait with exponential backoff and random jitter before retrying. Respect the Retry-After header if the API provides it.
```python
import time, random, httpx

def get_with_backoff(client: APIClient, path: str, **params) -> dict:
    max_retries = 5
    for attempt in range(max_retries):
        try:
            return client.get(path, **params)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                retry_after = e.response.headers.get("Retry-After")
                if retry_after:
                    wait = float(retry_after)
                else:
                    wait = (2 ** attempt) + random.uniform(0, 1)
                print(f"Rate limited. Waiting {wait:.1f}s (attempt {attempt + 1})")
                time.sleep(wait)
            else:
                raise
    raise RuntimeError(f"Max retries exceeded for {path}")
```
Validate response schemas with Pydantic
Define a Pydantic model for the expected response shape. Parse every record through it. On validation failure, route to a dead-letter location rather than crashing the full run.
```python
from pydantic import BaseModel, ValidationError

class OrderRecord(BaseModel):
    order_id: str
    customer_id: str
    amount_usd: float
    status: str
    created_at: str
    updated_at: str

def parse_records(raw_records: list[dict]) -> tuple[list[OrderRecord], list[dict]]:
    valid, dead_letter = [], []
    for record in raw_records:
        try:
            valid.append(OrderRecord(**record))
        except ValidationError as e:
            # Route bad records to dead-letter rather than crashing
            dead_letter.append({"raw": record, "error": str(e)})
    return valid, dead_letter
```
Write idempotently and track watermarks
UPSERT records on primary key so replays are safe. After each successful run, save the max updated_at as the watermark for the next incremental run.
```python
# UPSERT to Snowflake (merge on primary key)
UPSERT_SQL = """
MERGE INTO orders AS target
USING (SELECT * FROM staging.orders_staging) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
    amount_usd = source.amount_usd,
    status = source.status,
    updated_at = source.updated_at
WHEN NOT MATCHED THEN INSERT (order_id, customer_id, amount_usd, status, updated_at)
    VALUES (source.order_id, source.customer_id, source.amount_usd,
            source.status, source.updated_at);
"""

# Save watermark for next incremental run
# (INSERT OR REPLACE is SQLite syntax; adapt for your metadata store)
def update_watermark(conn, max_updated_at: str):
    conn.execute(
        "INSERT OR REPLACE INTO ingestion_state (source, watermark) VALUES (?, ?)",
        ("orders_api", max_updated_at),
    )
```
Common Issues
Pipeline stops mid-run on token expiry
Build token refresh into the client, not the pipeline. The client checks expiry before every request. A 60-second safety margin before the actual expiry prevents edge cases during slow network requests.
Missing records when using offset pagination
Offset pagination is unstable when records are inserted during a paginated scan — records shift positions and some get skipped. Switch to cursor or keyset pagination, or lock the result set with a consistent snapshot timestamp.
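A minimal keyset sketch against SQLite (table and column names hypothetical): the loop advances by the last seen key rather than an offset, so rows inserted mid-scan cannot shift positions under it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO orders (id, status) VALUES (?, ?)",
                 [(i, "paid") for i in range(1, 11)])

def keyset_pages(conn, page_size: int = 4):
    """Pages via 'WHERE id > last_seen_id', stable under concurrent inserts."""
    last_seen_id = 0
    while True:
        rows = conn.execute(
            "SELECT id, status FROM orders WHERE id > ? ORDER BY id LIMIT ?",
            (last_seen_id, page_size),
        ).fetchall()
        if not rows:
            break
        yield rows
        last_seen_id = rows[-1][0]  # advance the key, not an offset

ids = [row[0] for page in keyset_pages(conn) for row in page]
# ids == [1, 2, ..., 10] — every row seen exactly once
```

The key must be unique and monotonically ordered (a primary key or `(updated_at, id)` pair) for this to be gap-free.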
Duplicate rows after pipeline retry
Network failures cause pipelines to replay. If your write strategy is INSERT rather than UPSERT, every replay creates duplicates. Always use UPSERT (MERGE) on a stable primary key.
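The replay-safety claim is easy to demonstrate with SQLite's `ON CONFLICT` upsert (a stand-in for the Snowflake MERGE above; table and data are hypothetical): writing the same batch twice leaves exactly one row per key.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, status TEXT)")

UPSERT = """
INSERT INTO orders (order_id, status) VALUES (?, ?)
ON CONFLICT (order_id) DO UPDATE SET status = excluded.status
"""

batch = [("A-1", "paid"), ("A-2", "shipped")]
for _ in range(2):  # simulate a pipeline retry replaying the same batch
    conn.executemany(UPSERT, batch)

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
# count == 2: the replay produced no duplicates
```

With plain INSERT the second pass would either double the row count or fail on the primary key — either way, not idempotent.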
FAQ
- What is the best Python library for API ingestion?
- httpx for custom pipelines — async support, HTTP/2, connection pooling. Airbyte or Singer taps when a connector already exists for your source — they handle state, pagination, and auth automatically.
- How do you handle pagination in Python?
- Implement it as a generator yielding pages until next_cursor is None. When you control the query, prefer keyset pagination (WHERE id > last_seen_id) over LIMIT/OFFSET so results stay stable under concurrent inserts.
- How do you store watermarks for incremental ingestion?
- A dedicated metadata table in your warehouse (source, watermark, updated_at) or a key-value store like Redis. Read on pipeline start, update on successful completion with max(updated_at) from the new batch.
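A sketch of the metadata-table pattern using SQLite as a stand-in (table shape as described in the answer; function names are hypothetical): read on start, write on successful completion.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE ingestion_state (
    source TEXT PRIMARY KEY, watermark TEXT, updated_at TEXT)""")

def read_watermark(conn, source: str, default: str) -> str:
    """On pipeline start: resume from the stored watermark, or a backfill default."""
    row = conn.execute(
        "SELECT watermark FROM ingestion_state WHERE source = ?", (source,)
    ).fetchone()
    return row[0] if row else default

def save_watermark(conn, source: str, watermark: str) -> None:
    """On successful completion: store max(updated_at) from the new batch."""
    conn.execute(
        "INSERT OR REPLACE INTO ingestion_state (source, watermark, updated_at) "
        "VALUES (?, ?, datetime('now'))",
        (source, watermark),
    )

since = read_watermark(conn, "orders_api", default="1970-01-01T00:00:00Z")  # first run: backfill
save_watermark(conn, "orders_api", "2024-06-01T12:30:00Z")                  # after a good run
```

Only save the watermark after the write to the warehouse commits — saving it first means a crash in between silently drops a window of records.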