{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![Running in parallel](parallel.png)\n",
    "\n",
    "# Multiprocessing and multithreading in Python\n",
    "\n",
    "## Why use multiprocessing?\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "4"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import multiprocessing\n",
    "multiprocessing.cpu_count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "*Almost all CPUs these days are multi-core.*\n",
    "\n",
    "CPU-intensive programs will not be efficient unless they take advantage of this!\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# General plan\n",
    "\n",
    "Walk through a basic application of multiprocessing, hopefully relevant to the kind of work you might want to do.\n",
    "\n",
    "Not a comprehensive guide to Python multithreading/multiprocessing.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![voxel](voxel.png)\n",
    "\n",
    "## Sample application\n",
    "\n",
    "Assume we are doing some voxelwise image processing - i.e. running a computationally intensive calculation *independently* on each voxel in a (possibly large) image. \n",
    "\n",
    "*(Such problems are sometimes called 'embarrassingly parallel')*\n",
    "\n",
    "This is in a Python module called my_analysis. Here we simulate this by just calculating a large number of exponentials for each voxel.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# my_analysis.py\n",
    "\n",
    "import math\n",
    "import numpy\n",
    "\n",
    "def calculate_voxel(val):\n",
    "    # 'Slow' voxelwise calculation\n",
    "    for i in range(30000):\n",
    "        b = math.exp(val)\n",
    "    return b\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We're going to run this on a Numpy array. `numpy.vectorize` is a convenient function to apply a function to every element of the array, but it is *not* doing anything clever - it is no different than looping over the x, y, and z co-ordinates.\n",
    "\n",
    "We're also giving the data an ID - this will be used later when we have multiple threads."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def calculate_data(data, id=0):\n",
    "    # Run 'calculate_voxel' on each voxel in data\n",
    "    print(\"Id: %i: Processing %i voxels\" % (id, data.size))\n",
    "    vectorized = numpy.vectorize(calculate_voxel)\n",
    "    vectorized(data)\n",
    "    print(\"Id: %i: Done\" % id)\n",
    "    return data\n",
    " "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here's some Python code to run our analysis on a random Numpy array, and time how long it takes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Id: 0: Processing 4096 voxels\n",
      "Id: 0: Done\n",
      "Data processing took 26.44 seconds\n"
     ]
    }
   ],
   "source": [
    "import numpy\n",
    "import timeit\n",
    "\n",
    "import my_analysis\n",
    "\n",
    "def run():\n",
    "    data = numpy.random.rand(16, 16, 16)\n",
    "    my_analysis.calculate_data(data)\n",
    "    \n",
    "t = timeit.timeit(run, number=1)\n",
    "print(\"Data processing took %.2f seconds\" % t)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "So, it took a little while."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If we watch what's going on while this runs, we can see the program is not using all of our CPU. It's only working on one core.\n",
    "\n",
    "![Running in serial](onecore.png)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## What we want\n",
    "\n",
    "It would be nice to split the data up into chunks and give one to each core. Then we could get through the processing 8 times as fast. \n",
    "\n",
    "![Running in parallel](multicore.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Multithreading attempt\n",
    "\n",
    "*Threads* are a way for a program to run more than one task at a time. Let's try using this on our application, using the Python `threading` module. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Splitting the data up\n",
    "\n",
    "We're going to need to split the data up into chunks. Numpy has a handy function `numpy.split` which slices the array up into equal portions along a specified axis:\n",
    "\n",
    "    chunks = numpy.split(full_data, num_chunks, axis)\n",
    "\n",
    "*The data must split up equally along this axis! We will solve this problem later*"
   ]
  },
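  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a quick demonstration (a minimal sketch, not part of the analysis itself), here is `numpy.split` slicing an array the size of our test data into four equal chunks:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "demo = numpy.random.rand(16, 16, 16)\n",
    "chunks = numpy.split(demo, 4, axis=0)\n",
    "# Four chunks, each of shape (4, 16, 16)\n",
    "print(\"%i chunks of shape %s\" % (len(chunks), str(chunks[0].shape)))"
   ]
  },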
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating a new thread for each chunk\n",
    "\n",
    "    def function_to_call(args, arg2, arg3):\n",
    "       ...do something\n",
    "     \n",
    "    ...\n",
    "    \n",
    "    import threading\n",
    "    thread = threading.Thread(target=function_to_call, \n",
    "                              args=[arg1, arg2, arg3])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Waiting for the threads to complete\n",
    "\n",
    "    thread.join()\n",
    "\n",
    "- This waits until `thread` has completed\n",
    "- So, if we have more than one thread we need to keep a list and wait for them all to finish:\n",
    "\n",
    "\n",
    "    for thread in threads:\n",
    "        thread.join()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Example code\n",
    "\n",
    "The example code is below - let's see how it does!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting worker for part 0\n",
      "Starting worker for part 1\n",
      " Id: 0: Processing 1024 voxels\n",
      "Id: 1: Processing 1024 voxels\n",
      "Starting worker for part 2Id: 2: Processing 1024 voxels\n",
      "\n",
      "Starting worker for part 3Id: 3: Processing 1024 voxels\n",
      "\n",
      "Id: 1: DoneId: 0: Done\n",
      "\n",
      "Id: 2: Done\n",
      "Id: 3: Done\n",
      "Data processing took 132.90 seconds\n"
     ]
    }
   ],
   "source": [
    "import threading\n",
    "\n",
    "def multithread_process(data):\n",
    "    n_workers = 4\n",
    "    \n",
    "    # Split the data into chunks along axis 0\n",
    "    # We are assuming this axis is divisible by the number of workers!\n",
    "    chunks = numpy.split(data, n_workers, axis=0)\n",
    "    \n",
    "    # Start a worker for each chunk\n",
    "    workers = []\n",
    "    for idx, chunk in enumerate(chunks):\n",
    "        print(\"Starting worker for part %i\" % idx)\n",
    "        w = threading.Thread(target=my_analysis.calculate_data, args=[chunk, idx])\n",
    "        workers.append(w)\n",
    "        w.start()\n",
    "    \n",
    "    # Wait for each worker to finish\n",
    "    for w in workers:\n",
    "        w.join()\n",
    "        \n",
    "def run_with_threads():\n",
    "    data = numpy.random.rand(16, 16, 16)\n",
    "    multithread_process(data)\n",
    "    \n",
    "t = timeit.timeit(run_with_threads, number=1)\n",
    "print(\"Data processing took %.2f seconds\" % t)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# The Big Problem with Python threads"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Only one thread can execute Python code at a time**\n",
    "\n",
    "![Python multithreading](thread_gil.png)\n",
    "\n",
    "This is what's really going on.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The reason is something called the **Global Interpreter Lock (GIL)**. Only one thread can have it, and you can only execute Python code when you have the GIL."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## So, does that mean Python threads are useless?\n",
    "\n",
    "No, not completely. They're useful for:\n",
    "\n",
    "- Making a user interface continue to respond while a calculation takes place in the background\n",
    "- A web server handling multiple requests.\n",
    "  - *The GIL is not required while waiting for network connections*\n",
    "- Doing calculations in parallel which are running in native (C/C++) code\n",
    "  - *The GIL is not required while running native code*\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "### But for doing CPU-intensive Python calculations in parallel, yes Python threads are essentially useless\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Can multiprocessing help?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Differences between threads and processes\n",
    "\n",
    "- Threads are quicker to start up and generally require fewer resources\n",
    "\n",
    "- Threads share memory with the main process \n",
    "  - Don't need to copy your data to pass it to a thread\n",
    "  - Don't need to copy the output data back to the main program\n",
    "  \n",
    "- Processes have their own memory space \n",
    "  - Data needs to be copied from the main program to the process\n",
    "  - Any output needs to be copied back\n",
    "  \n",
    "- However, importantly for Python, *Each process has its own GIL so they can run at the same time as others*"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Multiprocessing attempt\n",
    "\n",
    "Multiprocessing is normally more work than multithreading.\n",
    "\n",
    "However Python tries *very hard* to make multiprocessing as easy as multithreading.\n",
    "\n",
    "- `import multiprocessing` instead of `import threading`\n",
    "- `multiprocessing.Process()` instead of `threading.Thread()`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting worker for chunk 0\n",
      "Starting worker for chunk 1\n",
      "Starting worker for chunk 2\n",
      "Starting worker for chunk 3\n",
      "Data processing took 9.74 seconds\n"
     ]
    }
   ],
   "source": [
    "import multiprocessing\n",
    "    \n",
    "def multiprocess_process(data):\n",
    "    n_workers = 4\n",
    "    \n",
    "    # Split the data into chunks along axis 0\n",
    "    # We are assuming this axis is divisible by the number of workers!\n",
    "    chunks = numpy.split(data, n_workers, axis=0)\n",
    "    \n",
    "    workers = []\n",
    "    for idx, chunk in enumerate(chunks):\n",
    "        print(\"Starting worker for chunk %i\" % idx)\n",
    "        w = multiprocessing.Process(target=my_analysis.calculate_data, args=[chunk, idx])\n",
    "        workers.append(w)\n",
    "        w.start()\n",
    "    \n",
    "    # Wait for workers to complete\n",
    "    for w in workers:\n",
    "        w.join()\n",
    "        \n",
    "def run_with_processes():\n",
    "    data = numpy.random.rand(16, 16, 16)\n",
    "    multiprocess_process(data)\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    t = timeit.timeit(run_with_processes, number=1)\n",
    "    print(\"Data processing took %.2f seconds\" % t)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Multiprocessing works!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## BUT\n",
    "\n",
    "# Caveats and gotchas\n",
    "\n",
    "Before we just run off and replace all our threads with processes there are a few things we need to bear in mind:\n",
    "\n",
    "## Data copying\n",
    "\n",
    "- Python *copied* each chunk of data to each worker. If the data was very large this could be a significant overhead\n",
    "- Python needs to know *how* to copy all the data we pass to the process. \n",
    "  - This is fine for normal data types (strings, lists, dictionaries, etc) and Numpy arrays\n",
    "  - Can get trouble if you try to pass complex objects to your function \n",
    "  - Anything you pass to the worker needs to be support the `pickle` module\n",
    "\n",
    "## The global variable problem\n",
    "\n",
    "- Can't rely on global variables being copied\n"
   ]
  },
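  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A quick way to check whether something can be passed to a worker is to try pickling it yourself. This is a minimal sketch - `Unpicklable` is a made-up class holding an open file handle, which `pickle` cannot handle:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pickle\n",
    "\n",
    "class Unpicklable(object):\n",
    "    def __init__(self):\n",
    "        # Open file handles can't be pickled, so an instance of this\n",
    "        # class can't be passed to a worker process\n",
    "        self.logfile = open(\"log.txt\", \"w\")\n",
    "\n",
    "for obj in [\"a string\", numpy.zeros(3), Unpicklable()]:\n",
    "    try:\n",
    "        pickle.dumps(obj)\n",
    "        print(\"%s: OK\" % type(obj).__name__)\n",
    "    except Exception as e:\n",
    "        print(\"%s: cannot be pickled (%s)\" % (type(obj).__name__, e))"
   ]
  },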
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The output problem\n",
    "\n",
    "If you change data in a subprocess, your main program will not see it.\n",
    "\n",
    "## Example:\n",
    "  "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# my_analysis.py\n",
    "def add_one(data):\n",
    "    data += 1\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting with zeros\n",
      "[[ 0.  0.]\n",
      " [ 0.  0.]]\n",
      "I think my worker just added one\n",
      "[[ 0.  0.]\n",
      " [ 0.  0.]]\n"
     ]
    }
   ],
   "source": [
    "data = numpy.zeros([2, 2])\n",
    "print(\"Starting with zeros\")\n",
    "print(data)\n",
    "\n",
    "#my_analysis.add_one(data)\n",
    "#w = threading.Thread(target=my_analysis.add_one, args=[data,])\n",
    "w = multiprocessing.Process(target=my_analysis.add_one, args=[data,])\n",
    "w.start()\n",
    "w.join()\n",
    "\n",
    "print(\"I think my worker just added one\")\n",
    "print(data)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "# Making multiprocessing work better"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Problems to solve:\n",
    "\n",
    "- Dividing data amongst processes\n",
    "- Returning data from process\n",
    "- Status updates from process\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Worker pools\n",
    "\n",
    "A *Pool* is a fixed number of worker processes which can be given tasks to do. \n",
    "\n",
    "We can give a pool as many tasks as we like - once a worker finishes a task it will start another, until they're all done.\n",
    "\n",
    "We can create a pool using:\n",
    "\n",
    "    multiprocessing.Pool(num_workers)\n",
    "    \n",
    "- If the number of workers in the pool is equal to the number of cores, we should be able to keep our CPU busy.\n",
    "- Pools are good for load balancing if some slices are more work than others\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![Worker pool](pool.png)"
   ]
  },
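  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here is that minimal sketch of a pool in action, using the built-in `abs` function as a trivially quick 'task' - eight tasks are shared between four workers:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "pool = multiprocessing.Pool(4)\n",
    "# Each worker picks up a new task as soon as it finishes its current one\n",
    "print(pool.map(abs, [-4, -3, -2, -1, 1, 2, 3, 4]))\n",
    "pool.close()\n",
    "pool.join()"
   ]
  },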
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Splitting our data into chunks for the pool\n",
    "\n",
    " - Now we can split our data up into as many chunks as we like\n",
    " - Easiest solution is to use 1-voxel slices along one of the axes `split_axis`:\n",
    "   - `numpy.split(data, data.shape[split_axis], axis=split_axis)`\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Giving the tasks to the pool\n",
    "\n",
    " - Easiest way is to use:\n",
    " \n",
    " \n",
    "    Pool.map(function, task_args)\n",
    "     \n",
    " - task_args is a *sequence of sequences*\n",
    "   - Each element in `task_args` is a sequence of arguments for one task\n",
    "   - The length of `task_args` is the number of tasks\n",
    "   \n",
    "#### Example `task_args` for 5 tasks, each being passed an ID and a chunk of data\n",
    "\n",
    "    [ \n",
    "       [0, chunk_1], # task 1\n",
    "       [1, chunk_2], # task_2\n",
    "       [2, chunk_3], # task 3\n",
    "       [3, chunk_4], # task_4\n",
    "       [4, chunk_5], # task_5\n",
    "    ]\n",
    " \n",
    " - If we have a list of chunks we can generate this with `enumerate(chunks)`\n",
    " \n",
    " \n",
    " - **Arguments are passed to the task in a slightly different way compared to `multiprocessing.Process()`** \n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# my_analysis.py\n",
    "\n",
    "def do_task(args):\n",
    "    # Pool.map passes all our arguments as a single tuple, so unpack it\n",
    "    # and pass the arguments to the real calculate function.\n",
    "    id, data = args\n",
    "    return calculate_data(data, id)\n",
    "   "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    " - If you're using Python 3, look into `Pool.starmap`"
   ]
  },
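  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In Python 3, `Pool.starmap` unpacks each argument sequence for you, so the `do_task` wrapper is not needed. A minimal sketch (Python 3 only, so shown here rather than run):\n",
    "\n",
    "    # Each argument tuple is unpacked into calculate_data(data, id)\n",
    "    list_of_slices = pool.starmap(my_analysis.calculate_data,\n",
    "                                  [(chunk, idx) for idx, chunk in enumerate(chunks)])"
   ]
  },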
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Getting the output and putting it back together\n",
    "\n",
    " - `Pool.map()` captures the return value of your worker function\n",
    " - It returns a list of all of the return values for each task \n",
    "   - for us this is a list of Numpy arrays, one for each slice\n",
    " - `numpy.concatenate(list_of_slices, split_axis)` will combine them back into a single data item for us\n"
   ]
  },
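  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To check that the split/concatenate round trip really does rebuild the original array, here is a minimal sketch:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "demo = numpy.random.rand(23, 13, 11)\n",
    "slices = numpy.split(demo, demo.shape[0], axis=0)\n",
    "rejoined = numpy.concatenate(slices, 0)\n",
    "# The rejoined array has the original shape and contents\n",
    "print(\"shape=%s identical=%s\" % (str(rejoined.shape), (rejoined == demo).all()))"
   ]
  },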
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The full example"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Input data shape=(23L, 13L, 11L)\n",
      "Processing 23 parts with 4 workers\n",
      "Processed data, output shape=(23L, 13L, 11L)\n",
      "Data processing took 8.08 seconds\n"
     ]
    }
   ],
   "source": [
    "# Split our data along the x-axis\n",
    "SPLIT_AXIS = 0\n",
    "\n",
    "def pool_process(data):\n",
    "    n_workers = 4\n",
    "    \n",
    "    # Split the data into 1-voxel slices along axis 0\n",
    "    parts = numpy.split(data, data.shape[SPLIT_AXIS], axis=SPLIT_AXIS)\n",
    "\n",
    "    print(\"Input data shape=%s\" % str(data.shape))\n",
    "    print(\"Processing %i parts with %i workers\" % (len(parts), n_workers))\n",
    "    \n",
    "    # Create a pool - normally this would be 1 worker per CPU core\n",
    "    pool = multiprocessing.Pool(n_workers)\n",
    "    \n",
    "    # Send the tasks to the workers\n",
    "    list_of_slices = pool.map(my_analysis.do_task, enumerate(parts))\n",
    "    \n",
    "    # Combine the return data back into a single array\n",
    "    processed_array = numpy.concatenate(list_of_slices, SPLIT_AXIS)\n",
    "    \n",
    "    print(\"Processed data, output shape=%s\" % str(processed_array.shape))\n",
    "    \n",
    "def run_with_pool():\n",
    "    data = numpy.random.rand(23, 13, 11)\n",
    "    pool_process(data)\n",
    " \n",
    "t = timeit.timeit(run_with_pool, number=1)\n",
    "print(\"Data processing took %.2f seconds\" % t)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Communication / Status updates?\n",
    "\n",
    " - Would be nice if workers could communicate their progress as they work. One way to do this is using a `Queue`.\n",
    " "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "## Queues\n",
    "\n",
    "A Queue is often used to send status updates from the process to the main program.\n",
    "\n",
    " - Shared between the main program and the subprocesses\n",
    " - Create it with `multiprocessing.Manager().Queue()`\n",
    " - Pass it to the worker thread like any other argument\n",
    " - Worker calls `queue.put()` to send some data to the queue\n",
    " - Main program calls `queue.get()` to get data off the queue\n",
    " - Queue is FIFO (First In First Out)\n",
    " - Need to have a thread running which checks the queue for updates every so often\n",
    "   - This is a good use for threads!\n",
    " \n",
    "![Queue](queue_put.png)\n",
    "\n"
   ]
  },
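  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here is a minimal sketch of the queue mechanics on their own - a worker process puts a message on the queue, and the main program takes it off. (The `say_hello` worker is just for illustration - as with our other workers, it would normally live in a module such as `my_analysis`):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "queue = multiprocessing.Manager().Queue()\n",
    "\n",
    "def say_hello(queue):\n",
    "    # Runs in the worker process\n",
    "    queue.put(\"Hello from the worker\")\n",
    "\n",
    "w = multiprocessing.Process(target=say_hello, args=[queue])\n",
    "w.start()\n",
    "w.join()\n",
    "\n",
    "# The queue is FIFO: get() returns items in the order they were put()\n",
    "print(queue.get())"
   ]
  },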
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Modify our example to report progress"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# my_analysis.py \n",
    "\n",
    "def calculate_data_and_report(args):\n",
    "    id, data, queue = args\n",
    "    \n",
    "    # Run 'calculate_voxel' on each voxel in data\n",
    "    vectorized = numpy.vectorize(calculate_voxel)\n",
    "    vectorized(data)\n",
    "    \n",
    "    # Report our ID and how many voxels we have done to the queue\n",
    "    queue.put((id, data.size))\n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a thread to monitor the queue for updates\n",
    "\n",
    "I've done this as a class, because it that's the easiest way"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "class QueueChecker():\n",
    "    def __init__(self, queue, num_voxels, interval_seconds=1):\n",
    "        self._queue = queue\n",
    "        self._num_voxels = num_voxels\n",
    "        self._interval_seconds = interval_seconds\n",
    "        self._voxels_done = 0\n",
    "        self._cancel = False\n",
    "        self._restart_timer()\n",
    "        \n",
    "    def cancel(self):\n",
    "        self._cancel = True\n",
    "        \n",
    "    def _restart_timer(self):\n",
    "        self._timer = threading.Timer(self._interval_seconds, self._check_queue)\n",
    "        self._timer.start()\n",
    "\n",
    "    def _check_queue(self):\n",
    "        while not self._queue.empty():\n",
    "            id, voxels_done = self._queue.get()\n",
    "            self._voxels_done += voxels_done\n",
    "            \n",
    "        percent = int(100*float(self._voxels_done)/self._num_voxels)\n",
    "        print(\"%i%% complete\" % percent)\n",
    "        if not self._cancel:\n",
    "            self._restart_timer()\n",
    "     "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Modify our main program to pass the queue to each of our workers\n",
    "\n",
    "We need to create the queue and the `QueueChecker` and make sure each task includes a copy of the queue\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Input data shape=(23L, 19L, 17L)\n",
      "We are processing 23 parts with 4 workers\n",
      "0% complete\n",
      "0% complete\n",
      "13% complete\n",
      "17% complete\n",
      "17% complete\n",
      "34% complete\n",
      "34% complete\n",
      "47% complete\n",
      "52% complete\n",
      "56% complete\n",
      "69% complete\n",
      "69% complete\n",
      "78% complete\n",
      "86% complete\n",
      "95% complete\n",
      "Processed data, output shape=(23L, 19L, 17L)\n",
      "Data processing took 17.39 seconds\n",
      "100% complete\n"
     ]
    }
   ],
   "source": [
    "# Split our data along the x-axis\n",
    "SPLIT_AXIS = 0\n",
    "reload(my_analysis)\n",
    "\n",
    "def pool_process(data):\n",
    "    n_workers = 4\n",
    "    \n",
    "    # Split the data into 1-voxel slices along axis 0\n",
    "    parts = numpy.split(data, data.shape[SPLIT_AXIS], axis=SPLIT_AXIS)\n",
    "\n",
    "    print(\"Input data shape=%s\" % str(data.shape))\n",
    "    print(\"We are processing %i parts with %i workers\" % (len(parts), n_workers))\n",
    "    pool = multiprocessing.Pool(n_workers)\n",
    "    \n",
    "    # Create the queue\n",
    "    queue = multiprocessing.Manager().Queue()\n",
    "    checker = QueueChecker(queue, data.size)\n",
    "    \n",
    "    # Note that we need to pass the queue as an argument to the worker\n",
    "    args = [(id, part, queue) for id, part in enumerate(parts)]\n",
    "    list_of_slices = pool.map(my_analysis.calculate_data_and_report, args)\n",
    "    \n",
    "    checker.cancel()\n",
    "    \n",
    "    # Join processed data back together again\n",
    "    processed_array = numpy.concatenate(list_of_slices, SPLIT_AXIS)\n",
    "    print(\"Processed data, output shape=%s\" % str(processed_array.shape))\n",
    "        \n",
    "def run_with_pool():\n",
    "    data = numpy.random.rand(23, 19, 17)\n",
    "    pool_process(data)\n",
    "\n",
    "t = timeit.timeit(run_with_pool, number=1)\n",
    "print(\"Data processing took %.2f seconds\" % t)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Summary"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## What we've covered\n",
    "\n",
    " - Limitations of threading for parallel processing in Python\n",
    " - How to split up a simple voxel-processing task into separate chunks\n",
    "   - `numpy.split()`\n",
    " - How to run each chunk in parallel using multiprocessing\n",
    "   - `multiprocessing.Process`\n",
    " - How to separate the number of tasks from the number of workers \n",
    "   - `multiprocessing.Pool()`\n",
    "   - `Pool.map()`\n",
    " - How to get output back from the workers and join it back together again\n",
    "   - `numpy.concatenate()`\n",
    " - How to pass back progress information from our worker processes\n",
    "   - `multiprocessing.manager.Queue()`\n",
    " - Using a threading.Timer object to monitor the queue and display updates"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Things I haven't covered\n",
    "\n",
    "Loads of stuff!\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Threading\n",
    "\n",
    "- Locking of shared data (so only one thread can use it at a time)\n",
    "- Thread-local storage (see `threading.local()`)\n",
    "- See Paul's tutorial on the PyTreat GIT for more information\n",
    "- Or see the `threading` Python documentation for full details"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Multiprocessing\n",
    "\n",
    "- Passing data *between* workers\n",
    "  - Can use `Queue` for one-way traffic\n",
    "  - Use `Pipe` for two-way communication between one worker and another\n",
    "  - May be required when your problem is not 'embarrasingly parallel'\n",
    "- Sharing memory\n",
    "  - Way to avoid copying large amounts of data\n",
    "  - Look at `multiprocessing.Array`\n",
    "  - Need to convert Numpy array into a ctypes array\n",
    "  - Shared memory has pitfalls\n",
    "  - *Don't go here unless you have aready determined that data copying is a bottleneck*\n",
    "- Running workers asynchronously\n",
    "  - So main program doesn't have to wait for them to finish\n",
    "  - Instead, a function is called every time a task is finished\n",
    "  - see `multiprocessing.apply_async()` for more information\n",
    "- Error handling\n",
    "  - Needs a bit of care - very easy to 'lose' errors\n",
    "  - Workers should catch all exceptions\n",
    "  - And should return a value to signal when a task has failed\n",
    "  - Main program decides how to deal with it\n"
   ]
  },
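  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A minimal sketch of the error handling pattern above - the `safe_do_task` wrapper and the `None` sentinel are assumptions for illustration, not part of `my_analysis`:\n",
    "\n",
    "    # my_analysis.py\n",
    "    def safe_do_task(args):\n",
    "        # Catch everything - an uncaught exception in a pool worker\n",
    "        # is very easy to lose track of in the main program\n",
    "        try:\n",
    "            return do_task(args)\n",
    "        except Exception:\n",
    "            # Sentinel value signalling that this task failed\n",
    "            return None\n",
    "\n",
    "    # Main program\n",
    "    list_of_slices = pool.map(my_analysis.safe_do_task, enumerate(parts))\n",
    "    failed = [idx for idx, result in enumerate(list_of_slices) if result is None]\n",
    "    if failed:\n",
    "        print(\"Tasks %s failed\" % failed)"
   ]
  },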
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## Always remember\n",
    "\n",
    "**Python is not the best tool for every job!**\n",
    "\n",
    "If you are really after performance, consider implementing your algorithm in multi-threaded C/C++ and then create a Python interface.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}