Parallel computing with Python

Questions

  • What is parallel computing?

  • What are the different parallelization mechanisms for Python?

  • How to implement parallel algorithms in Python code?

  • How to deploy threads and workers at our HPC centers?

Objectives

  • Learn general concepts for parallel computing

  • Gain knowledge on the tools for parallel programming in different languages

  • Familiarize with the tools to monitor the usage of resources

Prerequisites

  • Demo

  • If not already done so:

$ ml GCCcore/11.2.0 Python/3.9.6

$ virtualenv --system-site-packages /proj/nobackup/<your-project-storage>/vpyenv-python-course

$ source /proj/nobackup/<your-project-storage>/vpyenv-python-course/bin/activate
  • For the numba example install the corresponding module:

$ pip install numba
  • For the mpi4py example add the following modules:

$ ml GCC/11.2.0 OpenMPI/4.1.1

$ pip install mpi4py
  • For the f2py example, this command should be available on the terminal when numpy is installed:

$ pip install numpy
  • For the Julia example we will need PyJulia:

$ ml Julia/1.9.3-linux-x86_64

$ pip install julia
  • Start Python on the command line and type:

>>> import julia
>>> julia.install()
  • Quit Python, you should be ready to go!

In Python there are different schemes that can be used to parallelize your code. We will only take a look at some of these schemes that illustrate the general concepts of parallel computing. The aim of this lecture is to learn how to run parallel codes in Python rather than learning to write those codes.

The workhorse for this section will be a 2D integration example:

\[\int^{\pi}_{0}\int^{\pi}_{0}\sin(x+y)dxdy = 0\]

One way to perform the integration is by creating a grid in the x and y directions. More specifically, one divides the integration range in both directions into n bins. A serial code (without optimization) can be seen in the following code block.

We can run this code on the terminal as follows (similarly at both HPC2N and UPPMAX):

Warning

Although this works on the terminal, having many users doing computations at the same time for this course, could create delays for other users

$ python integration2d_serial.py
Integral value is -7.117752e-17, Error is 7.117752e-17
Time spent: 20.39 sec

Because of that, we can use for short-time jobs the following command:

$ srun -A <your-projec-id> -n 1 -t 00:10:00 python integration2d_serial.py
Integral value is -7.117752e-17, Error is 7.117752e-17
Time spent: 20.39 sec

where srun has the flags that are used in a standard batch file.

Note that outputs can be different, when timing a code a more realistic approach would be to run it several times to get statistics.

One of the crucial steps upon parallelizing a code is identifying its bottlenecks. In the present case, we notice that the most expensive part in this code is the double for loop.

Serial optimizations

Just before we jump into a parallelization project, Python offers some options to make serial code faster. For instance, the Numba module can assist you to obtain a compiled-quality function with minimal efforts. This can be achieved with the njit() decorator:

The execution time is now:

$ python integration2d_serial_numba.py
Integral value is -7.117752e-17, Error is 7.117752e-17
Time spent: 1.90 sec

Another option for making serial codes faster, and specially in the case of arithmetic intensive codes, is to write the most expensive parts of them in a compiled language such as Fortran or C/C++. In the next paragraphs we will show you how Fortran code for the 2D integration case can be called in Python.

We start by writing the expensive part of our Python code in a Fortran function in a file called fortran_function.f90:

Then, we need to compile this code and generate the Python module (myfunction):

Warning

For UPPMAX you may have to change gcc version like:

$ ml gcc/10.3.0

Then continue…

$ f2py -c -m myfunction fortran_function.f90
running build
running config_cc
...

this will produce the Python/C API myfunction.cpython-39-x86_64-linux-gnu.so, which can be called in Python as a module:

The execution time is considerably reduced:

$ python call_fortran_code.py
Integral value is -7.117752e-17, Error is 7.117752e-17
Time spent: 1.30 sec

Compilation of code can be tedious specially if you are in a developing phase of your code. As an alternative to improve the performance of expensive parts of your code (without using a compiled language) you can write these parts in Julia (which doesn’t require compilation) and then calling Julia code in Python. For the workhorse integration case that we are using, the Julia code can look like this:

A caller script for Julia would be,

Timing in this case is similar to the Fortran serial case,

$ python call_julia_code.py
Integral value is -7.117752e-17, Error is 7.117752e-17
Time spent: 1.29 sec

If even with the previous (and possibly others from your own) serial optimizations your code doesn’t achieve the expected performance, you may start looking for some parallelization scheme. Here, we describe the most common schemes.

Threads

In a threaded parallelization scheme, the workers (threads) share a global memory address space. The threading module is built into Python so you don’t have to installed it. By using this module, one can create several threads to do some work in parallel (in principle). For jobs dealing with files I/O one can observe some speedup by using the threading module. However, for CPU intensive jobs one would see a decrease in performance w.r.t. the serial code. This is because Python uses the Global Interpreter Lock (GIL) which serializes the code when several threads are used.

In the following code we used the threading module to parallelize the 2D integration example. Threads are created with the construct threading.Thread(target=function, args=()), where target is the function that will be executed by each thread and args is a tuple containing the arguments of that function. Threads are started with the start() method and when they finish their job they are joined with the join() method,

Notice the output of running this code on the terminal:

$ python integration2d_threading.py
Integral value is 4.492851e-12, Error is 4.492851e-12
Time spent: 21.29 sec

Although we are distributing the work on 4 threads, the execution time is longer than in the serial code. This is due to the GIL mentioned above.

Implicit Threaded

Some libraries like OpenBLAS, LAPACK, and MKL provide an implicit threading mechanism. They are used, for instance, by numpy module for computing linear algebra operations. You can obtain information about the libraries that are available in numpy with numpy.show_config(). This can be useful at the moment of setting the number of threads as these libraries could use different mechanisms for it, for the following example we will use the OpenMP environment variables.

Consider the following code that computes the dot product of a matrix with itself:

the timing for running this code with 1 thread is:

$ export OMP_NUM_THREADS=1
$ python dot.py
Time spent: 1.14 sec

while running with 2 threads is:

$ export OMP_NUM_THREADS=2
$ python dot.py
Time spent: 0.60 sec

It is also possible to use efficient threads if you have blocks of code written in a compiled language. Here, we will see the case of the Fortran code written above where OpenMP threads are used. The parallelized code looks as follows:

The way to compile this code differs to the one we saw before, now we will need the flags for OpenMP:

$ f2py -c --f90flags='-fopenmp' -lgomp -m myfunction_openmp fortran_function_openmp.f90

the generated module can be then loaded,

the execution time by using 4 threads is:

$ export OMP_NUM_THREADS=4
$ python call_fortran_code_openmp.py
Integral value is 4.492945e-12, Error is 4.492945e-12
Time spent: 0.37 sec

More information about how OpenMP works can be found in the material of a previous OpenMP course offered by some of us.

Distributed

In the distributed parallelization scheme the workers (processes) can share some common memory but they can also exchange information by sending and receiving messages for instance.

In this case, the execution time is reduced:

$ python integration2d_multiprocessing.py
Integral value is 4.492851e-12, Error is 4.492851e-12
Time spent: 6.06 sec

MPI

More details for the MPI parallelization scheme in Python can be found in a previous MPI course offered by some of us.

Execution of this code gives the following output:

$ mpirun -np 4 python integration2d_mpi.py
Integral value is 4.492851e-12, Error is 4.492851e-12
Time spent: 5.76 sec

For long jobs, one will need to run in batch mode. Here is an example of a batch script for this MPI example,

#!/bin/bash
#SBATCH -A hpc2n20XX-XYZ
#SBATCH -t 00:05:00        # wall time
#SBATCH -n 4
#SBATCH -o output_%j.out   # output file
#SBATCH -e error_%j.err    # error messages

ml purge > /dev/null 2>&1
ml GCCcore/11.2.0 Python/3.9.6
ml GCC/11.2.0 OpenMPI/4.1.1
#ml Julia/1.7.1-linux-x86_64  # if Julia is needed

source /proj/nobackup/<your-project-storage>/vpyenv-python-course/bin/activate

mpirun -np 4 python integration2d_mpi.py

Monitoring resources’ usage

Monitoring the resources that a certain job uses is important specially when this job is expected to run on many CPUs and/or GPUs. It could happen, for instance, that an incorrect module is loaded or the command for running on many CPUs is not the proper one and our job runs in serial mode while we allocated possibly many CPUs/GPUs. For this reason, there are several tools available in our centers to monitor the performance of running jobs.

HPC2N

On a Kebnekaise terminal, you can type the command:

$ job-usage job_ID

where job_ID is the number obtained when you submit your job with the sbatch command. This will give you a URL that you can copy and then paste in your local browser. The results can be seen in a graphical manner a couple of minutes after the job starts running, here there is one example of how this looks like:

../_images/monitoring-jobs.png

The resources used by a job can be monitored in your local browser. For this job, we can notice that 100% of the requested CPU and 60% of the GPU resources are being used.

Dask

Dask is a array model extension and task scheduler. By using the new array classes, you can automatically distribute operations across multiple CPUs. Dask is a library in Python for flexible parallel computing. Among the features are the ability to deal with arrays and data frames, and the possibility of performing asynchronous computations, where first a computation graph is generated and the actual computations are activated later on demand.

Dask is very popular for data analysis and is used by a number of high-level Python libraries:

  • Dask arrays scale NumPy (see also xarray)

  • Dask dataframes scale Pandas workflows

  • Dask-ML scales Scikit-Learn

  • Dask divides arrays into many small pieces (chunks), as small as necessary to fit it into memory.

  • Operations are delayed (lazy computing) e.g. tasks are queue and no computation is performed until you actually ask values to be computed (for instance print mean values).

  • Then data is loaded into memory and computation proceeds in a streaming fashion, block-by-block.

Jupyter notebooks for other purposes than just reading it, must be run in batch mode. First, create a batch script using the following one as a template:

#!/bin/bash
#SBATCH -A hpc2n20XX-XYZ
#SBATCH -t 00:05:00
#SBATCH -n 4
#SBATCH -o output_%j.out   # output file
#SBATCH -e error_%j.err    # error messages

ml purge > /dev/null 2>&1
ml GCC/12.3.0 OpenMPI/4.1.5 JupyterLab/4.0.5 dask/2023.9.2

# Start JupyterLab
jupyter lab --no-browser --ip $(hostname)

Then, copy and paste the notebook located here Exercises/examples/Dask-Ini.ipynb to your current folder. Send the job to the queue (sbatch job.sh) and once the job starts copy the line containing the string http://b-cnyyyy.hpc2n.umu.se:8888/lab?token= and paste it in a local browser on Kebnekaise. Now you can select the notebook.

Dask is very popular for data analysis and is used by a number of high-level Python libraries:

  • Dask arrays scale NumPy (see also xarray)

  • Dask dataframes scale Pandas workflows

  • Dask-ML scales Scikit-Learn

  • Dask divides arrays into many small pieces (chunks), as small as necessary to fit it into memory.

  • Operations are delayed (lazy computing) e.g. tasks are queued and no computations is performed until you actually ask values to be computed (for instance print mean values).

  • Then data is loaded into memory and computation proceeds in a streaming fashion, block-by-block.

Exercises

Keypoints

  • You deploy cores and nodes via SLURM, either in interactive mode or batch

  • In Python, threads, distributed and MPI parallelization can be used.