
How to Build an LLM Data Pipeline

An LLM data pipeline has five stages: crawl text sources → deduplicate near-identical documents → filter quality and mask PII → tokenize into integer sequences → package and push to the HuggingFace Hub. Each stage outputs Parquet files so the pipeline is restartable at any step.
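That restartability can be sketched as a thin wrapper that skips a stage when its Parquet output already exists (the stage names and the artifacts/ directory here are illustrative, not part of the steps below):

```python
from pathlib import Path

def run_stage(name: str, fn, out_dir: str = "artifacts") -> Path:
    """Run a pipeline stage unless its output Parquet already exists."""
    out = Path(out_dir) / f"{name}.parquet"
    if out.exists():
        print(f"[skip] {name}: {out} already exists")
        return out
    out.parent.mkdir(parents=True, exist_ok=True)
    fn(out)  # the stage function writes its own Parquet file
    return out
```

Rerunning the pipeline after a crash then re-executes only the stages whose output files are missing.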

1

Crawl and extract clean text

import scrapy
import trafilatura

class TextSpider(scrapy.Spider):
    name = "text_spider"
    start_urls = ["https://example.com"]  # replace with your seed URLs
    custom_settings = {
        "DOWNLOAD_DELAY": 1.0,  # rate limiting
        "ROBOTSTXT_OBEY": True,
    }

    def parse(self, response):
        text = trafilatura.extract(
            response.text,
            include_comments=False,
            include_tables=True
        )
        if text:
            yield  {"url": response.url, "text": text}
2

Deduplicate with MinHash/LSH

from datasketch import MinHash, MinHashLSH

lsh = MinHashLSH(threshold=0.8, num_perm=128)
unique_docs = []

for doc in documents:
    m = MinHash(num_perm=128)
    for w in doc["text"].split():
        m.update(w.encode('utf8'))
    if not lsh.query(m):  # no near-duplicate
        lsh.insert(doc["url"], m)
        unique_docs.append(doc)

# Typically removes 30-60% of web crawl
print(f'Kept {len(unique_docs)}/{len(documents)} docs')
3

Filter quality and mask PII

import re

import spacy

nlp = spacy.load("en_core_web_lg")

# en_core_web_lg's NER tags PERSON but has no EMAIL or PHONE labels,
# so emails and phone numbers are caught with regexes instead.
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
PHONE_RE = re.compile(r"\+?\d[\d\s().-]{7,}\d")

def mask_pii(text: str) -> str:
    masked = EMAIL_RE.sub('[EMAIL]', text)
    masked = PHONE_RE.sub('[PHONE]', masked)
    doc = nlp(masked)
    for ent in reversed(doc.ents):  # reversed so char offsets stay valid
        if ent.label_ == 'PERSON':
            masked = (
                masked[:ent.start_char]
                + '[PERSON]'
                + masked[ent.end_char:]
            )
    return masked

clean_docs = [
    {**doc, "text": mask_pii(doc["text"])}
    for doc in unique_docs
    if len(doc["text"]) > 200  # min length filter
] 
4

Tokenize and pack sequences

import tiktoken
import pyarrow as pa
import pyarrow.parquet as pq

enc = tiktoken.get_encoding("cl100k_base")
CTX = 2048

buffer, packed = [], []
for doc in clean_docs:
    tokens = enc.encode(doc["text"]) + [enc.eot_token]
    buffer.extend(tokens)
    while len(buffer) >= CTX:
        packed.append(buffer[:CTX])
        buffer = buffer[CTX:]

# Write packed sequences to Parquet
table = pa.table({"input_ids": packed})
pq.write_table(table, "tokens.parquet", compression="zstd")
5

Package and push to HuggingFace Hub

from datasets import Dataset

# Load from Parquet
ds = Dataset.from_parquet("tokens.parquet")

# Split into train/validation
split = ds.train_test_split(test_size=0.01, seed=42)

# Push to HuggingFace Hub with dataset card
split.push_to_hub(
    repo_id="my-org/domain-corpus-v1",
    private=True  
)

# Verify stats
total_tokens = sum(len(x) for x in split["train"]["input_ids"])
print(f'Dataset: {total_tokens:,} tokens')

When to Use This Pattern

  • Building a domain-specific fine-tuning corpus (legal, medical, code)
  • Preparing pre-training data from web crawl sources
  • Creating instruction datasets for supervised fine-tuning (SFT)
  • Processing internal documents into a private training dataset

Common Issues

MinHash is slow on large corpora

Process documents in parallel with Ray actors, each maintaining its own LSH index shard. Merge shard results with a union-find for cross-shard deduplication. This keeps the MinHash stage roughly linear in corpus size as workers are added.
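The cross-shard merge can be sketched with a small union-find over candidate duplicate pairs (the Ray actor and LSH query code is omitted; the pairs are assumed to come from each shard's index):

```python
class UnionFind:
    """Minimal union-find with path halving."""

    def __init__(self):
        self.parent = {}

    def find(self, x):
        self.parent.setdefault(x, x)
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]  # path halving
            x = self.parent[x]
        return x

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            self.parent[rb] = ra

def dedup_across_shards(doc_ids, duplicate_pairs):
    """Keep one representative document per duplicate cluster."""
    uf = UnionFind()
    for a, b in duplicate_pairs:
        uf.union(a, b)
    seen, keep = set(), []
    for d in doc_ids:
        root = uf.find(d)
        if root not in seen:
            seen.add(root)
            keep.append(d)
    return keep
```

Because clusters are resolved by root, a chain of pairwise matches found in different shards (a≈b in one shard, b≈c in another) still collapses to a single kept document.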

Sequence packing drops the last incomplete buffer

The trailing buffer (shorter than CTX) is discarded in naive packing. For large corpora this is acceptable. For small fine-tuning datasets, pad with the PAD token and add an attention mask to avoid wasting examples.
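A minimal sketch of the padded variant (the pad id and context length are placeholders; use your tokenizer's actual values):

```python
def pack_with_padding(tokens, ctx, pad_id):
    """Pack a token stream into fixed-length sequences, padding the
    trailing chunk and returning an attention mask per sequence
    (1 = real token, 0 = padding)."""
    buf = list(tokens)
    seqs, masks = [], []
    for i in range(0, len(buf), ctx):
        chunk = buf[i:i + ctx]
        masks.append([1] * len(chunk) + [0] * (ctx - len(chunk)))
        seqs.append(chunk + [pad_id] * (ctx - len(chunk)))
    return seqs, masks
```

Store the mask alongside input_ids so the loss on padding positions can be zeroed out during training.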

HuggingFace Hub push times out on large datasets

Shard the dataset before pushing: pass max_shard_size="500MB" to push_to_hub so the datasets library splits the Parquet files automatically, or save_to_disk("local/") and upload the shards individually with the huggingface_hub library.

FAQ

How much data do I need to fine-tune an LLM?
1K–100K high-quality instruction pairs for SFT. Millions to billions of tokens for domain adaptation. Quality beats quantity — 10K clean examples outperform 1M noisy ones.
What file format should I use?
Parquet (via PyArrow) for token arrays — columnar, compressed, fast. JSONL for instruction datasets. HuggingFace Datasets wraps both and handles DataLoader integration.
How do I scale to billions of tokens?
Ray or Dask for distributed processing. Partition input into shards and process in parallel. Store intermediate Parquet shards so the pipeline is restartable from any checkpoint.
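Partitioning the input can be as simple as round-robin assignment of source files to shards before handing them to workers (a sketch; the shard count is arbitrary):

```python
def make_shards(paths, n_shards):
    """Round-robin input file paths into n_shards roughly even groups."""
    shards = [[] for _ in range(n_shards)]
    for i, p in enumerate(paths):
        shards[i % n_shards].append(p)
    return shards
```

Each shard then becomes one unit of work for a Ray task or Dask partition, and one intermediate Parquet file on disk.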
