
Data Pipelines

Learning outcomes
  • Understand the key components of data pipelines for LLMs
  • Run a part of post-training data preparation pipeline
Quiz yourself!
Answer key

1:B, 2:B, 3:B

  1. What is the primary goal of pre-training?

    A. Train small task-specific classifiers
    B. Assemble large, diverse, governed corpora and feed tokens efficiently so the model learns general-purpose representations
    C. Compress datasets for long-term archival
    D. Only fine-tune models on downstream tasks

  2. Which file format is recommended for fast local reads and training-ready batches with Hugging Face Datasets?

    A. JSON/JSONL
    B. Apache Arrow
    C. CSV
    D. XML

  3. According to the post-training section, aligning base LLMs to tasks is primarily done via:

    A. Data sharding and compression
    B. Supervised fine-tuning and preference optimization (reward modelling or RLHF/RLAIF)
    C. Only unsupervised pre-training
    D. Manual rule-based post-processing

Data meme

  • Well-designed data pipelines determine LLM quality, safety, and training throughput.

  • The success of frontier models such as GPT-5 relies heavily on high-quality data and on efficient pipelines built around it, which reduce training compute and improve the capabilities we want the model to have.

SmolLM3 training team

From our experience, and though it might disappoint architecture enthusiasts, the biggest performance gains usually come from data curation.

Pre-training 📊

Goal: Assemble large, diverse, governed corpora and feed tokens efficiently to the model so that it learns general-purpose representations. The LLM learns in a self-supervised fashion.

Raw data is often messy and unsuitable for learning linguistic semantics. It typically exists in diverse formats such as HTML, PDFs and spreadsheets, and requires extensive preprocessing before it is usable for training. The challenge lies in preserving content and structure during this lossy cleaning process.

  • Data acquisition and licensing (1)

  • Content extraction, normalization and detection (2)

  • Deduplication (3)

  • Quality filtering, decontamination (4)

  • Tokenization and training (5)

  • Mixture building and sampling (6)

  • Storage and sharding (7)

  • Training and observability (8)

  • Continuous Evaluation (9)

  • Data governance and ethics (10)

  1. web crawl, books, GitHub code, academic papers. PII and copyright governance.
    Web crawl: Common Crawl (CC)
  2. language ID, Unicode cleanup, boilerplate removal, doc boundaries.
    Language classifiers: GlotLID, fastText
  3. exact and near-dup (MinHash/SimHash); mitigate contamination and overfitting.
  4. heuristic/classifier filters (toxicity, spam), quality scoring, temperature sampling.
  5. train vocab (BPE/Unigram), pre-tokenize, pack sequences respecting end-of-document (EOD) boundaries. Tokenization: HF Tokenizers
  6. domain/language balance, curriculum, up/down-sampling.
  7. Arrow/Parquet/WebDataset, deterministic sharding, resume-safe streaming. Arrow: HF Datasets
  8. Remove overlaps with evals; versioning, lineage, dashboards. Checkpoint/logging: HF Trainer
  9. benchmark language understanding, reasoning, QA etc. Bias, stereotype, toxicity and answer safety checks.
  10. Ethical charter, inspection tools for data composition, licensing, artifact release for reproducibility and further research.
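
Step 5 mentions packing sequences while respecting EOD boundaries. The sketch below shows that idea in plain Python, assuming documents are already tokenized into integer ids. The `EOD_ID` value and the `pack_sequences` helper are hypothetical names chosen for illustration, not part of any library.

```python
from typing import Iterable, Iterator

EOD_ID = 0  # hypothetical end-of-document token id (assumption)


def pack_sequences(
    docs: Iterable[list[int]], seq_len: int, eod_id: int = EOD_ID
) -> Iterator[list[int]]:
    """Concatenate tokenized documents, separated by EOD, into fixed-length sequences."""
    buffer: list[int] = []
    for doc in docs:
        buffer.extend(doc)
        buffer.append(eod_id)  # mark where this document ends
        # Emit as many full training sequences as the buffer currently holds.
        while len(buffer) >= seq_len:
            yield buffer[:seq_len]
            buffer = buffer[seq_len:]
    # Tokens left in the buffer do not fill a sequence and are dropped here.


docs = [[5, 6, 7], [8, 9], [10, 11, 12, 13]]
packed = list(pack_sequences(docs, seq_len=4))
print(packed)
```

Packing avoids wasting compute on padding. Real pipelines may also use the EOD positions to mask attention across document boundaries, which this sketch leaves out.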
Full reproduction of the FineWeb dataset
fineweb.py
"""
This file contains the code used to process and create the
FineWeb dataset (https://huggingface.co/datasets/HuggingFaceFW/fineweb)
"""

from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupCluster, MinhashDedupFilter, MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import MinhashConfig, MinhashDedupBuckets
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import (
    C4QualityFilter,
    FineWebQualityFilter,
    GopherQualityFilter,
    GopherRepetitionFilter,
    LanguageFilter,
    URLFilter,
)
from datatrove.pipeline.formatters import PIIFormatter
from datatrove.pipeline.readers import JsonlReader, WarcReader
from datatrove.pipeline.tokens import TokensCounter
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.utils.hashing import HashConfig


"""
    we first ran the following pipeline for each dump
"""
DUMP_TO_PROCESS = "CC-MAIN-2023-50"  # example

MAIN_OUTPUT_PATH = "s3://some_s3_bucket"
FILTERING_OUTPUT_PATH = f"{MAIN_OUTPUT_PATH}/base_processing"

main_processing_executor = SlurmPipelineExecutor(
    job_name=f"cc_{DUMP_TO_PROCESS}",
    pipeline=[
        WarcReader(
            f"s3://commoncrawl/crawl-data/{DUMP_TO_PROCESS}/segments/",
            glob_pattern="*/warc/*",  # we want the warc files
            default_metadata={"dump": DUMP_TO_PROCESS},
        ),
        URLFilter(exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/1_url/{DUMP_TO_PROCESS}")),
        Trafilatura(favour_precision=True),
        LanguageFilter(
            exclusion_writer=JsonlWriter(
                f"{FILTERING_OUTPUT_PATH}/2_non_english/",
                output_filename="${language}/" + DUMP_TO_PROCESS + "/${rank}.jsonl.gz",
                # folder structure: language/dump/file
            )
        ),
        GopherRepetitionFilter(
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/3_gopher_rep/{DUMP_TO_PROCESS}")
        ),
        GopherQualityFilter(
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/4_gopher_qual/{DUMP_TO_PROCESS}")
        ),
        C4QualityFilter(
            filter_no_terminal_punct=False,
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/5_c4/{DUMP_TO_PROCESS}"),
        ),
        FineWebQualityFilter(
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/6_fineweb_qual/{DUMP_TO_PROCESS}")
        ),
        JsonlWriter(f"{FILTERING_OUTPUT_PATH}/output/{DUMP_TO_PROCESS}"),
    ],
    tasks=8000,
    time="10:00:00",
    logging_dir=f"{MAIN_OUTPUT_PATH}/logs/base_processing/{DUMP_TO_PROCESS}",
    slurm_logs_folder=f"logs/base_processing/{DUMP_TO_PROCESS}/slurm_logs",  # must be local
    randomize_start_duration=180,  # don't hit the bucket all at once with the list requests
    mem_per_cpu_gb=2,
    partition="hopper-cpu",
)
main_processing_executor.run()

"""
    we then applied minhash deduplication to each individual dump,
"""

# you can also change ngrams or the number of buckets and their size here
minhash_config = MinhashConfig(
    hash_config=HashConfig(
        hash_fc="sha1",  # better precision -> fewer false positives (collisions)
        precision=64,
    ),
    num_buckets=14,
    hashes_per_bucket=8,
    n_grams=5,
)

S3_MINHASH_BASE_PATH = f"{MAIN_OUTPUT_PATH}/minhash"

S3_LOGS_FOLDER = f"{MAIN_OUTPUT_PATH}/logs/minhash"
LOCAL_LOGS_FOLDER = "logs/minhash"

TOTAL_TASKS = 1000

# this is the original data that we want to deduplicate
INPUT_READER = JsonlReader(
    f"{FILTERING_OUTPUT_PATH}/output/{DUMP_TO_PROCESS}"
)  # this is the output from the first part

# stage 1 computes minhash signatures for each task (each task gets a set of files)
stage1 = SlurmPipelineExecutor(
    job_name=f"mh1_{DUMP_TO_PROCESS}",
    pipeline=[
        INPUT_READER,
        MinhashDedupSignature(
            output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/signatures", config=minhash_config
        ),
    ],
    tasks=TOTAL_TASKS,
    time="5:00:00",
    partition="hopper-cpu",
    logging_dir=f"{S3_LOGS_FOLDER}/signatures",
    slurm_logs_folder=f"{LOCAL_LOGS_FOLDER}/signatures/slurm_logs",
    randomize_start_duration=180,
    depends=main_processing_executor,  # only start after the first one completes
)

stage2 = SlurmPipelineExecutor(
    job_name=f"mh2_{DUMP_TO_PROCESS}",
    pipeline=[
        MinhashDedupBuckets(
            input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/signatures",
            output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/buckets",
            config=MinhashConfig(hash_config=minhash_config.hash_config),
        ),
    ],
    tasks=minhash_config.num_buckets * 50,  # the code supports parallelizing each bucket. here we run 50
    # workers per bucket
    randomize_start_duration=180,
    logging_dir=f"{S3_LOGS_FOLDER}/buckets",
    partition="hopper-cpu",
    time="02:00:00",
    mem_per_cpu_gb=4,
    cpus_per_task=3,  # you can run more (smaller) tasks if you do not have a lot of memory
    depends=stage1,
)


stage3 = SlurmPipelineExecutor(
    job_name=f"mh3_{DUMP_TO_PROCESS}",
    pipeline=[
        MinhashDedupCluster(
            input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/buckets",
            output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/remove_ids",
            config=minhash_config,
        ),
    ],
    tasks=1,  # this step runs on a single task
    logging_dir=f"{S3_LOGS_FOLDER}/clustering",
    partition="hopper-cpu",
    time="30:00:00",  # and can also be quite slow. Usually not this slow though
    mem_per_cpu_gb=25,
    cpus_per_task=8,  # if you dedup a full dump, you do need a lot of memory for this one
    depends=stage2,
)


stage4 = SlurmPipelineExecutor(
    job_name=f"mh4_{DUMP_TO_PROCESS}",
    pipeline=[
        INPUT_READER,
        TokensCounter(),  # you can remove this one, it's just a nice way to know how many tokens we have
        # before and after dedup
        MinhashDedupFilter(input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/remove_ids"),
        # run the PII removal
        PIIFormatter(),
        JsonlWriter(f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/deduped_output"),
    ],
    tasks=TOTAL_TASKS,
    logging_dir=f"{S3_LOGS_FOLDER}/filtering",
    partition="hopper-cpu",
    time="5:00:00",
    mem_per_cpu_gb=4,
    depends=stage3,
)

# launch dedup pipelines
stage4.run()
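
The `num_buckets=14` and `hashes_per_bucket=8` settings above control how similar two documents must be before MinHash flags them as duplicates. Under the standard banding (LSH) analysis, and assuming each hash agrees with probability equal to the n-gram Jaccard similarity s, two documents match in at least one full bucket with probability 1 - (1 - s^8)^14. The sketch below explores that curve; the helper name `match_probability` is made up for illustration and is not part of datatrove.

```python
def match_probability(
    similarity: float, num_buckets: int = 14, hashes_per_bucket: int = 8
) -> float:
    """Probability that two documents share at least one identical bucket."""
    if not 0.0 <= similarity <= 1.0:
        raise ValueError("similarity must be in [0, 1]")
    # All hashes in one bucket must agree for that bucket to match.
    p_bucket = similarity ** hashes_per_bucket
    # A pair is flagged if any of the buckets matches.
    return 1.0 - (1.0 - p_bucket) ** num_buckets


curve = {s: match_probability(s) for s in (0.5, 0.7, 0.75, 0.8, 0.9)}
for s, p in curve.items():
    print(f"similarity={s:.2f} -> flagged with probability {p:.3f}")
```

With these defaults, a pair at similarity 0.75 is flagged roughly 77% of the time. Adding buckets raises recall, while adding hashes per bucket makes the threshold sharper.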

Post-training 🎯

Goal: Align base LLMs to new tasks or improve their existing abilities in chat-based dialogue, structured tasks or domain-specific data.

This aligning is done via supervised fine-tuning and preference optimization (reward modelling or RLHF/RLAIF).

  • Choosing Base model and deciding training regimes (5)

  • Collection and cleaning (1)

  • Curation (2)

  • Transformation (Check Exercise) (3)

  • Validation (4)

  1. Instruction–response pairs, multi-turn formatting style and tool-use coverage. Synthetic data generation.
  2. PII filtering and annotation by humans or AI. Ranking and scoring by humans or smaller models for Reward modelling. Deduplication.
  3. Tokenization, formatting into chat templates, sharding and packing for efficient GPU training.
    Tokenizers: HF fast tokenizers.
    Finetuning libraries like TRL internally handle most of this.
  4. Schema validation (for example with Pydantic), quality checks, simple benchmarks, and basic stats.
  5. Decide on model size, architecture, and post-training track record in SFT/PO/RL.
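
The transformation and validation steps can be sketched in a few lines. The example below uses plain Python rather than Pydantic or a real chat template so that it has no dependencies. `to_chat_example` and `validate_example` are illustrative helpers, and the role names follow the common system/user/assistant convention rather than any specific model's template.

```python
from typing import Optional

ALLOWED_ROLES = {"system", "user", "assistant"}  # assumed role vocabulary


def to_chat_example(instruction: str, response: str, system: Optional[str] = None) -> dict:
    """Wrap an instruction-response pair in a messages-style record."""
    messages = []
    if system:
        messages.append({"role": "system", "content": system})
    messages.append({"role": "user", "content": instruction})
    messages.append({"role": "assistant", "content": response})
    return {"messages": messages}


def validate_example(example: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means the record passed."""
    messages = example.get("messages")
    if not isinstance(messages, list) or not messages:
        return ["'messages' must be a non-empty list"]
    errors = []
    for i, msg in enumerate(messages):
        if not isinstance(msg, dict):
            errors.append(f"message {i}: expected a dict")
            continue
        if msg.get("role") not in ALLOWED_ROLES:
            errors.append(f"message {i}: unknown role {msg.get('role')!r}")
        content = msg.get("content")
        if not isinstance(content, str) or not content.strip():
            errors.append(f"message {i}: empty or non-string content")
    last = messages[-1]
    if isinstance(last, dict) and last.get("role") != "assistant":
        errors.append("last message should come from the assistant")
    return errors


good = to_chat_example("What is 2 + 2?", "2 + 2 = 4")
bad = {"messages": [{"role": "user", "content": ""}]}
good_errors = validate_example(good)
bad_errors = validate_example(bad)
print(good_errors, bad_errors)
```

In practice the tokenizer's own chat template, applied by libraries such as TRL, turns `messages` into the final token sequence. Checks like these are best run before that step.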

Dataset file formats

Commonly used formats include:

  • JSON/JSONL (.jsonl, also .jsonl.gz or .zst)

    • Use when datasets are small/medium and you want quick edits and reviews. Good for chat-style SFT and preference pairs.
    • Pros: easy to read, easy to diff, streams line-by-line.
    • Cons: bigger files, slower random access, no built-in schema.
  • Apache Arrow (.arrow)

    • Use for fast local reads and training-ready batches. Works well with Hugging Face Datasets.
    • Pros: column-based, memory-mapped, typed; very fast.
    • Cons: less common for general analytics than Parquet.
  • Parquet (.parquet)

    • Use for larger local datasets and preprocessing before training.
    • Pros: column-based and compressed; efficient scans; easy to split into parts.
    • Cons: writing can be heavier; very small rows need careful block sizing.
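
To illustrate the "streams line-by-line" property of JSONL, here is a small standard-library sketch that writes preference pairs to a gzip-compressed `.jsonl.gz` file and reads them back one record at a time. The records, the file name `prefs.jsonl.gz` and the `iter_jsonl` helper are invented for the example.

```python
import gzip
import json
import os
import tempfile

# Made-up preference records for illustration.
records = [
    {"prompt": "Say hi", "chosen": "Hi!", "rejected": "No."},
    {"prompt": "Say bye", "chosen": "Bye!", "rejected": "No."},
]
path = os.path.join(tempfile.mkdtemp(), "prefs.jsonl.gz")

# One JSON object per line; gzip.open in text mode handles compression.
with gzip.open(path, "wt", encoding="utf-8") as f:
    for rec in records:
        f.write(json.dumps(rec, ensure_ascii=False) + "\n")


def iter_jsonl(path: str):
    """Yield records one at a time without loading the whole file into memory."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                yield json.loads(line)


loaded = list(iter_jsonl(path))
print(loaded[0]["prompt"])
```

Because each line is an independent JSON document, a reader can stop early or skip bad lines. The trade-off is that reaching record N requires scanning the lines before it.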

Tips on storage

  • Prefer column-based + compressed shards (Parquet/Arrow) for scale; use JSONL for iteration and human review.
  • Shard size: 50–500 MB per shard is a good starting point for multi-process training.
  • Compress with zstd or gzip; keep a local manifest and checksums.
  • Keep explicit schemas: for SFT {messages: [...], meta: {...}}; for preference data {prompt, chosen, rejected}.
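
The manifest-and-checksum tip can be sketched as follows. This is a toy example under stated assumptions: shards are tiny and uncompressed for brevity, and the shard naming scheme, the `manifest.json` layout and the helper names are hypothetical rather than a standard.

```python
import hashlib
import json
import os
import tempfile


def write_shards(records: list, out_dir: str, records_per_shard: int) -> list:
    """Write JSONL shards plus a manifest listing record counts and SHA-256 checksums."""
    os.makedirs(out_dir, exist_ok=True)
    manifest = []
    for start in range(0, len(records), records_per_shard):
        chunk = records[start:start + records_per_shard]
        name = f"shard-{len(manifest):05d}.jsonl"
        payload = "".join(json.dumps(r, sort_keys=True) + "\n" for r in chunk).encode("utf-8")
        with open(os.path.join(out_dir, name), "wb") as f:
            f.write(payload)
        manifest.append(
            {"file": name, "num_records": len(chunk), "sha256": hashlib.sha256(payload).hexdigest()}
        )
    with open(os.path.join(out_dir, "manifest.json"), "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)
    return manifest


def verify_shards(out_dir: str) -> bool:
    """Recompute each shard's checksum and compare it with the manifest."""
    with open(os.path.join(out_dir, "manifest.json"), encoding="utf-8") as f:
        manifest = json.load(f)
    for entry in manifest:
        with open(os.path.join(out_dir, entry["file"]), "rb") as f:
            if hashlib.sha256(f.read()).hexdigest() != entry["sha256"]:
                return False
    return True


records = [{"prompt": f"q{i}", "chosen": "a", "rejected": "b"} for i in range(10)]
out_dir = tempfile.mkdtemp()
manifest = write_shards(records, out_dir, records_per_shard=4)
ok = verify_shards(out_dir)
print([entry["num_records"] for entry in manifest], ok)
```

For real datasets the same pattern would apply to compressed Parquet or Arrow shards sized in the tens to hundreds of megabytes, with the manifest stored alongside the dataset version.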

On Alvis:

  • Check your usage and quota with C3SE_quota; for Cephyr file usage, use where-are-my-files.
  • Prefer few large files over many small files. File-IO can be a limiting factor.

Recommended defaults:

  • For local Hugging Face SFT/RLAIF: use Parquet or Arrow shards.
  • Use JSONL for prototyping, manual review, and small experiments.
  • Always record dataset version, schema, and shard manifest for reproducibility.

Exercise

  • Create the ~/portal/jupyter directory if you don't already have it.
  • Copy llm-workshop/containers/post_train/post_train_env.sh to your ~/portal/jupyter/
  • Start a Jupyter server with 1x A40 (or above) GPU with the post_train_env environment and the working directory set to your personal project directory.
  • Run data_pipelines.ipynb to prepare a dataset for supervised fine-tuning on OpenAI's GSM8K math dataset.
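
As a rough idea of the kind of transformation involved, the sketch below converts one GSM8K-style record into the `{messages, meta}` schema mentioned earlier. It relies on the published GSM8K format, where each record has `question` and `answer` fields and the answer ends with a `#### <final answer>` line. The sample record is invented, the "Final answer:" wording is an arbitrary choice, and the notebook itself may prepare the data differently.

```python
import re

# GSM8K answers embed calculator annotations such as <<3+4=7>>; strip them from the text.
CALC_ANNOTATION = re.compile(r"<<[^>]*>>")


def gsm8k_to_messages(record: dict) -> dict:
    """Convert a GSM8K-style record into a chat-style SFT example (illustrative helper)."""
    answer = record["answer"]
    if "####" not in answer:
        raise ValueError("expected a '#### <final answer>' line in the answer field")
    reasoning, _, final = answer.rpartition("####")
    reasoning = CALC_ANNOTATION.sub("", reasoning).strip()
    final = final.strip().replace(",", "")  # normalize numbers like 1,000
    response = f"{reasoning}\nFinal answer: {final}"
    return {
        "messages": [
            {"role": "user", "content": record["question"].strip()},
            {"role": "assistant", "content": response},
        ],
        "meta": {"final_answer": final},
    }


# Invented sample in the GSM8K layout, not taken from the dataset.
sample = {
    "question": "Tom has 3 apples and buys 4 more. How many apples does he have?",
    "answer": "Tom has 3 + 4 = <<3+4=7>>7 apples.\n#### 7",
}
example = gsm8k_to_messages(sample)
print(example["messages"][1]["content"])
```

Keeping the extracted final answer in `meta` makes it easy to score model outputs later without re-parsing the reasoning text.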