Data Pipelines¶
Learning outcomes
- Understand the key components of data pipelines for LLMs
- Run part of a post-training data preparation pipeline
Quiz yourself!

1. What is the primary goal of pre-training?
    - A. Train small task-specific classifiers
    - B. Assemble large, diverse, governed corpora and feed tokens efficiently so the model learns general-purpose representations
    - C. Compress datasets for long-term archival
    - D. Only fine-tune models on downstream tasks
2. Which file format is recommended for fast local reads and training-ready batches with Hugging Face Datasets?
    - A. JSON/JSONL
    - B. Apache Arrow
    - C. CSV
    - D. XML
3. According to the post-training section, aligning base LLMs to tasks is primarily done via:
    - A. Data sharding and compression
    - B. Supervised fine-tuning and preference optimization (reward modelling or RLHF/RLAIF)
    - C. Only unsupervised pre-training
    - D. Manual rule-based post-processing

Answer key: 1:B, 2:B, 3:B

- Well-designed data pipelines determine LLM quality, safety, and training throughput.
- The success of frontier models such as GPT-5 relies heavily on quality data and on efficient pipelines around it that reduce training compute and improve the capabilities we want the model to have.

SmolLM3 training team:

> From our experience, and though it might disappoint architecture enthusiasts, the biggest performance gains usually come from data curation.
Pre-training 📊¶
Goal: Assemble large, diverse, governed corpora and feed tokens efficiently to the model so that it learns general-purpose representations. The LLM learns in a self-supervised fashion.
Raw data is often messy and unsuitable for learning linguistic semantics. It typically exists in diverse formats such as HTML, PDFs, and spreadsheets, and requires extensive preprocessing before it can be used for training. The challenge lies in preserving content and structure during this lossy cleaning process.
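For example, pulling the main text out of a raw HTML page already discards markup, navigation, and layout. Below is a minimal sketch using trafilatura, the same extractor used in the FineWeb pipeline further down; the HTML snippet is made up for illustration.

```python
# Minimal sketch: keep the main article content of an HTML page, drop the boilerplate.
# The HTML string is a toy example; in practice pages come from WARC files or crawls.
import trafilatura

html = """
<html><body>
  <nav>Home | About | Login</nav>
  <article>
    <h1>Data pipelines</h1>
    <p>Well-designed data pipelines determine LLM quality, safety, and training
    throughput, and most of the work goes into cleaning and filtering raw text
    so that only useful, well-formed documents reach the tokenizer.</p>
  </article>
  <footer>(c) 2024 Example Inc.</footer>
</body></html>
"""

# trafilatura may return None for very short or boilerplate-only pages
text = trafilatura.extract(html)
print(text)
```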
1. Data acquisition and licensing: web crawl (Common Crawl), books, GitHub code, academic papers; PII and copyright governance.
2. Content extraction, normalization and detection: language ID, Unicode cleanup, boilerplate removal, document boundaries. Language classifiers: GlotLID, fastText (a short language-ID sketch follows this list).
3. Deduplication: exact and near-duplicate removal (MinHash/SimHash); mitigates contamination and overfitting.
4. Quality filtering and decontamination: heuristic/classifier filters (toxicity, spam), quality scoring, temperature sampling.
5. Tokenization and training: train the vocabulary (BPE/Unigram), pre-tokenize, pack sequences respecting end-of-document (EOD) boundaries. Tokenization: HF Tokenizers (see the packing sketch after the FineWeb example below).
6. Mixture building and sampling: domain/language balance, curriculum, up/down-sampling.
7. Storage and sharding: Arrow/Parquet/WebDataset, deterministic sharding, resume-safe streaming. Arrow: HF Datasets.
8. Training and observability: remove overlaps with evals; versioning, lineage, dashboards. Checkpoint/logging: HF Trainer.
9. Continuous evaluation: benchmark language understanding, reasoning, QA, etc.; check for bias, stereotypes, toxicity and answer safety.
10. Data governance and ethics: ethical charter, inspection tools for data composition, licensing, artifact release for reproducibility and further research.
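A minimal sketch of the language-identification part of stage 2, using fastText's lid.176 model (GlotLID also ships a fastText-compatible model); downloading the model file beforehand is assumed.

```python
# Minimal sketch: classify the language of a cleaned document with fastText.
# Assumes lid.176.bin has been downloaded from fasttext.cc beforehand.
import fasttext

lid_model = fasttext.load_model("lid.176.bin")

doc = "Det här är en svensk mening om datapipelines."  # input must not contain newlines
labels, scores = lid_model.predict(doc, k=1)
print(labels[0], float(scores[0]))  # e.g. __label__sv 0.99
```

In a real pipeline this check runs per document, and documents below a confidence threshold are routed to a non-target-language bucket rather than silently dropped.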
Full reproduction of the FineWeb dataset
"""
This file contains the code used to process and create the
FineWeb dataset (https://huggingface.co/datasets/HuggingFaceFW/fineweb)
"""
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupCluster, MinhashDedupFilter, MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import MinhashConfig, MinhashDedupBuckets
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import (
C4QualityFilter,
FineWebQualityFilter,
GopherQualityFilter,
GopherRepetitionFilter,
LanguageFilter,
URLFilter,
)
from datatrove.pipeline.formatters import PIIFormatter
from datatrove.pipeline.readers import JsonlReader, WarcReader
from datatrove.pipeline.tokens import TokensCounter
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.utils.hashing import HashConfig
"""
we first ran the following pipeline for each dump
"""
DUMP_TO_PROCESS = "CC-MAIN-2023-50" # example
MAIN_OUTPUT_PATH = "s3://some_s3_bucket"
FILTERING_OUTPUT_PATH = f"{MAIN_OUTPUT_PATH}/base_processing"
main_processing_executor = SlurmPipelineExecutor(
job_name=f"cc_{DUMP_TO_PROCESS}",
pipeline=[
WarcReader(
f"s3://commoncrawl/crawl-data/{DUMP_TO_PROCESS}/segments/",
glob_pattern="*/warc/*", # we want the warc files
default_metadata={"dump": DUMP_TO_PROCESS},
),
URLFilter(exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/1_url/{DUMP_TO_PROCESS}")),
Trafilatura(favour_precision=True),
LanguageFilter(
exclusion_writer=JsonlWriter(
f"{FILTERING_OUTPUT_PATH}/2_non_english/",
output_filename="${language}/" + DUMP_TO_PROCESS + "/${rank}.jsonl.gz",
# folder structure: language/dump/file
)
),
GopherRepetitionFilter(
exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/3_gopher_rep/{DUMP_TO_PROCESS}")
),
GopherQualityFilter(
exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/4_gopher_qual/{DUMP_TO_PROCESS}")
),
C4QualityFilter(
filter_no_terminal_punct=False,
exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/5_c4/{DUMP_TO_PROCESS}"),
),
FineWebQualityFilter(
exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/6_fineweb_qual/{DUMP_TO_PROCESS}")
),
JsonlWriter(f"{FILTERING_OUTPUT_PATH}/output/{DUMP_TO_PROCESS}"),
],
tasks=8000,
time="10:00:00",
logging_dir=f"{MAIN_OUTPUT_PATH}/logs/base_processing/{DUMP_TO_PROCESS}",
slurm_logs_folder=f"logs/base_processing/{DUMP_TO_PROCESS}/slurm_logs", # must be local
randomize_start_duration=180, # don't hit the bucket all at once with the list requests
mem_per_cpu_gb=2,
partition="hopper-cpu",
)
main_processing_executor.run()
"""
we then applied minhash deduplication to each individual dump,
"""
# you can also change ngrams or the number of buckets and their size here
minhash_config = MinhashConfig(
hash_config=HashConfig(
hash_fc="sha1", # better precision -> fewer false positives (collisions)
precision=64,
),
num_buckets=14,
hashes_per_bucket=8,
n_grams=5,
)
S3_MINHASH_BASE_PATH = f"{MAIN_OUTPUT_PATH}/minhash"
S3_LOGS_FOLDER = f"{MAIN_OUTPUT_PATH}/logs/minhash"
LOCAL_LOGS_FOLDER = "logs/minhash"
TOTAL_TASKS = 1000
# this is the original data that we want to deduplicate
INPUT_READER = JsonlReader(
f"{FILTERING_OUTPUT_PATH}/output/{DUMP_TO_PROCESS}"
) # this is the output from the first part
# stage 1 computes minhash signatures for each task (each task gets a set of files)
stage1 = SlurmPipelineExecutor(
job_name=f"mh1_{DUMP_TO_PROCESS}",
pipeline=[
INPUT_READER,
MinhashDedupSignature(
output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/signatures", config=minhash_config
),
],
tasks=TOTAL_TASKS,
time="5:00:00",
partition="hopper-cpu",
logging_dir=f"{S3_LOGS_FOLDER}/signatures",
slurm_logs_folder=f"{LOCAL_LOGS_FOLDER}/signatures/slurm_logs",
randomize_start_duration=180,
depends=main_processing_executor, # only start after the first one completes
)
stage2 = SlurmPipelineExecutor(
job_name=f"mh2_{DUMP_TO_PROCESS}",
pipeline=[
MinhashDedupBuckets(
input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/signatures",
output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/buckets",
config=MinhashConfig(hash_config=minhash_config.hash_config),
),
],
tasks=minhash_config.num_buckets * 50, # the code supports parallelizing each bucket. here we run 50
# workers per bucket
randomize_start_duration=180,
logging_dir=f"{S3_LOGS_FOLDER}/buckets",
partition="hopper-cpu",
time="02:00:00",
mem_per_cpu_gb=4,
cpus_per_task=3, # you can add run more (smaller) tasks if you do not have a lot of memory
depends=stage1,
)
stage3 = SlurmPipelineExecutor(
job_name=f"mh3_{DUMP_TO_PROCESS}",
pipeline=[
MinhashDedupCluster(
input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/buckets",
output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/remove_ids",
config=minhash_config,
),
],
tasks=1, # this step runs on a single task
logging_dir=f"{S3_LOGS_FOLDER}/clustering",
partition="hopper-cpu",
time="30:00:00", # and can also be quite slow. Usually not this slow though
mem_per_cpu_gb=25,
cpus_per_task=8, # if you dedup a full dump, you do need a lot of memory for this one
depends=stage2,
)
stage4 = SlurmPipelineExecutor(
job_name=f"mh4_{DUMP_TO_PROCESS}",
pipeline=[
INPUT_READER,
TokensCounter(), # you can remove this one, it's just a nice way to know how many tokens we have
# before and after dedup
MinhashDedupFilter(input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/remove_ids"),
# run the PII removal
PIIFormatter(),
JsonlWriter(f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/deduped_output"),
],
tasks=TOTAL_TASKS,
logging_dir=f"{S3_LOGS_FOLDER}/filtering",
partition="hopper-cpu",
time="5:00:00",
mem_per_cpu_gb=4,
depends=stage3,
)
# launch dedup pipelines
stage4.run()
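The FineWeb script above covers acquisition, extraction, filtering, deduplication and PII formatting. Below is a minimal sketch of the later tokenization, packing and sharding stages (5 and 7) with Hugging Face Datasets and Tokenizers; the paths, the `text` field name, the sequence length and the gpt2 tokenizer are placeholders, not part of the FineWeb recipe.

```python
# Minimal sketch: tokenize cleaned JSONL documents, pack them into fixed-length
# sequences respecting document boundaries, and write training-ready shards.
from datasets import load_dataset
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")  # any fast tokenizer works here

# hypothetical output location of the cleaning/dedup stages
ds = load_dataset("json", data_files="data/cleaned/*.jsonl.gz", split="train")

def tokenize_and_append_eod(batch):
    # append the end-of-document token so packed sequences keep doc boundaries
    ids = tokenizer(batch["text"], add_special_tokens=False)["input_ids"]
    return {"input_ids": [x + [tokenizer.eos_token_id] for x in ids]}

tokenized = ds.map(tokenize_and_append_eod, batched=True, remove_columns=ds.column_names)

def pack(batch, seq_len=2048):
    # concatenate all docs in the batch and cut into fixed-length training sequences
    # (the remainder of each batch is dropped for simplicity)
    flat = [tok for doc in batch["input_ids"] for tok in doc]
    n = (len(flat) // seq_len) * seq_len
    return {"input_ids": [flat[i : i + seq_len] for i in range(0, n, seq_len)]}

packed = tokenized.map(pack, batched=True, batch_size=1000)
packed.to_parquet("data/packed/train.parquet")  # or save_to_disk() for Arrow shards
```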
Resources 📚
- LLM papers on data and data pipelines:
    - The Pile: An 800GB Dataset of Diverse Text for Language Modeling
    - CCNet: Extracting High Quality Monolingual Datasets from Web Crawl Data
    - The RefinedWeb Dataset for Falcon LLM: Outperforming Curated Corpora with Web Data, and Web Data Only
    - Dolma: an Open Corpus of Three Trillion Tokens for Language Model Pretraining Research
    - RedPajama: an Open Dataset for Training Large Language Models
- End-to-end data preprocessing libraries
- Classic NLP data preprocessing libraries
Post-training 🎯¶
Goal: Align base LLMs to new tasks or improve their existing abilities on chat-based dialogue, structured tasks, or domain-specific data.
This alignment is done via supervised fine-tuning and preference optimization (reward modelling or RLHF/RLAIF).
- Choosing a base model and deciding training regimes: consider model size, architecture, and the model's post-training track record in SFT/PO/RL.
- Collection and cleaning: instruction–response pairs, multi-turn formatting style and tool-use coverage; synthetic data generation.
- Curation: PII filtering and annotation by humans or AI; ranking and scoring by humans or smaller models for reward modelling; deduplication.
- Transformation (check the exercise below): tokenization, formatting into chat templates, sharding and packing for efficient GPU training. Tokenizers: HF fast tokenizers. Fine-tuning libraries like TRL handle most of this internally (a short sketch follows this list).
- Validation: schema validation (for example with Pydantic), quality checks, simple benchmarks, and basic stats.
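A minimal sketch of the transformation and validation steps, assuming the openai/gsm8k dataset used in the exercise below; the SmolLM2 tokenizer is only an example of a model with a chat template, not a prescribed choice.

```python
# Minimal sketch: turn gsm8k question/answer pairs into chat messages,
# validate the schema with Pydantic, and render one example with a chat template.
from datasets import load_dataset
from pydantic import BaseModel, ValidationError
from transformers import AutoTokenizer

class ChatMessage(BaseModel):
    role: str      # "user" or "assistant"
    content: str

class SFTRecord(BaseModel):
    messages: list[ChatMessage]

ds = load_dataset("openai/gsm8k", "main", split="train")
tokenizer = AutoTokenizer.from_pretrained("HuggingFaceTB/SmolLM2-135M-Instruct")  # example model

def to_messages(example):
    # wrap question/answer into the chat schema expected by SFT trainers such as TRL
    return {"messages": [
        {"role": "user", "content": example["question"]},
        {"role": "assistant", "content": example["answer"]},
    ]}

sft = ds.map(to_messages, remove_columns=ds.column_names)

# validate the schema of one record and render it with the chat template
try:
    SFTRecord(**sft[0])
except ValidationError as e:
    print("schema error:", e)
print(tokenizer.apply_chat_template(sft[0]["messages"], tokenize=False))
```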
Dataset file formats¶
Some commonly used formats include (a short read/write sketch follows the list):
- JSON/JSONL (.jsonl, also .jsonl.gz or .zst)
    - Use when datasets are small/medium and you want quick edits and reviews. Good for chat-style SFT and preference pairs.
    - Pros: easy to read, easy to diff, streams line-by-line.
    - Cons: bigger files, slower random access, no built-in schema.
- Apache Arrow (.arrow)
    - Use for fast local reads and training-ready batches. Works well with Hugging Face Datasets.
    - Pros: column-based, memory-mapped, typed; very fast.
    - Cons: less common for general analytics than Parquet.
- Parquet (.parquet)
    - Use for larger local datasets and preprocessing before training.
    - Pros: column-based and compressed; efficient scans; easy to split into parts.
    - Cons: writing can be heavier; very small rows need careful block sizing.
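A minimal sketch of writing and reading the same toy preference dataset in each of the three formats with Hugging Face Datasets; the file names are placeholders.

```python
# Minimal sketch: round-trip a tiny preference dataset through JSONL, Parquet and Arrow.
from datasets import Dataset, load_dataset, load_from_disk

ds = Dataset.from_list([
    {"prompt": "2 + 2 = ?", "chosen": "4", "rejected": "5"},
])

ds.to_json("prefs.jsonl")        # JSONL: easy to inspect and diff
ds.to_parquet("prefs.parquet")   # Parquet: compressed, columnar
ds.save_to_disk("prefs_arrow")   # Arrow: memory-mapped, training-ready

# reading back
jsonl_ds   = load_dataset("json", data_files="prefs.jsonl", split="train")
parquet_ds = load_dataset("parquet", data_files="prefs.parquet", split="train")
arrow_ds   = load_from_disk("prefs_arrow")
```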
Tips on storage
- Prefer column-based + compressed shards (Parquet/Arrow) for scale; use JSONL for iteration and human review.
- Shard size: 50–500 MB per shard is a good starting point for multi-process training.
- Compress with zstd or gzip; keep a local manifest and checksums.
- Keep explicit schemas: for SFT {messages: [...], meta: {...}}; for preference data {prompt, chosen, rejected}.
On Alvis:
- Check your usage and quota using `C3SE_quota` and, for Cephyr file usage, `where-are-my-files`.
- Prefer few large files over many small files; file I/O can be a limiting factor.
Recommended defaults:
- For local Hugging Face SFT/RLAIF: use Parquet or Arrow shards.
- Use JSONL for prototyping, manual review, and small experiments.
- Always record dataset version, schema, and shard manifest for reproducibility.
Exercise
- Create the `~/portal/jupyter` directory if you don't have it already.
- Copy `llm-workshop/containers/post_train/post_train_env.sh` to your `~/portal/jupyter/`.
- Start a Jupyter server with 1x A40 (or above) GPU, the `post_train_env` environment, and the working directory set to your personal project directory.
- Run `data_pipelines.ipynb` to prepare a dataset for supervised fine-tuning on OpenAI's gsm8k math dataset.
Resources 📚
- LLM papers on data and data pipelines:
    - Training language models to follow instructions with human feedback
    - Direct Preference Optimization: Your Language Model is Secretly a Reward Model
    - Constitutional AI: Harmlessness from AI Feedback
    - LIMA: Less Is More for Alignment
    - Self-Instruct: Aligning Language Models with Self-Generated Instructions