Big data with Python

Learning outcomes

Learners

  • can allocate resources sufficient for the data size

  • can decide on useful file formats

  • can use data chunking as a technique

  • know where to learn more

High-Performance Data Analytics (HPDA)

Discussion

Do you already work with large data sets?

Why we need to take special actions

Remember this one?

[Figure: when-to-use-pandas.png]

Discussion

  • What can limit us?

What the constraints are

  • storage

  • memory

Solutions and tools

  • Allocate enough RAM
    • If you are running ready-made tools

    • or cannot update the code or use other packages

  • Choose file format for reading and writing

  • Choose the right Python package

  • Is chunking suitable?

Allocating RAM

  • How much is actually loaded into working memory (RAM)?

  • Is more data created in variables during the run?

Discussion

Have you seen the Out-of-memory (OOM) error?

What to do

  • Allocating more cores on a node gives you more available memory.

  • If something on the order of 128 GB is not enough, there are so-called fat nodes with at least 512 GB and up to 3 TB.

  • On some clusters you do not have to request additional CPUs to get additional memory. Use the Slurm options
    • --mem or

    • --mem-per-cpu

Important

  • You do not have to explicitly run threads or other parallelism.

  • Allocating several nodes for one big problem is not useful.
    • Note that shared memory among the cores works within a node only.

Principles

Use the Slurm options for either batch jobs or interactive sessions, from the command line or from OnDemand GUIs (see the example batch script after this list).

  • Allocate RAM using the “full node RAM divided by number of cores” principle
    • Example: 128 GB with 20 cores → 6.4 GB per core

    • Allocate the number of cores that covers your needs.

    • -n <number>

  • Or request the total memory needed and choose the number of cores
    • --mem=<size>[K|M|G|T]

    • Example: --mem=150G

  • Or request the memory per core needed and choose the number of cores
    • --mem-per-cpu=<size>[K|M|G|T]

    • Example: --mem-per-cpu=16G

  • Request a “fat” node.
    • Typically you can only allocate a full node here, not parts of it.

    • This means asking for a non-default partition.

    • For how to do this, search your cluster’s documentation; see the exercise below.
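Putting these options together, here is a minimal sketch of a batch script; the project ID, wall time, and module name are placeholders that differ between clusters:

#!/bin/bash
#SBATCH -A <project-id>       # your compute project (placeholder)
#SBATCH -t 01:00:00           # requested wall time
#SBATCH -n 4                  # 4 cores -> about 4 x (node RAM / cores) of memory
#SBATCH --mem-per-cpu=16G     # or request memory explicitly instead

module load Python            # module name varies between clusters

python my_analysis.py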

Note

  • The “core-hours” drawn from your project may be set to the maximum of the “number of cores” and the “memory fraction of the node” requested.

  • So there is nothing to gain by asking for one core but lots of memory!

Exercise: Memory allocation (10 min)

  1. Log in to a Desktop (ThinLinc or OnDemand) (see Log in and other preparations)

Discussion

  • Take some time to find out the answers for your specific cluster for the questions below, using the table of hardware below.

Actually start an interactive session with 4 cores for 3 hours.

  • We will use it for the exercises later.

  • Since it may take some time to get the allocation, we do it already now!

  • Follow the best procedure for your cluster, e.g. from command-line or OnDemand.
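As a sketch, on many Slurm-based clusters this looks something like the following (the project ID is a placeholder; some sites provide their own interactive wrapper command instead):

salloc -A <project-id> -n 4 -t 03:00:00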

How to get a node with more RAM?

  • See your local HPC center’s documentation for how to do so!

  • Try first to search or navigate the pages.

Note

  • We recommend a desktop environment for speed of the graphics.

  • Connecting from a local terminal with “ssh -X” (X11 forwarding) can be used but is slower.

File formats

Data and storage format

In real scientific applications, data is complex and structured and usually contains both numerical and text data. Here we list a few of the data and file storage formats commonly used.

An overview of common data formats

Name          | Human readable | Space efficiency | Arbitrary data | Tidy data | Array data | Long term storage/sharing
------------- | -------------- | ---------------- | -------------- | --------- | ---------- | -------------------------
Pickle        | ❌             | 🟨               | ✅             | 🟨        | 🟨         | ❌
CSV           | ✅             | ❌               | 🟨             | ✅        | ❌         | ✅
Feather       | ❌             | ✅               | ❌             | ✅        | ❌         | ❌
Parquet       | ❌             | ✅               | 🟨             | ✅        | 🟨         | ✅
npy           | ❌             | 🟨               | ❌             | ❌        | ✅         | ❌
HDF5          | ❌             | ✅               | ❌             | ❌        | ✅         | ✅
NetCDF4       | ❌             | ✅               | ❌             | ❌        | ✅         | ✅
JSON          | ✅             | ❌               | 🟨             | ❌        | ❌         | ✅
Excel         | ❌             | ❌               | ❌             | 🟨        | ❌         | 🟨
Graph formats | 🟨             | 🟨               | ❌             | ❌        | ❌         | ✅

Important

Legend

  • ✅ : Good

  • 🟨 : Ok / depends on a case

  • ❌ : Bad

Adapted from Aalto University’s Python for scientific computing
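As a small illustration of switching between formats with pandas (a made-up DataFrame; writing Parquet requires pyarrow or fastparquet to be installed):

import pandas as pd

# a small made-up dataset
df = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

df.to_csv("data.csv", index=False)     # human readable, not space efficient
df.to_parquet("data.parquet")          # compressed, good for tidy data

df2 = pd.read_parquet("data.parquet")  # read it back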

See also

  • ENCCS course “HPDA-Python”: Scientific data

  • Aalto Scientific Computing course “Python for Scientific Computing”: Xarray

Exercise: file formats (10 minutes)

Reading NetCDF files
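A minimal sketch of opening a NetCDF file with xarray (the file name is a placeholder, and xarray needs a NetCDF backend such as netCDF4 or scipy installed):

import xarray as xr

# lazily open a NetCDF file; "data.nc" is a placeholder name
ds = xr.open_dataset("data.nc")

print(ds)            # overview: dimensions, coordinates, variables
print(ds.data_vars)  # the variables stored in the file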

View file formats

  • Go over the file formats above and see if some are more relevant for your work.

  • Would you consider other file formats, and why?

(optional)

Computing efficiency with Python

Python is an interpreted language, and many features that make development rapid with Python are a result of that, at the price of reduced performance in many cases.

  • Dynamic typing

  • Flexible data structures

  • There are packages that can be more efficient than plain NumPy and Pandas.

    • SciPy is a library that builds on top of NumPy.

      • It contains a lot of interfaces to battle-tested numerical routines written in Fortran or C, as well as Python implementations of many common algorithms.

      • Reads NetCDF!

    • ENCCS course material

Xarray package

  • xarray is a Python package that builds on NumPy but adds labels to multi-dimensional arrays.

    • introduces labels in the form of dimensions, coordinates and attributes on top of raw NumPy-like multidimensional arrays, which allows for a more intuitive, more concise, and less error-prone developer experience.

    • It also borrows heavily from the Pandas package for labelled tabular data and integrates tightly with dask for parallel computing.

  • Xarray is particularly tailored to working with NetCDF files.

  • But it works for other file formats as well.

  • Explore it a bit in the (optional) exercise below!
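As a quick taste, a minimal sketch of a labelled array (the names and values are made up for illustration):

import numpy as np
import xarray as xr

# raw NumPy data: temperatures on a 2 x 3 grid (made-up values)
data = np.array([[11.2, 12.5, 13.1],
                 [10.8, 11.9, 12.7]])

# wrap it with named dimensions and coordinates
temperature = xr.DataArray(
    data,
    dims=("time", "station"),
    coords={"time": ["2024-01-01", "2024-01-02"],
            "station": ["A", "B", "C"]},
)

print(temperature.sel(station="B"))   # select by label, not integer index
print(temperature.mean(dim="time"))   # reduce over a named dimension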

Dask

How to use more resources than available?

[Figure: when-to-use-pandas.png]

Dask is very popular for data analysis and is used by a number of high-level Python libraries.

  • Dask is composed of two parts:

    • Dask Clusters
      • Dynamic task scheduling optimized for computation. Similar to other workflow management systems, but optimized for interactive computational workloads.

      • ENCCS course

    • “Big Data” Collections
      • Like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of dynamic task schedulers.

      • ENCCS course

  • Dask provides dynamic parallel task scheduling and three main high-level collections: Dask Array, Dask DataFrame, and Dask Bag.

See also

  • dask_ml package: Dask-ML provides scalable machine learning in Python using Dask alongside popular machine learning libraries like Scikit-Learn, XGBoost, and others.

  • Dask.distributed: Dask.distributed is a lightweight library for distributed computing in Python. It extends both the concurrent.futures and dask APIs to moderate sized clusters.

  • A Dask array looks and feels a lot like a NumPy array.

  • However, a Dask array uses the so-called “lazy” execution mode, which allows one to
    • build up complex, large calculations symbolically

    • before turning them over to the scheduler for execution.

Chunks

  • Dask divides arrays into many small pieces (chunks), as small as necessary to fit into memory.

  • Operations are delayed (lazy computing):

    • tasks are queued and no computation is performed until you actually ask for values to be computed (for instance, by printing mean values).

  • Then data is loaded into memory and computation proceeds in a streaming fashion, block by block.

  • The results are gathered at the end.

  • Tools like Dask and xarray handle the chunking automatically.

  • Note that the number of chunks does not need to equal the number of cores.

Big file → split into chunks → parallel workers → results combined.

Things to consider

  • Chunk size and the number of chunks affect performance, due to the overhead of administering the chunks and combining the results.

  • Briefly explain what happens when a Dask job runs on multiple cores.

Example from dask.org

# Arrays implement the NumPy API
import dask.array as da

x = da.random.random(size=(10000, 10000),
                     chunks=(1000, 1000))
y = x + x.T - x.mean(axis=0)  # builds a lazy task graph, nothing runs yet
y.compute()                   # triggers the actual computation
# It runs using multiple threads on one node.
# It could also be distributed to multiple nodes.

Polars package

  • polars is a Python package that presents itself as a “Blazingly Fast DataFrame Library”
    • Utilizes all available cores on your machine.

    • Optimizes queries to reduce unneeded work/memory allocations.

    • Handles datasets much larger than your available RAM.

    • A consistent and predictable API.

    • Adheres to a strict schema (data-types should be known before running the query).
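A minimal sketch of a lazy Polars query (made-up data; group_by is the spelling in recent Polars versions, older ones use groupby):

import polars as pl

# a small made-up dataset
df = pl.DataFrame({"site": ["A", "A", "B", "B"],
                   "value": [1.0, 2.0, 3.0, 4.0]})

# lazy query: Polars optimizes the whole plan before executing it
result = (
    df.lazy()
      .filter(pl.col("value") > 1.0)
      .group_by("site")
      .agg(pl.col("value").mean().alias("mean_value"))
      .collect()  # execution happens here
)
print(result)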

Exercises: Packages

Set up the environment

Important

Interactive use (Recommended)

Follow the instructions here: https://docs.hpc2n.umu.se/tutorials/connections/#example__-__jupyter__with__extra__modules.

  • Add these lines to the batch script

module load SciPy-bundle/2023.07 matplotlib/3.7.2 Tkinter/3.11.3
  • Continue and start Jupyter

  • And install polars, dask & xarray to ~/.local/ if you don’t already have them

! pip install --user xarray
! pip install --user dask
! pip install --user polars
  • You may have to restart the Jupyter kernel (or even the Jupyter session) to be able to load the just-installed package(s).

Chunk sizes in Dask

  • The following example calculates the mean value of a randomly generated array.

  • Run the two examples and see the performance improvement from using Dask.

First, in one Jupyter cell:

import numpy as np

Then, in a separate cell (%%time must be the first line of a cell):

%%time
x = np.random.random((20000, 20000))
y = x.mean(axis=0)
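The Dask counterpart, as a sketch in another cell (this chunk size is just a starting point to experiment with):

%%time
import dask.array as da
x = da.random.random((20000, 20000), chunks=(1000, 1000))
y = x.mean(axis=0)
y.compute()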

But what happens if we use different chunk sizes? Try it out:

  • What happens with chunks=(20000, 20000)?

  • What happens with chunks=(250, 250)?

(Optional) Polars

  • Browse: https://docs.pola.rs/.
    • Find something interesting to you! Test some lines if you want to!

  • Check if your cluster has Polars!

Summary

Follow-up discussion

  • Any new learnings?

  • Useful file formats

  • Resources sufficient for the data size

  • Data chunking as a technique when there is not enough RAM

  • Is Xarray/Polars/Dask useful for you?

Keypoints

  • Allocate more RAM by asking for
    • several cores

    • nodes with more RAM

    • Check job memory usage with sacct or sstat. Check your documentation!

  • File formats
    • No format fits all requirements.

    • HDF5 and NetCDF are good for big data since they allow loading parts of the file into memory.

  • Store temporary data in local scratch ($SNIC_TMP).

  • Packages
    • xarray
      • can deal with 3D-data and higher dimensions

    • Dask
      • uses lazy execution

      • Only use it for processing very large amounts of data

    • Chunking: Data source → Format choice → Load/Chunk → Process → Write