How to Build an LLM Data Pipeline
An LLM data pipeline has five stages: crawl text sources → deduplicate near-identical documents → filter quality and mask PII → tokenize into integer sequences → push to the Hugging Face Hub. Each stage writes Parquet files, so the pipeline can restart from any step.
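The restartable-stage idea can be sketched as a small driver that skips any stage whose Parquet output already exists. This is a minimal sketch; the stage names, paths, and `run_stage` helper are illustrative, not part of any library:

```python
from pathlib import Path

def run_stage(name: str, out_path: str, fn) -> str:
    """Run one pipeline stage unless its Parquet output already exists."""
    out = Path(out_path)
    if out.exists():
        print(f"[skip] {name}: {out} already present")
    else:
        fn(out)  # the stage function writes its Parquet output to `out`
        print(f"[done] {name} -> {out}")
    return str(out)
```

Chaining `run_stage("crawl", "crawl.parquet", crawl_fn)` → `run_stage("dedup", "dedup.parquet", dedup_fn)` and so on gives a pipeline that, after a crash, redoes only the stages whose outputs are missing.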
Crawl and extract clean text
```python
import scrapy
import trafilatura

class TextSpider(scrapy.Spider):
    name = "text_spider"
    # Define start_urls (or start_requests) for your target sites.
    custom_settings = {
        "DOWNLOAD_DELAY": 1.0,  # rate limiting
        "ROBOTSTXT_OBEY": True,
    }

    def parse(self, response):
        # trafilatura strips navigation/boilerplate and returns the main text
        text = trafilatura.extract(
            response.text,
            include_comments=False,
            include_tables=True,
        )
        if text:
            yield {"url": response.url, "text": text}
```
Deduplicate with MinHash/LSH
```python
from datasketch import MinHash, MinHashLSH

# Threshold 0.8: documents with estimated Jaccard similarity >= 0.8 match
lsh = MinHashLSH(threshold=0.8, num_perm=128)
unique_docs = []
for doc in documents:
    m = MinHash(num_perm=128)
    for w in doc["text"].split():
        m.update(w.encode("utf8"))
    if not lsh.query(m):  # no near-duplicate seen yet
        lsh.insert(doc["url"], m)
        unique_docs.append(doc)

# Typically removes 30-60% of a raw web crawl
print(f"Kept {len(unique_docs)}/{len(documents)} docs")
```
Filter quality and mask PII
```python
import re
import spacy

nlp = spacy.load("en_core_web_lg")

# Note: spaCy's pretrained NER emits PERSON but has no EMAIL/PHONE labels,
# so emails and phone numbers are masked with coarse regexes instead.
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
PHONE_RE = re.compile(r"\+?\d[\d\s().-]{7,}\d")

def mask_pii(text: str) -> str:
    doc = nlp(text)
    masked = text
    # Replace right-to-left so earlier character offsets stay valid
    for ent in reversed(doc.ents):
        if ent.label_ == "PERSON":
            masked = (
                masked[:ent.start_char]
                + f"[{ent.label_}]"
                + masked[ent.end_char:]
            )
    masked = EMAIL_RE.sub("[EMAIL]", masked)
    masked = PHONE_RE.sub("[PHONE]", masked)
    return masked

clean_docs = [
    {**doc, "text": mask_pii(doc["text"])}
    for doc in unique_docs
    if len(doc["text"]) > 200  # min length filter
]
```
Tokenize and pack sequences
```python
import tiktoken
import pyarrow as pa
import pyarrow.parquet as pq

enc = tiktoken.get_encoding("cl100k_base")
CTX = 2048  # context length of the target model

buffer, packed = [], []
for doc in clean_docs:
    # Append an end-of-text token so documents stay separable after packing
    tokens = enc.encode(doc["text"]) + [enc.eot_token]
    buffer.extend(tokens)
    while len(buffer) >= CTX:
        packed.append(buffer[:CTX])
        buffer = buffer[CTX:]

# Write packed sequences to Parquet
table = pa.table({"input_ids": packed})
pq.write_table(table, "tokens.parquet", compression="zstd")
```
Package and push to HuggingFace Hub
```python
from datasets import Dataset

# Load packed sequences from Parquet
ds = Dataset.from_parquet("tokens.parquet")

# Hold out 1% for validation
split = ds.train_test_split(test_size=0.01, seed=42)

# Push to the Hugging Face Hub (add a dataset card to the repo afterwards)
split.push_to_hub(
    repo_id="my-org/domain-corpus-v1",
    private=True,
)

# Verify token counts
total_tokens = sum(len(x) for x in split["train"]["input_ids"])
print(f"Dataset: {total_tokens:,} tokens")
```
When to Use This Pattern
- Building a domain-specific fine-tuning corpus (legal, medical, code)
- Preparing pre-training data from web crawl sources
- Creating instruction datasets for supervised fine-tuning (SFT)
- Processing internal documents into a private training dataset
Common Issues
MinHash is slow on large corpora
Process documents in parallel with Ray actors, each maintaining its own LSH index shard. Merge shard results with a union-find structure for cross-shard deduplication. This scales MinHash computation roughly linearly with the number of workers.
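The cross-shard merge step can be sketched with a small union-find: each shard reports the duplicate pairs it detected locally, and the merged structure keeps one representative per duplicate cluster. The Ray wiring is omitted here; `dedup_across_shards` is an illustrative helper, not a library API:

```python
class UnionFind:
    """Minimal union-find for merging duplicate pairs across shards."""
    def __init__(self):
        self.parent = {}

    def find(self, x):
        self.parent.setdefault(x, x)
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]  # path halving
            x = self.parent[x]
        return x

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            self.parent[rb] = ra

def dedup_across_shards(duplicate_pairs, all_urls):
    """Keep the first document seen from each duplicate cluster."""
    uf = UnionFind()
    for a, b in duplicate_pairs:
        uf.union(a, b)
    seen_roots, kept = set(), []
    for url in all_urls:
        root = uf.find(url)
        if root not in seen_roots:
            seen_roots.add(root)
            kept.append(url)
    return kept
```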
Sequence packing drops the last incomplete buffer
The trailing buffer (shorter than CTX) is discarded in naive packing. For large corpora this is acceptable. For small fine-tuning datasets, pad with the PAD token and add an attention mask to avoid wasting examples.
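For the small-dataset case, padding the trailing buffer can be sketched as follows; `PAD_ID` is a placeholder for your tokenizer's actual pad token id, and `flush_trailing` is an illustrative helper:

```python
PAD_ID = 0  # placeholder; substitute your tokenizer's pad token id

def flush_trailing(buffer, ctx=2048, pad_id=PAD_ID):
    """Pad the final partial sequence and return it with its attention mask."""
    if not buffer:
        return None
    n = len(buffer)
    input_ids = buffer + [pad_id] * (ctx - n)
    attention_mask = [1] * n + [0] * (ctx - n)  # 0 marks padding positions
    return {"input_ids": input_ids, "attention_mask": attention_mask}
```

Call this once after the packing loop, and append the result to `packed` (storing the mask alongside) instead of discarding the remainder.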
HuggingFace Hub push times out on large datasets
Shard the dataset into smaller files before pushing: the datasets library's push_to_hub accepts a max_shard_size parameter (e.g. "500MB"); alternatively, ds.save_to_disk("local/") and upload the shards individually with the huggingface_hub library.
FAQ
- How much data do I need to fine-tune an LLM?
- 1K–100K high-quality instruction pairs for SFT. Millions to billions of tokens for domain adaptation. Quality beats quantity — 10K clean examples outperform 1M noisy ones.
- What file format should I use?
- Parquet (via PyArrow) for token arrays — columnar, compressed, fast. JSONL for instruction datasets. HuggingFace Datasets wraps both and handles DataLoader integration.
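For the JSONL case, an instruction dataset is just one JSON object per line, which the standard library handles directly. The field names below (`instruction`/`input`/`output`) are a common SFT convention, not a requirement:

```python
import json

records = [
    {"instruction": "Summarize the clause.", "input": "...", "output": "..."},
    {"instruction": "Translate to French.", "input": "Hello", "output": "Bonjour"},
]

def write_jsonl(path, rows):
    """Write one JSON object per line."""
    with open(path, "w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")

def read_jsonl(path):
    """Read a JSONL file back into a list of dicts."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f]
```

`datasets.load_dataset("json", data_files=...)` reads the same files directly.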
- How do I scale to billions of tokens?
- Ray or Dask for distributed processing. Partition input into shards and process in parallel. Store intermediate Parquet shards so the pipeline is restartable from any checkpoint.
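The shard-parallel, restartable pattern can be sketched with the standard library alone; threads are used here for simplicity (swap in processes, Ray, or Dask for CPU-bound work at scale), and the paths plus the per-shard work are illustrative placeholders:

```python
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def process_shard(in_path: str, out_dir: str) -> str:
    """Process one input shard unless its output already exists."""
    out = Path(out_dir) / (Path(in_path).stem + ".out.parquet")
    if out.exists():
        return f"skip {out.name}"  # restartable: finished in a previous run
    # ... real work: read shard, dedup/filter/tokenize, write Parquet ...
    out.write_text("placeholder")  # stands in for pq.write_table(...)
    return f"done {out.name}"

def run_all(shard_paths, out_dir, workers=8):
    """Process all shards in parallel, skipping completed ones."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(lambda p: process_shard(p, out_dir), shard_paths))
```

Rerunning `run_all` after a failure only touches shards whose outputs are missing, which is what makes the Parquet-per-stage layout cheap to resume.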