%% Cell type:markdown id: tags:
![Running in parallel](parallel.png)
# Multiprocessing and multithreading in Python
## Why use multiprocessing?
%% Cell type:code id: tags:
``` python
import multiprocessing
multiprocessing.cpu_count()
```
%% Output
4
%% Cell type:markdown id: tags:
*Almost all CPUs these days are multi-core.*
CPU-intensive programs will not be efficient unless they take advantage of this!
%% Cell type:markdown id: tags:
# General plan
We'll walk through a basic application of multiprocessing, hopefully relevant to the kind of work you might want to do.
This is not a comprehensive guide to Python multithreading/multiprocessing.
%% Cell type:markdown id: tags:
![voxel](voxel.png)
## Sample application
Assume we are doing some voxelwise image processing - i.e. running a computationally intensive calculation *independently* on each voxel in a (possibly large) image.
*(Such problems are sometimes called 'embarrassingly parallel')*
The analysis code lives in a Python module called `my_analysis`. Here we simulate the computation by calculating a large number of exponentials for each voxel.
%% Cell type:code id: tags:
``` python
# my_analysis.py
import math
import numpy

def calculate_voxel(val):
    # 'Slow' voxelwise calculation
    for i in range(30000):
        b = math.exp(val)
    return b
```
%% Cell type:markdown id: tags:
We're going to run this on a Numpy array. `numpy.vectorize` is a convenient function for applying a function to every element of an array, but it is *not* doing anything clever - it is no different from looping over the x, y, and z co-ordinates.
We're also giving the data an ID - this will be used later when we have multiple threads.
%% Cell type:code id: tags:
``` python
def calculate_data(data, id=0):
    # Run 'calculate_voxel' on each voxel in data
    print("Id: %i: Processing %i voxels" % (id, data.size))
    vectorized = numpy.vectorize(calculate_voxel)
    # Keep the result so we return the processed data
    data = vectorized(data)
    print("Id: %i: Done" % id)
    return data
```
%% Cell type:markdown id: tags:
Here's some Python code to run our analysis on a random Numpy array, and time how long it takes:
%% Cell type:code id: tags:
``` python
import numpy
import timeit
import my_analysis

def run():
    data = numpy.random.rand(16, 16, 16)
    my_analysis.calculate_data(data)

t = timeit.timeit(run, number=1)
print("Data processing took %.2f seconds" % t)
```
%% Output
Id: 0: Processing 4096 voxels
Id: 0: Done
Data processing took 26.44 seconds
%% Cell type:markdown id: tags:
So, it took a little while.
%% Cell type:markdown id: tags:
If we watch what's going on while this runs, we can see the program is not using all of our CPU. It's only working on one core.
![Running in serial](onecore.png)
%% Cell type:markdown id: tags:
## What we want
It would be nice to split the data up into chunks and give one chunk to each core. Then we could get through the processing several times faster - ideally, as many times faster as we have cores.
![Running in parallel](multicore.png)
%% Cell type:markdown id: tags:
# Multithreading attempt
*Threads* are a way for a program to run more than one task at a time. Let's try using this on our application, using the Python `threading` module.
%% Cell type:markdown id: tags:
## Splitting the data up
We're going to need to split the data up into chunks. Numpy has a handy function `numpy.split`, which slices the array up into equal portions along a specified axis:

    chunks = numpy.split(full_data, num_chunks, axis)

*The data must divide up equally along this axis! We will solve this problem later.*
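%% Cell type:markdown id: tags:
For example, here is a minimal sketch (with hypothetical shapes) of how `numpy.split` behaves:
%% Cell type:code id: tags:
``` python
import numpy

full_data = numpy.random.rand(4, 16, 16)

# Four equal chunks along axis 0 - each has shape (1, 16, 16)
chunks = numpy.split(full_data, 4, axis=0)
print(len(chunks), chunks[0].shape)

# The axis length must be divisible by the number of chunks -
# numpy.split(full_data, 3, axis=0) would raise a ValueError
```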
%% Cell type:markdown id: tags:
## Creating a new thread for each chunk
    def function_to_call(arg1, arg2, arg3):
        # ...do something
        ...

    import threading
    thread = threading.Thread(target=function_to_call,
                              args=[arg1, arg2, arg3])
%% Cell type:markdown id: tags:
## Waiting for the threads to complete
    thread.join()
- This waits until `thread` has completed
- So, if we have more than one thread we need to keep a list and wait for them all to finish:
    for thread in threads:
        thread.join()
%% Cell type:markdown id: tags:
## Example code
The example code is below - let's see how it does!
%% Cell type:code id: tags:
``` python
import threading

def multithread_process(data):
    n_workers = 4
    # Split the data into chunks along axis 0
    # We are assuming this axis is divisible by the number of workers!
    chunks = numpy.split(data, n_workers, axis=0)
    # Start a worker for each chunk
    workers = []
    for idx, chunk in enumerate(chunks):
        print("Starting worker for part %i" % idx)
        w = threading.Thread(target=my_analysis.calculate_data, args=[chunk, idx])
        workers.append(w)
        w.start()
    # Wait for each worker to finish
    for w in workers:
        w.join()

def run_with_threads():
    data = numpy.random.rand(16, 16, 16)
    multithread_process(data)

t = timeit.timeit(run_with_threads, number=1)
print("Data processing took %.2f seconds" % t)
```
%% Output
Starting worker for part 0
Starting worker for part 1
Id: 0: Processing 1024 voxels
Id: 1: Processing 1024 voxels
Starting worker for part 2Id: 2: Processing 1024 voxels
Starting worker for part 3Id: 3: Processing 1024 voxels
Id: 1: DoneId: 0: Done
Id: 2: Done
Id: 3: Done
Data processing took 132.90 seconds
%% Cell type:markdown id: tags:
# The Big Problem with Python threads
%% Cell type:markdown id: tags:
**Only one thread can execute Python code at a time**
![Python multithreading](thread_gil.png)
This is what's really going on.
%% Cell type:markdown id: tags:
The reason is something called the **Global Interpreter Lock (GIL)**. Only one thread can hold it at a time, and a thread can only execute Python code while it holds the GIL.
%% Cell type:markdown id: tags:
## So, does that mean Python threads are useless?
No, not completely. They're useful for:
- Making a user interface continue to respond while a calculation takes place in the background
- A web server handling multiple requests.
- *The GIL is not required while waiting for network connections*
- Doing calculations in parallel which are running in native (C/C++) code
- *The GIL is not required while running native code*
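%% Cell type:markdown id: tags:
As an illustration of the last point (a minimal sketch, not from the talk): large Numpy matrix products are dispatched to native BLAS code, which typically releases the GIL, so several of them can genuinely run at the same time:
%% Cell type:code id: tags:
``` python
import threading
import numpy

def native_work():
    a = numpy.random.rand(2000, 2000)
    # Numpy typically releases the GIL inside the native BLAS call,
    # so these calls can overlap across threads
    a.dot(a)

threads = [threading.Thread(target=native_work) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```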
%% Cell type:markdown id: tags:
### But for doing CPU-intensive Python calculations in parallel, yes Python threads are essentially useless
%% Cell type:markdown id: tags:
## Can multiprocessing help?
%% Cell type:markdown id: tags:
### Differences between threads and processes
- Threads are quicker to start up and generally require fewer resources
- Threads share memory with the main process
- Don't need to copy your data to pass it to a thread
- Don't need to copy the output data back to the main program
- Processes have their own memory space
- Data needs to be copied from the main program to the process
- Any output needs to be copied back
- However, importantly for Python, *each process has its own GIL, so processes can run Python code at the same time as each other*
%% Cell type:markdown id: tags:
## Multiprocessing attempt
Multiprocessing is normally more work than multithreading.
However, Python tries *very hard* to make multiprocessing as easy as multithreading.
- `import multiprocessing` instead of `import threading`
- `multiprocessing.Process()` instead of `threading.Thread()`
%% Cell type:code id: tags:
``` python
import multiprocessing

def multiprocess_process(data):
    n_workers = 4
    # Split the data into chunks along axis 0
    # We are assuming this axis is divisible by the number of workers!
    chunks = numpy.split(data, n_workers, axis=0)
    # Start a worker for each chunk
    workers = []
    for idx, chunk in enumerate(chunks):
        print("Starting worker for chunk %i" % idx)
        w = multiprocessing.Process(target=my_analysis.calculate_data, args=[chunk, idx])
        workers.append(w)
        w.start()
    # Wait for workers to complete
    for w in workers:
        w.join()

def run_with_processes():
    data = numpy.random.rand(16, 16, 16)
    multiprocess_process(data)

if __name__ == "__main__":
    t = timeit.timeit(run_with_processes, number=1)
    print("Data processing took %.2f seconds" % t)
```
%% Output
Starting worker for chunk 0
Starting worker for chunk 1
Starting worker for chunk 2
Starting worker for chunk 3
Data processing took 9.74 seconds
%% Cell type:markdown id: tags:
# Multiprocessing works!
%% Cell type:markdown id: tags:
## BUT
# Caveats and gotchas
Before we run off and replace all our threads with processes, there are a few things we need to bear in mind:
## Data copying
- Python *copied* each chunk of data to each worker. If the data is very large this could be a significant overhead
- Python needs to know *how* to copy all the data we pass to the process
- This is fine for normal data types (strings, lists, dictionaries, etc.) and Numpy arrays
- You can get into trouble if you try to pass complex objects to your function
- Anything you pass to the worker needs to support the `pickle` module - see the sketch below
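%% Cell type:markdown id: tags:
For example, this minimal sketch (illustrative - the file name is hypothetical) shows the kind of object that cannot be pickled, and so cannot be passed to a worker process:
%% Cell type:code id: tags:
``` python
import pickle

# Normal data types pickle without any trouble
pickle.dumps({"id": 0, "voxels": [1, 2, 3]})

# But something like an open file handle does not - trying to pass
# one to a worker process would fail in the same way
f = open("somefile.txt", "w")
pickle.dumps(f)   # raises TypeError
```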
%% Cell type:markdown id: tags:
## The global variable problem
- You can't rely on global variables being copied to the worker process - see the sketch below
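%% Cell type:markdown id: tags:
Here is a minimal sketch of the global variable problem (illustrative, not from the talk). Whether the worker sees the updated value depends on how the platform starts new processes, so code that relies on it is not portable:
%% Cell type:code id: tags:
``` python
import multiprocessing

COUNTER = 0

def show_counter():
    # With the 'spawn' start method (e.g. on Windows) the worker
    # re-imports this module, so it sees the value set at import
    # time (0) rather than 42
    print("Worker sees COUNTER = %i" % COUNTER)

if __name__ == "__main__":
    COUNTER = 42
    w = multiprocessing.Process(target=show_counter)
    w.start()
    w.join()
```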
%% Cell type:markdown id: tags:
## The output problem
If you change data in a subprocess, your main program will not see it.
## Example:
%% Cell type:code id: tags:
``` python
# my_analysis.py
def add_one(data):
    data += 1
```
%% Cell type:code id: tags:
``` python
data = numpy.zeros([2, 2])
print("Starting with zeros")
print(data)
#my_analysis.add_one(data)
#w = threading.Thread(target=my_analysis.add_one, args=[data,])
w = multiprocessing.Process(target=my_analysis.add_one, args=[data,])
w.start()
w.join()
print("I think my worker just added one")
print(data)
```
%% Output
Starting with zeros
[[ 0. 0.]
[ 0. 0.]]
I think my worker just added one
[[ 0. 0.]
[ 0. 0.]]
%% Cell type:markdown id: tags:
# Making multiprocessing work better
%% Cell type:markdown id: tags:
## Problems to solve:
- Dividing data amongst processes
- Returning data from process
- Status updates from process
%% Cell type:markdown id: tags:
## Worker pools
A *Pool* is a fixed number of worker processes which can be given tasks to do.
We can give a pool as many tasks as we like - once a worker finishes a task it will start another, until they're all done.
We can create a pool using:

    multiprocessing.Pool(num_workers)
- If the number of workers in the pool is equal to the number of cores, we should be able to keep our CPU busy.
- Pools are good for load balancing if some slices are more work than others
%% Cell type:markdown id: tags:
![Worker pool](pool.png)
%% Cell type:markdown id: tags:
## Splitting our data into chunks for the pool
- Now we can split our data up into as many chunks as we like
- Easiest solution is to use 1-voxel slices along one of the axes `split_axis`:
- `numpy.split(data, data.shape[split_axis], axis=split_axis)`
%% Cell type:markdown id: tags:
## Giving the tasks to the pool
- The easiest way is to use:

      Pool.map(function, task_args)

- `task_args` is a *sequence of sequences*
- Each element in `task_args` is a sequence of arguments for one task
- The length of `task_args` is the number of tasks

#### Example `task_args` for 5 tasks, each being passed an ID and a chunk of data

    [
        [0, chunk_1], # task 1
        [1, chunk_2], # task 2
        [2, chunk_3], # task 3
        [3, chunk_4], # task 4
        [4, chunk_5], # task 5
    ]
- If we have a list of chunks we can generate this with `enumerate(chunks)`
- **Arguments are passed to the task in a slightly different way compared to `multiprocessing.Process()`**
%% Cell type:code id: tags:
``` python
# my_analysis.py
def do_task(args):
    # Pool.map passes all our arguments as a single tuple, so unpack it
    # and pass the arguments to the real calculate function.
    id, data = args
    return calculate_data(data, id)
```
%% Cell type:markdown id: tags:
- If you're using Python 3, look into `Pool.starmap` - see the sketch below
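%% Cell type:markdown id: tags:
A minimal sketch, assuming Python 3: `Pool.starmap` unpacks each argument tuple for us, so `calculate_data` can keep its normal `(data, id)` signature and no `do_task` wrapper is needed:
%% Cell type:code id: tags:
``` python
import multiprocessing
import numpy
import my_analysis

if __name__ == "__main__":
    data   = numpy.random.rand(16, 16, 16)
    chunks = numpy.split(data, data.shape[0], axis=0)
    pool   = multiprocessing.Pool(4)
    # Each (chunk, idx) tuple is unpacked into calculate_data(chunk, idx)
    results = pool.starmap(my_analysis.calculate_data,
                           [(chunk, idx) for idx, chunk in enumerate(chunks)])
```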
%% Cell type:markdown id: tags:
## Getting the output and putting it back together
- `Pool.map()` captures the return value of your worker function
- It returns a list of all of the return values for each task
- for us this is a list of Numpy arrays, one for each slice
- `numpy.concatenate(list_of_slices, split_axis)` will combine them back into a single data item for us
%% Cell type:markdown id: tags:
## The full example
%% Cell type:code id: tags:
``` python
# Split our data along the x-axis
SPLIT_AXIS = 0

def pool_process(data):
    n_workers = 4
    # Split the data into 1-voxel slices along axis 0
    parts = numpy.split(data, data.shape[SPLIT_AXIS], axis=SPLIT_AXIS)
    print("Input data shape=%s" % str(data.shape))
    print("Processing %i parts with %i workers" % (len(parts), n_workers))
    # Create a pool - normally this would be 1 worker per CPU core
    pool = multiprocessing.Pool(n_workers)
    # Send the tasks to the workers
    list_of_slices = pool.map(my_analysis.do_task, enumerate(parts))
    # Combine the return data back into a single array
    processed_array = numpy.concatenate(list_of_slices, SPLIT_AXIS)
    print("Processed data, output shape=%s" % str(processed_array.shape))

def run_with_pool():
    data = numpy.random.rand(23, 13, 11)
    pool_process(data)

t = timeit.timeit(run_with_pool, number=1)
print("Data processing took %.2f seconds" % t)
```
%% Output
Input data shape=(23L, 13L, 11L)
Processing 23 parts with 4 workers
Processed data, output shape=(23L, 13L, 11L)
Data processing took 8.08 seconds
%% Cell type:markdown id: tags:
# Communication / Status updates?
- It would be nice if workers could report their progress as they work. One way to do this is with a `Queue`.
%% Cell type:markdown id: tags:
## Queues
A Queue is often used to send status updates from the process to the main program.
- Shared between the main program and the subprocesses
- Create it with `multiprocessing.Manager().Queue()`
- Pass it to the worker thread like any other argument
- Worker calls `queue.put()` to send some data to the queue
- Main program calls `queue.get()` to get data off the queue
- Queue is FIFO (First In First Out)
- Need to have a thread running which checks the queue for updates every so often
- This is a good use for threads!
![Queue](queue_put.png)
%% Cell type:markdown id: tags:
## Modify our example to report progress
%% Cell type:code id: tags:
``` python
# my_analysis.py
def calculate_data_and_report(args):
    id, data, queue = args
    # Run 'calculate_voxel' on each voxel in data
    vectorized = numpy.vectorize(calculate_voxel)
    # Keep the result so we return the processed data
    data = vectorized(data)
    # Report our ID and how many voxels we have done to the queue
    queue.put((id, data.size))
    return data
```
%% Cell type:markdown id: tags:
## Create a thread to monitor the queue for updates
I've done this as a class, because that's the easiest way to keep track of the state we need.
%% Cell type:code id: tags:
``` python
class QueueChecker():
    def __init__(self, queue, num_voxels, interval_seconds=1):
        self._queue = queue
        self._num_voxels = num_voxels
        self._interval_seconds = interval_seconds
        self._voxels_done = 0
        self._cancel = False
        self._restart_timer()

    def cancel(self):
        self._cancel = True

    def _restart_timer(self):
        self._timer = threading.Timer(self._interval_seconds, self._check_queue)
        self._timer.start()

    def _check_queue(self):
        # Add up any progress reports waiting on the queue
        while not self._queue.empty():
            id, voxels_done = self._queue.get()
            self._voxels_done += voxels_done
        percent = int(100*float(self._voxels_done)/self._num_voxels)
        print("%i%% complete" % percent)
        if not self._cancel:
            self._restart_timer()
```
%% Cell type:markdown id: tags:
## Modify our main program to pass the queue to each of our workers
We need to create the queue and the `QueueChecker` and make sure each task includes a copy of the queue
%% Cell type:code id: tags:
``` python
# Split our data along the x-axis
SPLIT_AXIS = 0

reload(my_analysis)

def pool_process(data):
    n_workers = 4
    # Split the data into 1-voxel slices along axis 0
    parts = numpy.split(data, data.shape[SPLIT_AXIS], axis=SPLIT_AXIS)
    print("Input data shape=%s" % str(data.shape))
    print("We are processing %i parts with %i workers" % (len(parts), n_workers))
    pool = multiprocessing.Pool(n_workers)
    # Create the queue
    queue = multiprocessing.Manager().Queue()
    checker = QueueChecker(queue, data.size)
    # Note that we need to pass the queue as an argument to the worker
    args = [(id, part, queue) for id, part in enumerate(parts)]
    list_of_slices = pool.map(my_analysis.calculate_data_and_report, args)
    checker.cancel()
    # Join processed data back together again
    processed_array = numpy.concatenate(list_of_slices, SPLIT_AXIS)
    print("Processed data, output shape=%s" % str(processed_array.shape))

def run_with_pool():
    data = numpy.random.rand(23, 19, 17)
    pool_process(data)

t = timeit.timeit(run_with_pool, number=1)
print("Data processing took %.2f seconds" % t)
```
%% Output
Input data shape=(23L, 19L, 17L)
We are processing 23 parts with 4 workers
0% complete
0% complete
13% complete
17% complete
17% complete
34% complete
34% complete
47% complete
52% complete
56% complete
69% complete
69% complete
78% complete
86% complete
95% complete
Processed data, output shape=(23L, 19L, 17L)
Data processing took 17.39 seconds
100% complete
%% Cell type:markdown id: tags:
# Summary
%% Cell type:markdown id: tags:
## What we've covered
- Limitations of threading for parallel processing in Python
- How to split up a simple voxel-processing task into separate chunks
- `numpy.split()`
- How to run each chunk in parallel using multiprocessing
- `multiprocessing.Process`
- How to separate the number of tasks from the number of workers
- `multiprocessing.Pool()`
- `Pool.map()`
- How to get output back from the workers and join it back together again
- `numpy.concatenate()`
- How to pass back progress information from our worker processes
- `multiprocessing.Manager().Queue()`
- Using a `threading.Timer` object to monitor the queue and display updates
%% Cell type:markdown id: tags:
## Things I haven't covered
Loads of stuff!
%% Cell type:markdown id: tags:
### Threading
- Locking of shared data (so only one thread can use it at a time)
- Thread-local storage (see `threading.local()`)
- See Paul's tutorial in the PyTreat Git repository for more information
- Or see the `threading` Python documentation for full details
%% Cell type:markdown id: tags:
### Multiprocessing
- Passing data *between* workers
- Can use `Queue` for one-way traffic
- Use `Pipe` for two-way communication between one worker and another
- May be required when your problem is not 'embarrassingly parallel'
- Sharing memory
- Way to avoid copying large amounts of data
- Look at `multiprocessing.Array`
- Need to convert Numpy array into a ctypes array
- Shared memory has pitfalls
- *Don't go here unless you have already determined that data copying is a bottleneck*
- Running workers asynchronously
- So main program doesn't have to wait for them to finish
- Instead, a function is called every time a task is finished
- see `Pool.apply_async()` for more information
- Error handling
- Needs a bit of care - it is very easy to 'lose' errors
- Workers should catch all exceptions
- And should return a value to signal when a task has failed
- The main program decides how to deal with it - a minimal sketch follows below
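%% Cell type:markdown id: tags:
As a minimal sketch of the error handling point (the `safe_task` wrapper is hypothetical, not from the talk):
%% Cell type:code id: tags:
``` python
# my_analysis.py
def safe_task(args):
    # Wrap the real calculation so that a failure in one task cannot
    # get lost inside the worker pool
    id, data = args
    try:
        return id, calculate_data(data, id), None
    except Exception as e:
        # Return the error instead of raising it, so the main program
        # can decide how to deal with the failed task
        return id, None, str(e)
```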
%% Cell type:markdown id: tags:
## Always remember
**Python is not the best tool for every job!**
If you are really after performance, consider implementing your algorithm in multi-threaded C/C++ and then creating a Python interface to it.