From 51c0d5e9a4b6c1aea3a3369070509608b05bb0c6 Mon Sep 17 00:00:00 2001 From: Paul McCarthy <pauldmccarthy@gmail.com> Date: Fri, 23 Feb 2018 12:27:45 +0000 Subject: [PATCH] Martin's multiprocessing talk --- talks/multiprocessing/Multiprocessing.ipynb | 1005 +++++++++++++++++++ 1 file changed, 1005 insertions(+) create mode 100644 talks/multiprocessing/Multiprocessing.ipynb diff --git a/talks/multiprocessing/Multiprocessing.ipynb b/talks/multiprocessing/Multiprocessing.ipynb new file mode 100644 index 0000000..f4f637d --- /dev/null +++ b/talks/multiprocessing/Multiprocessing.ipynb @@ -0,0 +1,1005 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "\n", + "# Multiprocessing and multithreading in Python\n", + "\n", + "## Why use multiprocessing?\n" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "4" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import multiprocessing\n", + "multiprocessing.cpu_count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "*Almost all CPUs these days are multi-core.*\n", + "\n", + "CPU-intensive programs will not be efficient unless they take advantage of this!\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# General plan\n", + "\n", + "Walk through a basic application of multiprocessing, hopefully relevant to the kind of work you might want to do.\n", + "\n", + "Not a comprehensive guide to Python multithreading/multiprocessing.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "\n", + "## Sample application\n", + "\n", + "Assume we are doing some voxelwise image processing - i.e. running a computationally intensive calculation *independently* on each voxel in a (possibly large) image. 
\n", + "\n", + "*(Such problems are sometimes called 'embarrassingly parallel')*\n", + "\n", + "This is in a Python module called my_analysis. Here we simulate this by just calculating a large number of exponentials for each voxel.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# my_analysis.py\n", + "\n", + "import math\n", + "import numpy\n", + "\n", + "def calculate_voxel(val):\n", + " # 'Slow' voxelwise calculation\n", + " for i in range(30000):\n", + " b = math.exp(val)\n", + " return b\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We're going to run this on a Numpy array. `numpy.vectorize` is a convenient function to apply a function to every element of the array, but it is *not* doing anything clever - it is no different than looping over the x, y, and z co-ordinates.\n", + "\n", + "We're also giving the data an ID - this will be used later when we have multiple threads." 
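+ ,
+ "\n",
+ "*(Editor's sketch, not part of the original talk: `calculate_data_loop` is a hypothetical helper name. It shows that `numpy.vectorize(calculate_voxel)` is doing nothing cleverer than this explicit loop over every element.)*\n",
+ "\n",
+ "```python\n",
+ "def calculate_data_loop(data):\n",
+ "    # Visit every voxel index and apply the per-voxel function\n",
+ "    out = numpy.empty_like(data)\n",
+ "    for idx in numpy.ndindex(*data.shape):\n",
+ "        out[idx] = calculate_voxel(data[idx])\n",
+ "    return out\n",
+ "```\n"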
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "def calculate_data(data, id=0):\n", + " # Run 'calculate_voxel' on each voxel in data\n", + " print(\"Id: %i: Processing %i voxels\" % (id, data.size))\n", + " vectorized = numpy.vectorize(calculate_voxel)\n", + " data = vectorized(data)\n", + " print(\"Id: %i: Done\" % id)\n", + " return data\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Here's some Python code to run our analysis on a random Numpy array, and time how long it takes." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Id: 0: Processing 4096 voxels\n", + "Id: 0: Done\n", + "Data processing took 26.44 seconds\n" + ] + } + ], + "source": [ + "import numpy\n", + "import timeit\n", + "\n", + "import my_analysis\n", + "\n", + "def run():\n", + " data = numpy.random.rand(16, 16, 16)\n", + " my_analysis.calculate_data(data)\n", + " \n", + "t = timeit.timeit(run, number=1)\n", + "print(\"Data processing took %.2f seconds\" % t)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "So, it took a little while." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If we watch what's going on while this runs, we can see that the program is not using all of our CPU. It's only working on one core.\n", + "\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## What we want\n", + "\n", + "It would be nice to split the data up into chunks and give one to each core. Then we could get through the processing almost N times as fast, where N is the number of cores.\n", + "\n", + "" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Multithreading attempt\n", + "\n", + "*Threads* are a way for a program to run more than one task at a time. 
Let's try using this on our application, using the Python `threading` module. " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Splitting the data up\n", + "\n", + "We're going to need to split the data up into chunks. Numpy has a handy function `numpy.split` which slices the array up into equal portions along a specified axis:\n", + "\n", + " chunks = numpy.split(full_data, num_chunks, axis)\n", + "\n", + "*The data must divide equally along this axis! We will solve this problem later.*" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating a new thread for each chunk\n", + "\n", + " def function_to_call(arg1, arg2, arg3):\n", + " ...do something\n", + " \n", + " ...\n", + " \n", + " import threading\n", + " thread = threading.Thread(target=function_to_call, \n", + " args=[arg1, arg2, arg3])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Waiting for the threads to complete\n", + "\n", + " thread.join()\n", + "\n", + "- This waits until `thread` has completed\n", + "- So, if we have more than one thread, we need to keep a list and wait for them all to finish:\n", + "\n", + "\n", + " for thread in threads:\n", + " thread.join()\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Example code\n", + "\n", + "The example code is below - let's see how it does!"
+ ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting worker for part 0\n", + "Starting worker for part 1\n", + " Id: 0: Processing 1024 voxels\n", + "Id: 1: Processing 1024 voxels\n", + "Starting worker for part 2Id: 2: Processing 1024 voxels\n", + "\n", + "Starting worker for part 3Id: 3: Processing 1024 voxels\n", + "\n", + "Id: 1: DoneId: 0: Done\n", + "\n", + "Id: 2: Done\n", + "Id: 3: Done\n", + "Data processing took 132.90 seconds\n" + ] + } + ], + "source": [ + "import threading\n", + "\n", + "def multithread_process(data):\n", + " n_workers = 4\n", + " \n", + " # Split the data into chunks along axis 0\n", + " # We are assuming this axis is divisible by the number of workers!\n", + " chunks = numpy.split(data, n_workers, axis=0)\n", + " \n", + " # Start a worker for each chunk\n", + " workers = []\n", + " for idx, chunk in enumerate(chunks):\n", + " print(\"Starting worker for part %i\" % idx)\n", + " w = threading.Thread(target=my_analysis.calculate_data, args=[chunk, idx])\n", + " workers.append(w)\n", + " w.start()\n", + " \n", + " # Wait for each worker to finish\n", + " for w in workers:\n", + " w.join()\n", + " \n", + "def run_with_threads():\n", + " data = numpy.random.rand(16, 16, 16)\n", + " multithread_process(data)\n", + " \n", + "t = timeit.timeit(run_with_threads, number=1)\n", + "print(\"Data processing took %.2f seconds\" % t)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# The Big Problem with Python threads" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Only one thread can execute Python code at a time**\n", + "\n", + "\n", + "\n", + "This is what's really going on.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The reason is something called the **Global Interpreter Lock (GIL)**. 
Only one thread can have it, and you can only execute Python code when you have the GIL." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## So, does that mean Python threads are useless?\n", + "\n", + "No, not completely. They're useful for:\n", + "\n", + "- Making a user interface continue to respond while a calculation takes place in the background\n", + "- A web server handling multiple requests\n", + " - *The GIL is not required while waiting for network connections*\n", + "- Doing calculations in parallel that run in native (C/C++) code\n", + " - *The GIL is not required while running native code*\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "### But for doing CPU-intensive Python calculations in parallel, yes, Python threads are essentially useless\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Can multiprocessing help?" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Differences between threads and processes\n", + "\n", + "- Threads are quicker to start up and generally require fewer resources\n", + "\n", + "- Threads share memory with the main process \n", + " - Don't need to copy your data to pass it to a thread\n", + " - Don't need to copy the output data back to the main program\n", + " \n", + "- Processes have their own memory space \n", + " - Data needs to be copied from the main program to the process\n", + " - Any output needs to be copied back\n", + " \n", + "- However, importantly for Python, *each process has its own GIL, so processes can run at the same time as one another*" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Multiprocessing attempt\n", + "\n", + "Multiprocessing is normally more work than multithreading.\n", + "\n", + "However, Python tries *very hard* to make multiprocessing as easy as multithreading.\n", + "\n", + "- `import multiprocessing` instead of `import 
threading`\n", + "- `multiprocessing.Process()` instead of `threading.Thread()`" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting worker for chunk 0\n", + "Starting worker for chunk 1\n", + "Starting worker for chunk 2\n", + "Starting worker for chunk 3\n", + "Data processing took 9.74 seconds\n" + ] + } + ], + "source": [ + "import multiprocessing\n", + " \n", + "def multiprocess_process(data):\n", + " n_workers = 4\n", + " \n", + " # Split the data into chunks along axis 0\n", + " # We are assuming this axis is divisible by the number of workers!\n", + " chunks = numpy.split(data, n_workers, axis=0)\n", + " \n", + " workers = []\n", + " for idx, chunk in enumerate(chunks):\n", + " print(\"Starting worker for chunk %i\" % idx)\n", + " w = multiprocessing.Process(target=my_analysis.calculate_data, args=[chunk, idx])\n", + " workers.append(w)\n", + " w.start()\n", + " \n", + " # Wait for workers to complete\n", + " for w in workers:\n", + " w.join()\n", + " \n", + "def run_with_processes():\n", + " data = numpy.random.rand(16, 16, 16)\n", + " multiprocess_process(data)\n", + "\n", + "if __name__ == \"__main__\":\n", + " t = timeit.timeit(run_with_processes, number=1)\n", + " print(\"Data processing took %.2f seconds\" % t)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Multiprocessing works!" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## BUT\n", + "\n", + "# Caveats and gotchas\n", + "\n", + "Before we just run off and replace all our threads with processes, there are a few things we need to bear in mind:\n", + "\n", + "## Data copying\n", + "\n", + "- Python *copied* each chunk of data to each worker. If the data was very large this could be a significant overhead\n", + "- Python needs to know *how* to copy all the data we pass to the process. 
\n", + " - This is fine for normal data types (strings, lists, dictionaries, etc.) and Numpy arrays\n", + " - Can get into trouble if you try to pass complex objects to your function \n", + " - Anything you pass to the worker needs to be supported by the `pickle` module\n", + "\n", + "## The global variable problem\n", + "\n", + "- Can't rely on global variables being copied\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The output problem\n", + "\n", + "If you change data in a subprocess, your main program will not see it.\n", + "\n", + "## Example:\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# my_analysis.py\n", + "def add_one(data):\n", + " data += 1\n" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting with zeros\n", + "[[ 0. 0.]\n", + " [ 0. 0.]]\n", + "I think my worker just added one\n", + "[[ 0. 0.]\n", + " [ 0. 
0.]]\n" + ] + } + ], + "source": [ + "data = numpy.zeros([2, 2])\n", + "print(\"Starting with zeros\")\n", + "print(data)\n", + "\n", + "#my_analysis.add_one(data)\n", + "#w = threading.Thread(target=my_analysis.add_one, args=[data,])\n", + "w = multiprocessing.Process(target=my_analysis.add_one, args=[data,])\n", + "w.start()\n", + "w.join()\n", + "\n", + "print(\"I think my worker just added one\")\n", + "print(data)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": true + }, + "source": [ + "# Making multiprocessing work better" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Problems to solve:\n", + "\n", + "- Dividing data amongst processes\n", + "- Returning data from process\n", + "- Status updates from process\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Worker pools\n", + "\n", + "A *Pool* is a fixed number of worker processes which can be given tasks to do. \n", + "\n", + "We can give a pool as many tasks as we like - once a worker finishes a task it will start another, until they're all done.\n", + "\n", + "We can create a pool using:\n", + "\n", + " multiprocessing.Pool(num_workers)\n", + " \n", + "- If the number of workers in the pool is equal to the number of cores, we should be able to keep our CPU busy.\n", + "- Pools are good for load balancing if some slices are more work than others\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Splitting our data into chunks for the pool\n", + "\n", + " - Now we can split our data up into as many chunks as we like\n", + " - Easiest solution is to use 1-voxel slices along one of the axes `split_axis`:\n", + " - `numpy.split(data, data.shape[split_axis], axis=split_axis)`\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Giving the tasks to the pool\n", + "\n", + " - Easiest way is to 
use:\n", + " \n", + " \n", + " Pool.map(function, task_args)\n", + " \n", + " - `task_args` is a *sequence of sequences*\n", + " - Each element in `task_args` is a sequence of arguments for one task\n", + " - The length of `task_args` is the number of tasks\n", + " \n", + "#### Example `task_args` for 5 tasks, each being passed an ID and a chunk of data\n", + "\n", + " [ \n", + " [0, chunk_1], # task 1\n", + " [1, chunk_2], # task 2\n", + " [2, chunk_3], # task 3\n", + " [3, chunk_4], # task 4\n", + " [4, chunk_5], # task 5\n", + " ]\n", + " \n", + " - If we have a list of chunks we can generate this with `enumerate(chunks)`\n", + " \n", + " \n", + " - **Arguments are passed to the task in a slightly different way compared to `multiprocessing.Process()`** \n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# my_analysis.py\n", + "\n", + "def do_task(args):\n", + " # Pool.map passes all our arguments as a single tuple, so unpack it\n", + " # and pass the arguments to the real calculate function.\n", + " id, data = args\n", + " return calculate_data(data, id)\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + " - If you're using Python 3, look into `Pool.starmap`" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Getting the output and putting it back together\n", + "\n", + " - `Pool.map()` captures the return value of your worker function\n", + " - It returns a list of the return values, one for each task\n", + " - For us, this is a list of Numpy arrays, one for each slice\n", + " - `numpy.concatenate(list_of_slices, split_axis)` will combine them back into a single data item for us\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The full example" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", +
"output_type": "stream", + "text": [ + "Input data shape=(23L, 13L, 11L)\n", + "Processing 23 parts with 4 workers\n", + "Processed data, output shape=(23L, 13L, 11L)\n", + "Data processing took 8.08 seconds\n" + ] + } + ], + "source": [ + "# Split our data along the x-axis\n", + "SPLIT_AXIS = 0\n", + "\n", + "def pool_process(data):\n", + " n_workers = 4\n", + " \n", + " # Split the data into 1-voxel slices along axis 0\n", + " parts = numpy.split(data, data.shape[SPLIT_AXIS], axis=SPLIT_AXIS)\n", + "\n", + " print(\"Input data shape=%s\" % str(data.shape))\n", + " print(\"Processing %i parts with %i workers\" % (len(parts), n_workers))\n", + " \n", + " # Create a pool - normally this would be 1 worker per CPU core\n", + " pool = multiprocessing.Pool(n_workers)\n", + " \n", + " # Send the tasks to the workers\n", + " list_of_slices = pool.map(my_analysis.do_task, enumerate(parts))\n", + " \n", + " # Combine the return data back into a single array\n", + " processed_array = numpy.concatenate(list_of_slices, SPLIT_AXIS)\n", + " \n", + " print(\"Processed data, output shape=%s\" % str(processed_array.shape))\n", + " \n", + "def run_with_pool():\n", + " data = numpy.random.rand(23, 13, 11)\n", + " pool_process(data)\n", + " \n", + "t = timeit.timeit(run_with_pool, number=1)\n", + "print(\"Data processing took %.2f seconds\" % t)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Communication / Status updates?\n", + "\n", + " - Would be nice if workers could communicate their progress as they work. 
One way to do this is using a `Queue`.\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "## Queues\n", + "\n", + "A Queue is often used to send status updates from the process to the main program.\n", + "\n", + " - Shared between the main program and the subprocesses\n", + " - Create it with `multiprocessing.Manager().Queue()`\n", + " - Pass it to the worker process like any other argument\n", + " - Worker calls `queue.put()` to send some data to the queue\n", + " - Main program calls `queue.get()` to get data off the queue\n", + " - Queue is FIFO (First In First Out)\n", + " - Need to have a thread running which checks the queue for updates every so often\n", + " - This is a good use for threads!\n", + " \n", + "\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Modify our example to report progress" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# my_analysis.py \n", + "\n", + "def calculate_data_and_report(args):\n", + " id, data, queue = args\n", + " \n", + " # Run 'calculate_voxel' on each voxel in data\n", + " vectorized = numpy.vectorize(calculate_voxel)\n", + " data = vectorized(data)\n", + " \n", + " # Report our ID and how many voxels we have done to the queue\n", + " queue.put((id, data.size))\n", + " return data\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create a thread to monitor the queue for updates\n", + "\n", + "I've done this as a class, because that's the easiest way" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "class QueueChecker():\n", + " def __init__(self, queue, num_voxels, interval_seconds=1):\n", + " self._queue = queue\n", + " self._num_voxels = num_voxels\n", + " self._interval_seconds = interval_seconds\n", + " self._voxels_done = 0\n", + " self._cancel = 
False\n", + " self._restart_timer()\n", + " \n", + " def cancel(self):\n", + " self._cancel = True\n", + " \n", + " def _restart_timer(self):\n", + " self._timer = threading.Timer(self._interval_seconds, self._check_queue)\n", + " self._timer.start()\n", + "\n", + " def _check_queue(self):\n", + " while not self._queue.empty():\n", + " id, voxels_done = self._queue.get()\n", + " self._voxels_done += voxels_done\n", + " \n", + " percent = int(100*float(self._voxels_done)/self._num_voxels)\n", + " print(\"%i%% complete\" % percent)\n", + " if not self._cancel:\n", + " self._restart_timer()\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Modify our main program to pass the queue to each of our workers\n", + "\n", + "We need to create the queue and the `QueueChecker` and make sure each task includes a copy of the queue\n" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Input data shape=(23L, 19L, 17L)\n", + "We are processing 23 parts with 4 workers\n", + "0% complete\n", + "0% complete\n", + "13% complete\n", + "17% complete\n", + "17% complete\n", + "34% complete\n", + "34% complete\n", + "47% complete\n", + "52% complete\n", + "56% complete\n", + "69% complete\n", + "69% complete\n", + "78% complete\n", + "86% complete\n", + "95% complete\n", + "Processed data, output shape=(23L, 19L, 17L)\n", + "Data processing took 17.39 seconds\n", + "100% complete\n" + ] + } + ], + "source": [ + "# Split our data along the x-axis\n", + "SPLIT_AXIS = 0\n", + "reload(my_analysis)\n", + "\n", + "def pool_process(data):\n", + " n_workers = 4\n", + " \n", + " # Split the data into 1-voxel slices along axis 0\n", + " parts = numpy.split(data, data.shape[SPLIT_AXIS], axis=SPLIT_AXIS)\n", + "\n", + " print(\"Input data shape=%s\" % str(data.shape))\n", + " print(\"We are processing %i parts with %i workers\" % (len(parts), 
n_workers))\n", + " pool = multiprocessing.Pool(n_workers)\n", + " \n", + " # Create the queue\n", + " queue = multiprocessing.Manager().Queue()\n", + " checker = QueueChecker(queue, data.size)\n", + " \n", + " # Note that we need to pass the queue as an argument to the worker\n", + " args = [(id, part, queue) for id, part in enumerate(parts)]\n", + " list_of_slices = pool.map(my_analysis.calculate_data_and_report, args)\n", + " \n", + " checker.cancel()\n", + " \n", + " # Join processed data back together again\n", + " processed_array = numpy.concatenate(list_of_slices, SPLIT_AXIS)\n", + " print(\"Processed data, output shape=%s\" % str(processed_array.shape))\n", + " \n", + "def run_with_pool():\n", + " data = numpy.random.rand(23, 19, 17)\n", + " pool_process(data)\n", + "\n", + "t = timeit.timeit(run_with_pool, number=1)\n", + "print(\"Data processing took %.2f seconds\" % t)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Summary" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## What we've covered\n", + "\n", + " - Limitations of threading for parallel processing in Python\n", + " - How to split up a simple voxel-processing task into separate chunks\n", + " - `numpy.split()`\n", + " - How to run each chunk in parallel using multiprocessing\n", + " - `multiprocessing.Process`\n", + " - How to separate the number of tasks from the number of workers \n", + " - `multiprocessing.Pool()`\n", + " - `Pool.map()`\n", + " - How to get output back from the workers and join it back together again\n", + " - `numpy.concatenate()`\n", + " - How to pass back progress information from our worker processes\n", + " - `multiprocessing.Manager().Queue()`\n", + " - Using a `threading.Timer` object to monitor the queue and display updates" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Things I haven't covered\n", + "\n", + "Loads of stuff!\n" + ] + }, + { + "cell_type": "markdown", + "metadata": 
{}, + "source": [ + "### Threading\n", + "\n", + "- Locking of shared data (so only one thread can use it at a time)\n", + "- Thread-local storage (see `threading.local()`)\n", + "- See Paul's tutorial on the PyTreat git repository for more information\n", + "- Or see the `threading` Python documentation for full details" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Multiprocessing\n", + "\n", + "- Passing data *between* workers\n", + " - Can use `Queue` for one-way traffic\n", + " - Use `Pipe` for two-way communication between one worker and another\n", + " - May be required when your problem is not 'embarrassingly parallel'\n", + "- Sharing memory\n", + " - Way to avoid copying large amounts of data\n", + " - Look at `multiprocessing.Array`\n", + " - Need to convert Numpy array into a ctypes array\n", + " - Shared memory has pitfalls\n", + " - *Don't go here unless you have already determined that data copying is a bottleneck*\n", + "- Running workers asynchronously\n", + " - So the main program doesn't have to wait for them to finish\n", + " - Instead, a function is called every time a task is finished\n", + " - See `Pool.apply_async()` for more information\n", + "- Error handling\n", + " - Needs a bit of care - very easy to 'lose' errors\n", + " - Workers should catch all exceptions\n", + " - And should return a value to signal when a task has failed\n", + " - Main program decides how to deal with it\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": true + }, + "source": [ + "## Always remember\n", + "\n", + "**Python is not the best tool for every job!**\n", + "\n", + "If you are really after performance, consider implementing your algorithm in multi-threaded C/C++ and then creating a Python interface.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 2", 
+ "language": "python", + "name": "python2" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.14" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} -- GitLab