Multiprocessing with Pathos
Overview
Teaching: 30 min
Exercises: 30 min
Questions
How can I distribute subtasks across multiple cores or nodes?
Objectives
Be able to distribute work to multiple cores using parallel pools
Know where to look to extend this to multiple nodes
Understand how to break up more monolithic tasks into subtasks that can be parallelised
So far, we have distributed processes to multiple cores and nodes using GNU Parallel, but this has required spinning up separate instances of Python and importing the same libraries repeatedly. If the task that we are performing is brief, then this is a large overhead, wasting time that could be spent doing our computation. We’ve also seen that Numpy can distribute some tasks to multiple cores, but isn’t always particularly good at it.
Fortunately, Python includes tools for more explicitly distributing tasks across more than one CPU. Pathos is a tool that extends this to work across multiple nodes, and provides other convenience improvements over Python’s built-in tools.
To start with, we need to install Pathos. It isn’t installed as part of the standard Anaconda distribution, but it is available from the conda-forge channel:
$ conda install -c conda-forge pathos
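If the installation has worked, then importing the library should now succeed without any errors:
$ python -c "import pathos"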
A toy example
To get a feel for how Pathos can help run functions in parallel, let’s consider the toy example of raising numbers to powers.
We can run this in an interactive interpreter, via python
or
ipython
:
>>> from pathos.pools import ParallelPool
>>> pool = ParallelPool(nodes=10)
>>> pool.map(pow, [1, 2, 3, 4], [5, 4, 3, 2])
[1, 16, 27, 16]
The key function here is ParallelPool.map(), which takes the function provided as the first argument and calls it repeatedly using the arguments supplied in the subsequent lists. If you have used map in Python, this function is an extension; rather than only taking one list of arguments, it takes multiple: one per parameter that the function accepts.
In this case, pow() has two parameters: the base and the exponent. Each pair of values is passed to pow in turn. The ParallelPool holds a certain number of Python processes running on different cores, and decides which one is available to run a given set of parameters. If all processes are busy, then it will wait until one is free, and then send the parameters over.
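For comparison, Python’s built-in map accepts multiple iterables in exactly the same way; the difference is that it runs the calls one after another in a single process (the list call is needed because map returns an iterator):
>>> list(map(pow, [1, 2, 3, 4], [5, 4, 3, 2]))
[1, 16, 27, 16]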
Of course, most of the time the function we want to run in parallel isn’t a single mathematical function, but a longer-running function that we have written ourselves.
Getting ready to multiprocess
Before we can ask Pathos to run our code in parallel, we need to structure it in a way that makes this easy for Pathos to do. This is a similar process to the one we used for accepting command-line arguments; the difference is that instead of using the argparse module and declaring the arguments that way, we now declare a function that accepts the arguments in question. (It’s good practice to do this whenever you’re writing a program anyway, and we could have done it as an extra step when tweaking things for GNU Parallel.)
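As a reminder of the shape we mean, here is a minimal sketch (with hypothetical function and argument names, not the contents of any program from earlier episodes): the computation lives in a function, and argparse only appears in the block that runs when the file is executed directly.
import argparse


def process(input_filename, count):
    # The actual computation goes here; it only sees ordinary
    # Python arguments, never the command line itself
    print(f'Would process {input_filename} {count} times')


if __name__ == '__main__':
    # Command-line handling stays outside the function, so that other
    # code (such as a Pathos pool) can call process() directly
    parser = argparse.ArgumentParser()
    parser.add_argument('input_filename')
    parser.add_argument('--count', type=int, default=1)
    args = parser.parse_args()

    process(args.input_filename, args.count)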
The example mc.py that we looked at earlier is already in this shape. Rather than breaking the existing program to test Pathos, we can write a second program that loads mc.py as a library.
from pathos.pools import ParallelPool

from mc import run_mc


def run_in_parallel(betas, ms, h, iteration_count):
    # Check that lists are all the same length
    assert len(betas) == len(ms)

    # Generate lists of the same value of h and iteration_count
    hs = [h] * len(betas)
    iteration_counts = [iteration_count] * len(betas)

    # Generate a list of filenames to output results to
    filenames = [f'mc_data/b{beta}_m{m}.dat'
                 for beta, m in zip(betas, ms)]

    # Create a parallel pool to run the functions
    pool = ParallelPool(nodes=10)

    # Run the work
    pool.map(run_mc, ms, hs, betas, iteration_counts, filenames)


if __name__ == '__main__':
    run_in_parallel(
        [0.5, 0.5, 1.0, 1.0, 1.0, 2.0, 2.0],
        [0.5, 1.0, 0.5, 1.0, 2.0, 1.0, 2.0],
        1.0,
        100000
    )
Save this as mc_pathos.py, and run it via:
$ mkdir mc_data # Make sure that the directory to hold the output data is created
$ python mc_pathos.py
This will run the Monte Carlo simulation for the seven parameter sets $(\beta=0.5, m=0.5)$, $(\beta=0.5, m=1.0)$, $(\beta=1.0, m=0.5)$, $(\beta=1.0, m=1.0)$, $(\beta=1.0, m=2.0)$, $(\beta=2.0, m=1.0)$, and $(\beta=2.0, m=2.0)$. Each uses the same values of h and iteration_count, and the filename in each case is decided programmatically.
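Once the runs complete, the mc_data directory should contain one file per parameter set, with the names generated by the f-string above:
$ ls mc_data
b0.5_m0.5.dat  b0.5_m1.0.dat  b1.0_m0.5.dat  b1.0_m1.0.dat  b1.0_m2.0.dat
b2.0_m1.0.dat  b2.0_m2.0.dat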
Parameter scans
We’ve just used 10 cores to perform 7 independent computations. Even if the computations all take the same length of time, three cores are sitting idle, so we are 70% efficient at best. If we have seven jobs that are long enough that we want to use HPC for them, then we’d be better off using seven independent SLURM jobs; Pathos isn’t useful there. This kind of workflow comes into its own when you have hundreds or thousands of different values to scan through. But hardcoding lists of hundreds or thousands of elements is tedious, especially when there will be lots of repeated values.
Quite often what we’d really like to do is take a set of possible values for each variable, and then scan over all possible combinations. With two variables this allows a heatmap or contour plot to be produced, and with three a volumetric representation could be generated.
The first tool in our arsenal is the product function from the itertools module. This generates tuples containing all possible combinations of the lists it is given as arguments. Let’s check this at a Python interpreter:
$ python
>>> from itertools import product
>>> numbers = [1, 2, 3, 4, 5]
>>> letters = ['a', 'b', 'c', 'd', 'e']
>>> list(product(numbers, letters))
[(1, 'a'), (1, 'b'), (1, 'c'), (1, 'd'), (1, 'e'), (2, 'a'), (2, 'b'),
(2, 'c'), (2, 'd'), (2, 'e'), (3, 'a'), (3, 'b'), (3, 'c'), (3, 'd'),
(3, 'e'), (4, 'a'), (4, 'b'), (4, 'c'), (4, 'd'), (4, 'e'), (5, 'a'),
(5, 'b'), (5, 'c'), (5, 'd'), (5, 'e')]
We need to use the list function here to create a list that we can read; by default, product returns an iterator rather than a list. (Great for looping through, and good for passing to other functions, but not useful for seeing the results on screen.)
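Without wrapping it in list, we would only see the iterator object itself (the exact memory address will vary):
>>> product(numbers, letters)
<itertools.product object at 0x7f...>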
Looking closely, this isn’t quite what we want yet. We need all the first elements to be in one list, and all the second elements to be in another (and so on). Fortunately, there’s another built-in function that can help us here.
zip is a function that is most often used to take a few lists of many elements, and return one long list of tuples of few elements. But if instead we give it many small tuples, we’ll get back a short list of long tuples: one containing the first elements, one containing the second elements, and so on.
The only extra ingredient we need to make this work is a way to pass in the elements of the result of product, rather than the iterable as a single argument. This is done with the * operator, which when placed before a list (or other iterable) means “expand this iterable into multiple arguments”. Putting these together:
>>> list(zip(*product(numbers, letters)))
[(1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 5, 5, 5,
5, 5), ('a', 'b', 'c', 'd', 'e', 'a', 'b', 'c', 'd', 'e', 'a', 'b',
'c', 'd', 'e', 'a', 'b', 'c', 'd', 'e', 'a', 'b', 'c', 'd', 'e')]
We now have two tuples of the form that we want to pass in to map. All that remains is to extract them from the list that they’re in. Just like before, this is done with *.
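Before returning to the Monte Carlo code, we can see all of these pieces working together at the interpreter, again assuming a machine with 10 cores free:
>>> from itertools import product
>>> from pathos.pools import ParallelPool
>>> pool = ParallelPool(nodes=10)
>>> bases = [2, 3]
>>> exponents = [1, 2, 3]
>>> pool.map(pow, *zip(*product(bases, exponents)))
[2, 4, 8, 3, 9, 27]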
Adapting the previous example to scan $\beta$, $m$, and $h$:
from pathos.pools import ParallelPool
from itertools import product

from mc import run_mc


def run_in_parallel(betas, ms, hs, iteration_count):
    # Check that lists are all the same length
    assert len(betas) == len(ms)
    assert len(betas) == len(hs)

    # Generate a list of the same value of iteration_count
    iteration_counts = [iteration_count] * len(betas)

    # Generate a list of filenames to output results to
    filenames = [f'mc_data/b{beta}_m{m}_h{h}.dat'
                 for beta, m, h in zip(betas, ms, hs)]

    # Create a parallel pool to run the functions
    pool = ParallelPool(nodes=10)

    # Run the work
    pool.map(run_mc, ms, hs, betas, iteration_counts, filenames)


if __name__ == '__main__':
    betas = [0.1, 0.2, 0.3, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0]
    ms = [0.1, 0.2, 0.3, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0]
    hs = [0.25, 0.5, 1.0, 2.5]

    run_in_parallel(
        *zip(*product(betas, ms, hs)),
        10000
    )
Saving this as mc_pathos_scan.py and running it will now generate 324 output files in the mc_data directory.
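As a quick sanity check of where 324 comes from (9 values of $\beta$ times 9 values of $m$ times 4 values of $h$), we can count the combinations at the interpreter, using the same betas, ms, and hs lists as in the script:
>>> from itertools import product
>>> len(list(product(betas, ms, hs)))
324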
Verifying that it is parallel
When Slurm is running a job on a particular node, it will let us SSH directly to that node to check its behaviour. We can use this to verify that we are running in parallel as we expect.
Open a new terminal, and SSH to Sunbird again. Use squeue -u $USER to find out which node your job is running on (shown in the right-most column). Then SSH into that node using ssh scsXXXX, where XXXX is replaced with the node number you got from squeue.
Once on the node, you can use the top command to get a list of the processes using the most CPU resource, updated every few seconds. If your program is parallelising properly, you should see multiple python processes, all consuming somewhere near 100% CPU. (The percentages refer to a single CPU core rather than to the available CPU in the machine as a whole.)
If your job (or interactive allocation) ends while you’re SSHed into the node, then the SSH session will be killed by the system automatically.
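Putting those commands together, the sequence looks something like this, with scsXXXX standing in for the node name that squeue reports:
$ squeue -u $USER    # the right-most column shows the node your job is running on
$ ssh scsXXXX        # replace XXXX with that node number
$ top                # press q to quit when you are done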
Processing file lists
Not all the workloads of this kind will be parameter scans; some might be processing large numbers of files instead. Revisit the Fourier example from the section on GNU Parallel, and try converting it to work with Pathos instead.
You might want to do this in the following steps:
- Convert the program to be a function, separating out the behaviour that relates to argparse from the desired computational behaviour.
- Create a second file, as we have done here, to call this new function using Pathos. For now, hardcode a list of images to process.
- Now, adjust the program to read the image list from a file, but hardcode the filename.
- Finally, adjust the program to read the image list filename from the command line, e.g. using argparse.
Parameters to scan on the command line
In general, we don’t want to hard-code our parameters, even if there are only a few and we are using itertools.product to generate the longer lists.
Adapt the Monte Carlo example above so that it can read the betas, ms, and hs lists from the command line. (The action="append" keyword argument to add_argument will be useful here; the argparse documentation will tell you more.)
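As a hint, the sketch below (with a hypothetical --beta flag) shows how action="append" behaves: each repetition of the flag appends another value to the resulting list.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--beta', type=float, action='append', dest='betas')

# Equivalent to running:  python program.py --beta 0.5 --beta 1.0
args = parser.parse_args(['--beta', '0.5', '--beta', '1.0'])
print(args.betas)   # prints [0.5, 1.0]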
Surprisingly parallelisable tasks
Sometimes you need to think more carefully about how a task can be parallelised. For example, looking back to the example of calculating $\pi$ from the previous section, on the surface we are only performing a single computation that returns a single number. In fact, however, each Monte Carlo sample is independent, so it could be generated on a separate processor.
Try adapting one of the solutions to the $\pi$ example in the Numpy episode to run multiple streams in parallel. Each stream will want to report back the count of points inside the circle; totalling the counts and calculating the value of $\pi$ then needs to happen in serial.
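One possible outline is sketched below; this is our own sketch rather than a reference solution, and it assumes the counting is done with Numpy as in that episode (the function and variable names are hypothetical). Each worker counts the points falling inside the quarter circle for its own chunk of samples, and the totalling happens in serial afterwards.
from pathos.pools import ParallelPool


def count_inside(n_samples, seed):
    # Import inside the function so that the worker processes
    # also have Numpy available
    import numpy as np

    rng = np.random.default_rng(seed)
    x = rng.random(n_samples)
    y = rng.random(n_samples)
    return np.count_nonzero(x ** 2 + y ** 2 < 1)


if __name__ == '__main__':
    n_streams = 10
    samples_per_stream = 1_000_000

    pool = ParallelPool(nodes=n_streams)

    # Each stream gets the same number of samples but a different seed
    counts = pool.map(count_inside,
                      [samples_per_stream] * n_streams,
                      list(range(n_streams)))

    # Totalling and the final estimate happen in serial
    pi_estimate = 4 * sum(counts) / (n_streams * samples_per_stream)
    print(pi_estimate)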
Multiple nodes
While it is possible to start processes on more than one node using the Pathos library directly, this is easier to do using Pyina, which is another part of the Pathos framework.
Since Pyina depends on MPI, it’s not available via Conda (as the MPI installation will change from machine to machine). To install Pyina, we first need to choose an MPI library, and then install via Pip. On Sunbird, the first step can be done by loading the appropriate module.
$ # Get the latest version of the Intel MPI library
$ module load mpi/intel/2019/3
$ # Now install Pyina using this MPI library
$ pip install pyina
It can be used very similarly to the Pathos library, by creating a process pool and then using a map function across that pool. The difference is that Pyina will interact with Slurm to correctly position tasks on each node.
A toy example:
from pyina.ez_map import ez_map


# an example function
def host(id):
    import socket
    return "Rank: %d -- %s" % (id, socket.gethostname())


# launch the parallel map of the target function
results = ez_map(host, range(100), nodes=10)

for result in results:
    print(result)
Pyina makes use of the Message Passing Interface (MPI) to communicate between nodes and launch instances of Python; “rank” in this context is an MPI term referring to the index of the process, which is used for bookkeeping. A full study of what MPI can do (in Python or otherwise) is beyond the scope of this lesson!
Key Points
- The ParallelPool in Pathos can spread work to multiple cores.
- Look closely at loops and other large operations in your program to identify where there are no data dependencies, so that parallelism can be added.
- Pyina has functions that extend the parallel map to multiple nodes.