This lesson is being piloted (Beta version)

Dask for parallel data operations

Overview

Teaching: 50 min
Exercises: 25 min
Questions
  • How can I parallelise common library operations across multiple cores and nodes?

Objectives

While the strategies we’ve discussed so far can improve the performance of your software on a single node, there will always be cases where you require more resources than a single node can provide. For example, a single iteration of your algorithm may take longer than the wall-time limits of the machine, or your algorithm may be sequential, so that you can’t split it and run each step separately. You may also find that for very large problems, the memory capacity of the node becomes a limiting factor; in this case, using more than one node is not only necessary for to achieve a speedup, but to even run the problem at all.

Once we expand our programs beyond running on a single node, then we have more concerns to consider. The main one of these is communication between the nodes: each node needs to be aware of what it needs to do, and to be kept updated on any information from the other nodes necessary to keep doing this. A confounding factor is that communication between nodes is in most cases significantly slower than communicating between processes or threads running on the same node, so many algorithms will pay a significant performance penalty for running across multiple nodes.

Two commonly-encountered models of parallelism are:

Introducing Dask

Dask is a library that takes functionality from a number of popular libraries used for scientific computing in Python, including Numpy, Pandas, and scikit-learn, and extends them to run in parallel across a variety of different parallelisation setups. This includes multiple threads or multiple processes on the same node, as well as using multiple nodes.

For today, we’re going to jump straight to the most advanced case and look at how we can use it to run across multiple nodes on an HPC cluster.

While multi-node support is built in to Dask, we will use the dask-mpi package to help Dask interact with Slurm to create the right number of processes. Since we need to install extra packages, we first need to load the environment that we created earlier today.

$ module load anaconda/2021.05
$ source activate scw_test

Now, since we are going to use the MPI libraries that are installed on Sunbird, we can’t use Conda to install dask-mpi, since this will instead use Anaconda’s own version of MPI, which doesn’t work properly. Instead, we need to load the MPI libraries, and then install dask-mpi with pip.

$ module load compiler/intel/2019/5 mpi/intel/2019/5
$ pip install mpi4py dask-mpi dask-ml

The Message Passing Interface

While there isn’t time to talk in detail about MPI (the Message Passing Interface) today, it’s worth talking a little about how it works so that we can better understand how Dask interacts with it.

With MPI, multiple copies of the same program (ranks in the MPI jargon) are run, and these communicate with each other to share work. These are usually run with a utility called mpirun (or sometimes mpiexec), which starts many copies of a given program on the specified nodes.

On a cluster, mpirun is integrated with the scheduler (e.g. Slurm), so knows which nodes to place processes on automatically. Many clusters provide their own wrappers for mpirun to make this integration work better; in Slurm, this utility is srun. (You may remember we used srun with GNU Parallel to place processes on appropriate nodes; this was a special case where we only wanted a single process at a time.)

More than one process can run per node—sometimes it makes sense to run as many processes as there are CPU cores, since then you don’t also need to think about other sorts of parallelism. Programs that are aware of MPI will then talk to the MPI library (which mpirun makes available to them) to get enough information about the number and placement of processes to be able to do their work.

While MPI by default uses a collectivist approach to its communications, Dask’s model is a co-ordinator–worker one. To translate between these, dask-mpi finds the number of ranks from MPI, and does the following with them:

It then ignores MPI completely, using Dask’s built-in communications methods to communicate between the processes instead. MPI is only used to create the right number of processes, and communicate that number to Dask.

If we are programming in parallel effectively, most of our computation time will be spent having the workers do the work. If each piece of work is quite expensive, then this may mean that both the co-ordinator and the serial part of the work sit idling. Since in general we want each process to use a full node, then these processes can end up wasting two full nodes of computational resource that could have been getting us our results faster. Anticipating this, we can start two more processes than we have nodes, so that the workers can do some work on the same nodes as the co-ordinator and serial process sit.

So, to get Dask working with MPI under Slurm, we need two things:

Let’s look at these in turn. Firstly, the job script to submit this workload to the cluster looks like this:

#!/bin/bash --login
###
# Output file location
#SBATCH --output dasktest.out.%J
# Dask outputs a lot of debug information to stderr
# so let's direct that to a separate file
#SBATCH --error=dasktest.err.%J
# For now, let's use two nodes, with one 
#SBATCH --nodes=2
#SBATCH --ntasks=2
# We want to use the full node
#SBATCH --cpus-per-task=40
#SBATCH --exclusive
# Time limit for this job
#SBATCH --time 00:10:00
# specify our current project
# change this for your own work
#SBATCH --account=scw1389
# specify the reservation we have for the training workshop
# remove this for your own work
# replace XX with the code provided by your instructor
#SBATCH --reservation=scw1389_XX
###

# Get MPI and Anaconda ready to use
module load anaconda/2021.05 compiler/intel/2019/5 mpi/intel/2019/5
source activate scw_test

# Get Slurm to run the Python program that uses Dask
srun --overcommit \
     --distribution=cyclic \
     --nodes=${SLURM_NNODES} \
     --ntasks=$[SLURM_NTASKS+2] \
     --cpus-per-task=${SLURM_CPUS_PER_TASK} \
     python dasktest.py

While the #SBATCH directives are explained inline, in the call to srun, the meaning of the flags is:

Next, the program itself. This is a Dask version of a Scikit-learn example making use of the GridSearchCV function.

from os import environ
from datetime import datetime

from dask_mpi import initialize
from distributed import Client

from sklearn import datasets
from sklearn.model_selection import train_test_split

# Get the Dask version of GridSearchCV
from dask_ml.model_selection import GridSearchCV

from sklearn.metrics import classification_report
from sklearn.svm import SVC


def run_test(client):
    print(__doc__)

    # Loading the Digits dataset
    digits = datasets.load_digits()

    # To apply an classifier on this data, we need to flatten the image, to
    # turn the data in a (samples, feature) matrix:
    n_samples = len(digits.images)
    X = digits.images.reshape((n_samples, -1))
    y = digits.target

    # Split the dataset in two equal parts
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.5, random_state=0)

    # Set the parameters by cross-validation
    tuned_parameters = [{'kernel': ['rbf'], 'gamma': [1e-3, 1e-4],
                         'C': [1, 10, 100, 1000]},
                        {'kernel': ['linear'], 'C': [1, 10, 100, 1000]}]

    scores = ['precision', 'recall']

    for score in scores:
        print("# Tuning hyper-parameters for %s" % score)
        print()

        # scheduler=client makes sure that Dask uses the correct communications
        clf = GridSearchCV(
            SVC(), tuned_parameters, scoring='%s_macro' % score,
            scheduler=client
        )
        clf.fit(X_train, y_train)

        print("Best parameters set found on development set:")
        print()
        print(clf.best_params_)
        print()
        print("Grid scores on development set:")
        print()
        means = clf.cv_results_['mean_test_score']
        stds = clf.cv_results_['std_test_score']
        for mean, std, params in zip(means, stds, clf.cv_results_['params']):
            print("%0.3f (+/-%0.03f) for %r"
                  % (mean, std * 2, params))
        print()

        print("Detailed classification report:")
        print()
        print("The model is trained on the full development set.")
        print("The scores are computed on the full evaluation set.")
        print()
        y_true, y_pred = y_test, clf.predict(X_test)
        print(classification_report(y_true, y_pred))
        print()

    # Note the problem is too easy: the hyperparameter plateau is too flat and
    # the output model is the same for precision and recall with ties in
    # quality.


def main():
    # Work out from the environment how many threads to allocate
    num_threads = int(environ.get(
        'SLURM_CPUS_PER_TASK',
        environ.get('OMP_NUM_THREADS', 1)
    ))

    # Create the Dask workers
    initialize(interface='ib0', nthreads=num_threads)

    # Create the Dask object that will manage the communications
    client = Client()

    start = datetime.now()
    run_test(client=client)
    end = datetime.now()

    print(f"Time taken: {end - start}")


if __name__ == '__main__':
    main()

Parallel map with Dask

Like Pathos, Dask also gives a way of performing independent parallel operations across all parallel threads. For example, consider a “sieve of Eratosthenes” for finding prime numbers.

from time import sleep
from random import random
from datetime import datetime

def slow_is_prime(num):
    sleep(random())
    if num == 1:
        return False
    for test_factor in range(2, num // 2):
        if num % test_factor == 0:
            return False

    return True


def main():
    from argparse import ArgumentParser

    parser = ArgumentParser()
    parser.add_argument('min_num', type=int)
    parser.add_argument('max_num', type=int)
    args = parser.parse_args()

    start_time = datetime.now()
    num_primes = sum(map(slow_is_prime,
                         range(args.min_num, args.max_num + 1)))
    end_time = datetime.now()

    print(f'{num_primes} primes between {args.min_num} and {args.max_num} '
          f'[{end_time - start_time}]')


if __name__ == '__main__':
    main()

Since a parallel map needs enough work for each iteration to make the parallelisation worthwhile, let’s look at some relatively large numbers.

$ srun --partition=development --account=scw1000 --nodes=1 --ntasks=1 python primes_serial.py 100000000 100001000
54 primes between 100000000 and 100001000 [0:10:25.173913]

We can make this parallel across a single node using multiple workers, or we can go across multiple nodes using multiple threads per node.

Firstly, using only a single node, the code is relatively unchanged:

from time import sleep
from random import random
from datetime import datetime

from dask.distributed import Client

def slow_is_prime(num):
    sleep(random())
    if num == 1:
        return False
    for test_factor in range(2, num // 2):
        if num % test_factor == 0:
            return False

    return True


def main():
    from argparse import ArgumentParser

    parser = ArgumentParser()
    parser.add_argument('min_num', type=int)
    parser.add_argument('max_num', type=int)
    args = parser.parse_args()

    client = Client(threads_per_worker=1, n_workers=40)

    start_time = datetime.now()
    num_primes = sum(
        client.gather(
            client.map(slow_is_prime,
                       range(args.min_num, args.max_num + 1))
        )
    )
    end_time = datetime.now()

    print(f'{num_primes} primes between {args.min_num} and {args.max_num} '
          f'[{end_time - start_time}]')


if __name__ == '__main__':
    main()

The main change here is to create the Client object, and use that to perform a map followed by a gather. This is different to how map usually works (both in plain Python and in Pathos). In this case, rather than returning an iterator over the results of performing the computation, Client.map() returns a list of “futures”. You can think of a future as being like a dry-cleaning ticket: the dry cleaner (Client) has agreed to clean the clothing (process a particular piece of data), but it’s not ready yet—it could in a queue, be in the process of being washed (processed), or already be washed (processed) and ready for collection. You could sit and wait for it to complete, or you could take the ticket (future) and do some other useful work while the processing happens. Once you’ve run out of things to do, you can go back with the ticket (future) and collect the result if it’s ready, or wait until it is otherwise. This is what the call to Client.gather() does—it lets Dask know that we would like the results of the list of computations that we previously got futures for. In principle, if there were some post-processing that we wanted to do on each result, we could get a subset of them at a time and process those while others completed, but in this case we only want to perform a summation, so we do the computational equivalent of handing over a stack of dry-cleaning and then sitting next to the reception desk until it is completed.

$ srun --partition=development --account=scw1000 --nodes=1 --ntasks=40
--exclusive python primes_1node.py 100000000 100001000
54 primes between 100000000 and 100001000 [0:00:19.193485]

This gives us a 32x improvement in speed on 40 CPU cores—this is around 83% efficient. What about running across multiple nodes? This requires a few more changes to the code:

from os import environ
from time import sleep
from random import random
from datetime import datetime

from dask_mpi import initialize
from distributed import Client


def slow_is_prime(num):
    sleep(random())
    if num == 1:
        return False
    for test_factor in range(2, num // 2):
        if num % test_factor == 0:
            return False

    return True


def main():
    from argparse import ArgumentParser

    parser = ArgumentParser()
    parser.add_argument('min_num', type=int)
    parser.add_argument('max_num', type=int)
    args = parser.parse_args()

    num_threads = int(environ.get(
        'SLURM_CPUS_PER_TASK',
        environ.get('OMP_NUM_THREADS', 1)
    ))
    initialize(interface='ib0', nthreads=num_threads)
    client = Client()

    start_time = datetime.now()
    num_primes = sum(
        client.gather(
            client.map(slow_is_prime,
                       range(args.min_num, args.max_num + 1))
        )
    )
    end_time = datetime.now()

    print(f'{num_primes} primes between {args.min_num} and {args.max_num} '
          f'[{end_time - start_time}]')


if __name__ == '__main__':
    main()

The calls to map and gather are entirely unchanged—Dask tries to separate the “how you write your code” from “how it is parallelised”, so the only thing that needs to change to enable this parallelisation is how the Client is created; in this case the setup is done by dask_mpi, so the call to Client() automatically spans the multiple nodes allocated to this computation.

Adjusting the batch script for the GridSearchCV example above to run this gives:

54 primes between 100000000 and 100001000 [0:00:13.191599]

A 32% improvement on the one-node case, or a 47x speed improvement on the single-core version. This drops the efficiency to 60%, so there is either some difficulty with load balancing, or the overhead of managing multiple nodes is dominating over the time spend performing computations for this task. We could probe this by running for a larger search space.

Key Points

  • Dask will parallelise across as many resources as it is asked to

  • Dask also has drop-in replacements for many other common operations, e.g. in scikit-learn