From ca6e659eb643c3c0ee88aa85a4c1565667de0bf8 Mon Sep 17 00:00:00 2001
From: Paul McCarthy <pauldmccarthy@gmail.com>
Date: Mon, 26 Feb 2018 09:29:33 +0000
Subject: [PATCH] added section on shared memory in threading/multiprocessing
 practical

---
 advanced_topics/07_threading.ipynb | 206 +++++++++++++++++++++++++++++
 advanced_topics/07_threading.md    | 182 +++++++++++++++++++++++++
 2 files changed, 388 insertions(+)

diff --git a/advanced_topics/07_threading.ipynb b/advanced_topics/07_threading.ipynb
index f783b7e..83167ef 100644
--- a/advanced_topics/07_threading.ipynb
+++ b/advanced_topics/07_threading.ipynb
@@ -606,6 +606,212 @@
     "for t1, result in zip(t1s, nlinresults):\n",
     "    print(t1, ':', result)"
    ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Sharing data between processes\n",
+    "\n",
+    "\n",
+    "When you use the `Pool.map` method (or any of the other methods we have shown)\n",
+    "to run a function on a sequence of items, those items must be copied into the\n",
+    "memory of each of the child processes. When the child processes are finished,\n",
+    "the data that they return then has to be copied back to the parent process.\n",
+    "\n",
+    "\n",
+    "Any items which you wish to pass to a function that is executed by a `Pool`\n",
+    "must be *pickleable* - the built-in\n",
+    "[`pickle`](https://docs.python.org/3.5/library/pickle.html) module is used by\n",
+    "`multiprocessing` to serialise and de-serialise the data passed into and\n",
+    "returned from a child process. The majority of standard Python types (`list`,\n",
+    "`dict`, `str` etc), and Numpy arrays, can be pickled and unpickled, so you only\n",
+    "need to worry about this detail if you are passing objects of a custom type\n",
+    "(e.g. instances of classes that you have written, or that are defined in some\n",
+    "third-party library).\n",
+    "\n",
+    "\n",
+    "There is obviously some overhead in copying data back and forth between the\n",
+    "main process and the worker processes. For most computationally intensive\n",
+    "tasks, this communication overhead is not important - the performance\n",
+    "bottleneck is typically going to be the computation time, rather than I/O\n",
+    "between the parent and child processes. You may need to spend some time\n",
+    "adjusting the way in which you split up your data, and the number of\n",
+    "processes, in order to get the best performance.\n",
+    "\n",
+    "\n",
+    "However, if you have determined that copying data between processes is having\n",
+    "a substantial impact on your performance, the `multiprocessing` module\n",
+    "provides the [`Value`, `Array`, and `RawArray`\n",
+    "classes](https://docs.python.org/3.5/library/multiprocessing.html#shared-ctypes-objects),\n",
+    "which allow you to share individual values (`Value`) or arrays of values\n",
+    "(`Array` and `RawArray`) between processes.\n",
+    "\n",
+    "\n",
+    "The `Array` and `RawArray` classes essentially wrap a typed pointer (from the\n",
+    "built-in [`ctypes`](https://docs.python.org/3.5/library/ctypes.html) module)\n",
+    "to a block of memory. We can use the `Array` or `RawArray` class to share a\n",
+    "Numpy array between our worker processes. The difference between an `Array`\n",
+    "and a `RawArray` is that the former offers synchronised (i.e. process-safe)\n",
+    "access to the shared memory. This is necessary if your child processes will be\n",
+    "modifying the same parts of your data.\n",
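+    "\n",
+    "\n",
+    "As a minimal sketch (for illustration only - it is not used in the\n",
+    "example below), synchronised access to an `Array` might look like this:\n",
+    "\n",
+    "```\n",
+    "import multiprocessing as mp\n",
+    "import ctypes\n",
+    "\n",
+    "# An Array is essentially a RawArray\n",
+    "# paired with a lock\n",
+    "a = mp.Array(ctypes.c_double, 10)\n",
+    "\n",
+    "# Acquire the lock before modifying\n",
+    "# the shared memory\n",
+    "with a.get_lock():\n",
+    "    a[0] = 99.9\n",
+    "```\n",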
+    "\n",
+    "\n",
+    "Due to the way that shared memory works, in order to share a Numpy array\n",
+    "between different processes you need to structure your code so that the\n",
+    "array(s) you want to share are accessible at the _module level_. Furthermore,\n",
+    "we need to make sure that our input and output arrays are located in shared\n",
+    "memory - we can do this via the `Array` or `RawArray` classes.\n",
+    "\n",
+    "\n",
+    "As an example, let's say we want to parallelise processing of an image by\n",
+    "having each worker process perform calculations on a chunk of the image.\n",
+    "First, let's define a function which does the calculation on a specified set\n",
+    "of image coordinates:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import multiprocessing as mp\n",
+    "import ctypes\n",
+    "import numpy as np\n",
+    "np.set_printoptions(suppress=True)\n",
+    "\n",
+    "\n",
+    "def process_chunk(shape, idxs):\n",
+    "\n",
+    "    # Get references to our\n",
+    "    # input/output data, and\n",
+    "    # create Numpy array views\n",
+    "    # into them.\n",
+    "    sindata  = process_chunk.input_data\n",
+    "    soutdata = process_chunk.output_data\n",
+    "    indata   = np.ctypeslib.as_array(sindata) .reshape(shape)\n",
+    "    outdata  = np.ctypeslib.as_array(soutdata).reshape(shape)\n",
+    "\n",
+    "    # Do the calculation on\n",
+    "    # the specified voxels\n",
+    "    outdata[idxs] = indata[idxs] ** 2"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Rather than passing the input and output data arrays as arguments to the\n",
+    "`process_chunk` function, we set them as attributes of the `process_chunk`\n",
+    "function. This makes the input/output data accessible at the module level,\n",
+    "which is required in order to share the data between the main process and\n",
+    "the child processes (on Unix-like systems the child processes are forked\n",
+    "from the main process, and so inherit its memory).\n",
+    "\n",
+    "\n",
+    "Now let's define a second function which processes an entire image. It does\n",
+    "the following:\n",
+    "\n",
+    "\n",
+    "1. Initialises shared memory areas to store the input and output data.\n",
+    "2. Copies the input data into shared memory.\n",
+    "3. Sets the input and output data as attributes of the `process_chunk` function.\n",
+    "4. Creates sets of indices into the input data which, for each worker process,\n",
+    "   specify the portion of the data that it is responsible for.\n",
+    "5. Creates a worker pool, and runs the `process_chunk` function for each set\n",
+    "   of indices."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def process_dataset(data):\n",
+    "\n",
+    "    nprocs   = 8\n",
+    "    origData = data\n",
+    "\n",
+    "    # Create arrays to store the\n",
+    "    # input and output data\n",
+    "    sindata  = mp.RawArray(ctypes.c_double, data.size)\n",
+    "    soutdata = mp.RawArray(ctypes.c_double, data.size)\n",
+    "    data     = np.ctypeslib.as_array(sindata).reshape(data.shape)\n",
+    "    outdata  = np.ctypeslib.as_array(soutdata).reshape(data.shape)\n",
+    "\n",
+    "    # Copy the input data\n",
+    "    # into shared memory\n",
+    "    data[:] = origData\n",
+    "\n",
+    "    # Make the input/output data\n",
+    "    # accessible to the process_chunk\n",
+    "    # function. This must be done\n",
+    "    # *before* the worker pool is created.\n",
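+    "    # (These attributes are inherited\n",
+    "    # by the child processes when they\n",
+    "    # are forked from the main process.)\n",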
+    "    process_chunk.input_data  = sindata\n",
+    "    process_chunk.output_data = soutdata\n",
+    "\n",
+    "    # Number of voxels to be computed\n",
+    "    # by each worker process.\n",
+    "    nvox = data.size // nprocs\n",
+    "\n",
+    "    # Generate coordinates for\n",
+    "    # every voxel in the image\n",
+    "    xlen, ylen, zlen = data.shape\n",
+    "    xs, ys, zs = np.meshgrid(np.arange(xlen),\n",
+    "                             np.arange(ylen),\n",
+    "                             np.arange(zlen))\n",
+    "\n",
+    "    xs = xs.flatten()\n",
+    "    ys = ys.flatten()\n",
+    "    zs = zs.flatten()\n",
+    "\n",
+    "    # We're going to pass each worker\n",
+    "    # process a list of indices, which\n",
+    "    # specify the data items which that\n",
+    "    # worker process needs to compute\n",
+    "    # (the final chunk contains any\n",
+    "    # leftover voxels).\n",
+    "    xs = [xs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \\\n",
+    "         [xs[nvox * nprocs:]]\n",
+    "    ys = [ys[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \\\n",
+    "         [ys[nvox * nprocs:]]\n",
+    "    zs = [zs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \\\n",
+    "         [zs[nvox * nprocs:]]\n",
+    "\n",
+    "    # Build the argument lists for\n",
+    "    # each worker process.\n",
+    "    args = [(data.shape, (x, y, z)) for x, y, z in zip(xs, ys, zs)]\n",
+    "\n",
+    "    # Create a pool of worker\n",
+    "    # processes and run the jobs.\n",
+    "    pool = mp.Pool(processes=nprocs)\n",
+    "\n",
+    "    pool.starmap(process_chunk, args)\n",
+    "\n",
+    "    return outdata"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now we can call our `process_dataset` function just like any other function:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "data = np.array(np.arange(64).reshape((4, 4, 4)), dtype=np.float64)\n",
+    "\n",
+    "outdata = process_dataset(data)\n",
+    "\n",
+    "print('Input')\n",
+    "print(data)\n",
+    "\n",
+    "print('Output')\n",
+    "print(outdata)"
+   ]
   }
  ],
  "metadata": {},
diff --git a/advanced_topics/07_threading.md b/advanced_topics/07_threading.md
index 4c3d825..460396d 100644
--- a/advanced_topics/07_threading.md
+++ b/advanced_topics/07_threading.md
@@ -514,3 +514,185 @@ print('Non linear registrations:')
 for t1, result in zip(t1s, nlinresults):
     print(t1, ':', result)
 ```
+
+
+### Sharing data between processes
+
+
+When you use the `Pool.map` method (or any of the other methods we have shown)
+to run a function on a sequence of items, those items must be copied into the
+memory of each of the child processes. When the child processes are finished,
+the data that they return then has to be copied back to the parent process.
+
+
+Any items which you wish to pass to a function that is executed by a `Pool`
+must be *pickleable* - the built-in
+[`pickle`](https://docs.python.org/3.5/library/pickle.html) module is used by
+`multiprocessing` to serialise and de-serialise the data passed into and
+returned from a child process. The majority of standard Python types (`list`,
+`dict`, `str` etc), and Numpy arrays, can be pickled and unpickled, so you only
+need to worry about this detail if you are passing objects of a custom type
+(e.g. instances of classes that you have written, or that are defined in some
+third-party library).
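+
+
+If you are unsure whether an object can be passed to a worker process, a
+minimal check is to simply try pickling it:
+
+
+```
+import pickle
+import numpy as np
+
+# Standard types and Numpy arrays
+# can be pickled and unpickled
+pickle.dumps([1, 2, 3])
+pickle.dumps(np.zeros(10))
+
+# But some objects cannot - e.g.
+# lambdas raise a PicklingError
+try:
+    pickle.dumps(lambda x: x ** 2)
+except Exception as e:
+    print('Not pickleable:', e)
+```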
+
+
+There is obviously some overhead in copying data back and forth between the
+main process and the worker processes. For most computationally intensive
+tasks, this communication overhead is not important - the performance
+bottleneck is typically going to be the computation time, rather than I/O
+between the parent and child processes. You may need to spend some time
+adjusting the way in which you split up your data, and the number of
+processes, in order to get the best performance.
+
+
+However, if you have determined that copying data between processes is having
+a substantial impact on your performance, the `multiprocessing` module
+provides the [`Value`, `Array`, and `RawArray`
+classes](https://docs.python.org/3.5/library/multiprocessing.html#shared-ctypes-objects),
+which allow you to share individual values (`Value`) or arrays of values
+(`Array` and `RawArray`) between processes.
+
+
+The `Array` and `RawArray` classes essentially wrap a typed pointer (from the
+built-in [`ctypes`](https://docs.python.org/3.5/library/ctypes.html) module)
+to a block of memory. We can use the `Array` or `RawArray` class to share a
+Numpy array between our worker processes. The difference between an `Array`
+and a `RawArray` is that the former offers synchronised (i.e. process-safe)
+access to the shared memory. This is necessary if your child processes will be
+modifying the same parts of your data.
+
+
+Due to the way that shared memory works, in order to share a Numpy array
+between different processes you need to structure your code so that the
+array(s) you want to share are accessible at the _module level_. Furthermore,
+we need to make sure that our input and output arrays are located in shared
+memory - we can do this via the `Array` or `RawArray` classes.
+
+
+As an example, let's say we want to parallelise processing of an image by
+having each worker process perform calculations on a chunk of the image.
+First, let's define a function which does the calculation on a specified set
+of image coordinates:
+
+
+```
+import multiprocessing as mp
+import ctypes
+import numpy as np
+np.set_printoptions(suppress=True)
+
+
+def process_chunk(shape, idxs):
+
+    # Get references to our
+    # input/output data, and
+    # create Numpy array views
+    # into them.
+    sindata  = process_chunk.input_data
+    soutdata = process_chunk.output_data
+    indata   = np.ctypeslib.as_array(sindata) .reshape(shape)
+    outdata  = np.ctypeslib.as_array(soutdata).reshape(shape)
+
+    # Do the calculation on
+    # the specified voxels
+    outdata[idxs] = indata[idxs] ** 2
+```
+
+
+Rather than passing the input and output data arrays as arguments to the
+`process_chunk` function, we set them as attributes of the `process_chunk`
+function. This makes the input/output data accessible at the module level,
+which is required in order to share the data between the main process and the
+child processes (on Unix-like systems the child processes are forked from the
+main process, and so inherit its memory).
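+
+
+As an aside, `mp.Pool` also accepts an `initializer` function, which is run
+once in each worker process as it starts. A sketch of this alternative -
+assuming the shared arrays can be passed to the pool at creation time - might
+look like this:
+
+
+```
+def init_worker(sindata, soutdata):
+    # Runs once in each worker process,
+    # before it starts accepting jobs.
+    process_chunk.input_data  = sindata
+    process_chunk.output_data = soutdata
+
+# e.g.:
+# pool = mp.Pool(processes=nprocs,
+#                initializer=init_worker,
+#                initargs=(sindata, soutdata))
+```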
+
+
+Now let's define a second function which processes an entire image. It does
+the following:
+
+
+1. Initialises shared memory areas to store the input and output data.
+2. Copies the input data into shared memory.
+3. Sets the input and output data as attributes of the `process_chunk` function.
+4. Creates sets of indices into the input data which, for each worker process,
+   specify the portion of the data that it is responsible for.
+5. Creates a worker pool, and runs the `process_chunk` function for each set
+   of indices.
+
+
+```
+def process_dataset(data):
+
+    nprocs   = 8
+    origData = data
+
+    # Create arrays to store the
+    # input and output data
+    sindata  = mp.RawArray(ctypes.c_double, data.size)
+    soutdata = mp.RawArray(ctypes.c_double, data.size)
+    data     = np.ctypeslib.as_array(sindata).reshape(data.shape)
+    outdata  = np.ctypeslib.as_array(soutdata).reshape(data.shape)
+
+    # Copy the input data
+    # into shared memory
+    data[:] = origData
+
+    # Make the input/output data
+    # accessible to the process_chunk
+    # function. This must be done
+    # *before* the worker pool is created.
+    # (These attributes are inherited
+    # by the child processes when they
+    # are forked from the main process.)
+    process_chunk.input_data  = sindata
+    process_chunk.output_data = soutdata
+
+    # Number of voxels to be computed
+    # by each worker process.
+    nvox = data.size // nprocs
+
+    # Generate coordinates for
+    # every voxel in the image
+    xlen, ylen, zlen = data.shape
+    xs, ys, zs = np.meshgrid(np.arange(xlen),
+                             np.arange(ylen),
+                             np.arange(zlen))
+
+    xs = xs.flatten()
+    ys = ys.flatten()
+    zs = zs.flatten()
+
+    # We're going to pass each worker
+    # process a list of indices, which
+    # specify the data items which that
+    # worker process needs to compute
+    # (the final chunk contains any
+    # leftover voxels).
+    xs = [xs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \
+         [xs[nvox * nprocs:]]
+    ys = [ys[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \
+         [ys[nvox * nprocs:]]
+    zs = [zs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \
+         [zs[nvox * nprocs:]]
+
+    # Build the argument lists for
+    # each worker process.
+    args = [(data.shape, (x, y, z)) for x, y, z in zip(xs, ys, zs)]
+
+    # Create a pool of worker
+    # processes and run the jobs.
+    pool = mp.Pool(processes=nprocs)
+
+    pool.starmap(process_chunk, args)
+
+    return outdata
+```
+
+
+Now we can call our `process_dataset` function just like any other function:
+
+
+```
+data = np.array(np.arange(64).reshape((4, 4, 4)), dtype=np.float64)
+
+outdata = process_dataset(data)
+
+print('Input')
+print(data)
+
+print('Output')
+print(outdata)
+```
--
GitLab