From 8a659babaf2a008760936d08547bbedc75764c50 Mon Sep 17 00:00:00 2001 From: manifest-rules <manifest-rules@git.fmrib.ox.ac.uk> Date: Mon, 4 Mar 2024 12:50:39 +0000 Subject: [PATCH] Add mmap example --- advanced_programming/threading.ipynb | 203 +++++++++++++++++++++------ advanced_programming/threading.md | 106 ++++++++++++-- 2 files changed, 259 insertions(+), 50 deletions(-) diff --git a/advanced_programming/threading.ipynb b/advanced_programming/threading.ipynb index cb2f07b..caf5985 100644 --- a/advanced_programming/threading.ipynb +++ b/advanced_programming/threading.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "markdown", - "id": "215bdbaf", + "id": "12ef343d", "metadata": {}, "source": [ "# Threading and parallel processing\n", @@ -13,7 +13,7 @@ "true parallelism in the\n", "[`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html)\n", "module. If you want to be impressed, skip straight to the section on\n", - "[`multiprocessing`](todo).\n", + "[`multiprocessing`](multiprocessing).\n", "\n", "\n", "> *Note*: This notebook covers features that are built-in to the Python\n", @@ -46,6 +46,7 @@ " * [`Pool.map`](#pool-map)\n", " * [`Pool.apply_async`](#pool-apply-async)\n", "* [Sharing data between processes](#sharing-data-between-processes)\n", + " * [Memory-mapping](#memory-mapping)\n", " * [Read-only sharing](#read-only-sharing)\n", " * [Read/write sharing](#read-write-sharing)\n", "\n", @@ -67,7 +68,7 @@ { "cell_type": "code", "execution_count": null, - "id": "53f13f61", + "id": "956c477f", "metadata": {}, "outputs": [], "source": [ @@ -92,7 +93,7 @@ }, { "cell_type": "markdown", - "id": "859f5455", + "id": "c7f0f9ad", "metadata": {}, "source": [ "You can also `join` a thread, which will block execution in the current thread\n", @@ -102,7 +103,7 @@ { "cell_type": "code", "execution_count": null, - "id": "b039f5db", + "id": "f6e3d5e6", "metadata": {}, "outputs": [], "source": [ @@ -116,7 +117,7 @@ }, { "cell_type": "markdown", - "id": "2da49354", + "id": "41def024", "metadata": {}, "source": [ "<a class=\"anchor\" id=\"subclassing-thread\"></a>\n", @@ -130,7 +131,7 @@ { "cell_type": "code", "execution_count": null, - "id": "7d248656", + "id": "dbf8e4ff", "metadata": {}, "outputs": [], "source": [ @@ -153,7 +154,7 @@ }, { "cell_type": "markdown", - "id": "1d90d56c", + "id": "a3218617", "metadata": {}, "source": [ "<a class=\"anchor\" id=\"daemon-threads\"></a>\n", @@ -174,7 +175,7 @@ { "cell_type": "code", "execution_count": null, - "id": "ceafbaac", + "id": "5a0b442b", "metadata": {}, "outputs": [], "source": [ @@ -184,7 +185,7 @@ }, { "cell_type": "markdown", - "id": "df69b8e4", + "id": "f04828ff", "metadata": {}, "source": [ "See the [`Thread`\n", @@ -219,7 +220,7 @@ { "cell_type": "code", "execution_count": null, - "id": "25334a03", + "id": "33d52d8b", "metadata": {}, "outputs": [], "source": [ @@ -236,7 +237,7 @@ }, { "cell_type": "markdown", - "id": "6039c7a6", + "id": "281eb07a", "metadata": {}, "source": [ "But if we protect the critical section with a `Lock` object, the output will\n", @@ -246,7 +247,7 @@ { "cell_type": "code", "execution_count": null, - "id": "f8b9b6ad", + "id": "40802e7f", "metadata": {}, "outputs": [], "source": [ @@ -267,7 +268,7 @@ }, { "cell_type": "markdown", - "id": "52761f94", + "id": "88acba0e", "metadata": {}, "source": [ "> Instead of using a `Lock` object in a `with` statement, it is also possible\n", @@ -291,7 +292,7 @@ { "cell_type": "code", "execution_count": null, - "id": "ede5bfb6", + "id": "4c30c31a", "metadata": {}, 
"outputs": [], "source": [ @@ -331,7 +332,7 @@ }, { "cell_type": "markdown", - "id": "dc3de638", + "id": "c69dbe16", "metadata": {}, "source": [ "Try removing the `mutex` lock from the two methods in the above code, and see\n", @@ -355,7 +356,7 @@ { "cell_type": "code", "execution_count": null, - "id": "4441bd44", + "id": "b0b933b6", "metadata": {}, "outputs": [], "source": [ @@ -380,7 +381,7 @@ }, { "cell_type": "markdown", - "id": "865b59c8", + "id": "2a6a36e2", "metadata": {}, "source": [ "<a class=\"anchor\" id=\"the-global-interpreter-lock-gil\"></a>\n", @@ -501,7 +502,7 @@ { "cell_type": "code", "execution_count": null, - "id": "86a769fe", + "id": "daadc9c9", "metadata": {}, "outputs": [], "source": [ @@ -540,7 +541,7 @@ }, { "cell_type": "markdown", - "id": "68cfea5c", + "id": "51d5ae8a", "metadata": {}, "source": [ "The `Pool.map` method only works with functions that accept one argument, such\n", @@ -553,7 +554,7 @@ { "cell_type": "code", "execution_count": null, - "id": "9f249cde", + "id": "60ce3e5b", "metadata": {}, "outputs": [], "source": [ @@ -593,7 +594,7 @@ }, { "cell_type": "markdown", - "id": "b7cf7cb6", + "id": "99d35451", "metadata": {}, "source": [ "The `map` and `starmap` methods also have asynchronous equivalents `map_async`\n", @@ -624,7 +625,7 @@ { "cell_type": "code", "execution_count": null, - "id": "4dcee67f", + "id": "b791805e", "metadata": {}, "outputs": [], "source": [ @@ -702,7 +703,7 @@ }, { "cell_type": "markdown", - "id": "0ab5d9e2", + "id": "495c9c03", "metadata": {}, "source": [ "<a class=\"anchor\" id=\"sharing-data-between-processes\"></a>\n", @@ -740,10 +741,134 @@ "computation time, rather than I/O between the parent and child processes.\n", "\n", "\n", - "However, if you are working with a large dataset, you have determined that\n", - "copying data between processes is having a substantial impact on your\n", - "performance, and instead wish to *share* a single copy of the data between\n", - "the processes, you will need to:\n", + "However, if you are working with a large data set, where copying it between\n", + "processes is not viable, you have a couple of options available to you.\n", + "\n", + "\n", + "<a class=\"anchor\" id=\"memory-mapping\"></a>\n", + "### Memory-mapping\n", + "\n", + "\n", + "One method for sharing a large `numpy` array between multiple processes is to\n", + "use a _memory-mapped_ array. This is a feature built into `numpy` which\n", + "stores your data in a regular file, instead of in memory. This allows your\n", + "data to be simultaneously read and written by multiple processes, and is fairly\n", + "straightforward to accomplish.\n", + "\n", + "For example, let's say you have some 4D fMRI data, and wish to fit a\n", + "complicated model to the time series at each voxel. 
First we will load our 4D\n",
+ "data, and pre-allocate another array to store the fitted model parameters:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "3384d747",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import time\n",
+ "import functools as ft\n",
+ "import multiprocessing as mp\n",
+ "import numpy as np\n",
+ "\n",
+ "# Store the parameters that are required\n",
+ "# to create our memory-mapped arrays, as\n",
+ "# we need to re-use them a couple of times.\n",
+ "#\n",
+ "# Note that in practice you would usually\n",
+ "# want to store these files in a temporary\n",
+ "# directory, and/or ensure that they are\n",
+ "# deleted once you are finished.\n",
+ "data_params = dict(filename='data.mmap', shape=(91, 109, 91, 50), dtype=np.float32)\n",
+ "model_params = dict(filename='model.mmap', shape=(91, 109, 91), dtype=np.float32)\n",
+ "\n",
+ "# Load our data as a memory-mapped array (we\n",
+ "# are using random data for this example)\n",
+ "data = np.memmap(**data_params, mode='w+')\n",
+ "data[:] = np.random.random((91, 109, 91, 50)).astype(np.float32)\n",
+ "data.flush()\n",
+ "\n",
+ "# Pre-allocate space to store the fitted\n",
+ "# model parameters\n",
+ "model = np.memmap(**model_params, mode='w+')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "15ae0eb1",
+ "metadata": {},
+ "source": [
+ "> If your image files are uncompressed (i.e. `.nii` rather than `.nii.gz`),\n",
+ "> you can instruct `nibabel` and `fslpy` to load them as a memory-map by\n",
+ "> passing `mmap=True` to the `nibabel.load` function, and the\n",
+ "> `fsl.data.image.Image` constructor."
+ ]
+ },
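+ {
+ "cell_type": "markdown",
+ "id": "7f3a2b91",
+ "metadata": {},
+ "source": [
+ "For example, here is a minimal sketch of loading an image through a\n",
+ "memory-map with `nibabel` (`myimage.nii` is a hypothetical stand-in for an\n",
+ "uncompressed image of your own):"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "c4d5e6f7",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import nibabel as nib\n",
+ "\n",
+ "# Load the image, with its data memory-mapped\n",
+ "# rather than read into RAM\n",
+ "img = nib.load('myimage.nii', mmap=True)\n",
+ "\n",
+ "# img.dataobj reads voxels from the memory-map\n",
+ "# on demand - here we read just the first volume\n",
+ "first_vol = img.dataobj[..., 0]"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "5a6b7c8d",
+ "metadata": {},
+ "source": [
+ "And, as suggested in the comments above, a rough sketch of one way to\n",
+ "ensure that the memory-map files are cleaned up when you are finished with\n",
+ "them - create them inside a temporary directory, and do all of your work\n",
+ "with the arrays inside the `with` block:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "9e0f1a2b",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import os.path as op\n",
+ "import tempfile\n",
+ "\n",
+ "# The temporary directory, and the memory-map\n",
+ "# files within it, are deleted when the\n",
+ "# with-block exits\n",
+ "with tempfile.TemporaryDirectory() as tmpdir:\n",
+ "    tmp_params = dict(filename=op.join(tmpdir, 'data.mmap'), shape=(91, 109, 91, 50), dtype=np.float32)\n",
+ "    tmpdata = np.memmap(**tmp_params, mode='w+')\n",
+ "    tmpdata[:] = np.random.random(tmp_params['shape']).astype(np.float32)\n",
+ "    print('Total:', tmpdata.sum())\n",
+ "    # close the memory-map before the directory\n",
+ "    # (and the file) is deleted\n",
+ "    del tmpdata"
+ ]
+ },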
In\n", + "this example we have written this function so as to expect the arguments\n", + "required to create the two memory-maps to be passed in (the `data_params` and\n", + "`model_params` dictionaries that we created above):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2daf1f1b", + "metadata": {}, + "outputs": [], + "source": [ + "def fit_model(indata, outdata, sliceidx):\n", + "\n", + " indata = np.memmap(**indata, mode='r')\n", + " outdata = np.memmap(**outdata, mode='r+')\n", + "\n", + " # sleep to simulate expensive model fitting\n", + " print(f'Fitting model at slice {sliceidx}')\n", + " time.sleep(1)\n", + " outdata[:, :, sliceidx] = indata[:, :, sliceidx, :].mean() + sliceidx" + ] + }, + { + "cell_type": "markdown", + "id": "90b8f2e3", + "metadata": {}, + "source": [ + "Now we can use `multiprocessing` to fit the model in parallel across all of the\n", + "image slices:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ffb4c693", + "metadata": {}, + "outputs": [], + "source": [ + "fit_function = ft.partial(fit_model, data_params, model_params)\n", + "slice_idxs = list(range(91))\n", + "\n", + "with mp.Pool(processes=16) as pool:\n", + " pool.map(fit_function, slice_idxs)\n", + "\n", + "print(model)" + ] + }, + { + "cell_type": "markdown", + "id": "dd0dd890", + "metadata": {}, + "source": [ + "<a class=\"anchor\" id=\"read-only-sharing\"></a>\n", + "### Read-only sharing\n", + "\n", + "\n", + "If you are working with a large dataset, you have determined that copying data\n", + "between processes is having a substantial impact on your performance, and have\n", + "also decided that memory-mapping is not an option for you, and instead wish to\n", + "*share* a single copy of the data between the processes, you will need to:\n", "\n", " 1. Structure your code so that the data you want to share is accessible at\n", " the *module level*.\n", @@ -762,10 +887,6 @@ "[wiki-fork]: https://en.wikipedia.org/wiki/Fork_(system_call)\n", "\n", "\n", - "<a class=\"anchor\" id=\"read-only-sharing\"></a>\n", - "### Read-only sharing\n", - "\n", - "\n", "Let's see this in action with a simple example. We'll start by defining a\n", "horrible little helper function which allows us to track the total memory\n", "usage:" @@ -774,7 +895,7 @@ { "cell_type": "code", "execution_count": null, - "id": "af8db9e4", + "id": "13fe8356", "metadata": {}, "outputs": [], "source": [ @@ -800,7 +921,7 @@ }, { "cell_type": "markdown", - "id": "7ec2c3c7", + "id": "398f7b19", "metadata": {}, "source": [ "Now our task is simply to calculate the sum of a large array of numbers. 
We're\n", @@ -811,7 +932,7 @@ { "cell_type": "code", "execution_count": null, - "id": "0e872d7f", + "id": "66b9917b", "metadata": {}, "outputs": [], "source": [ @@ -861,7 +982,7 @@ }, { "cell_type": "markdown", - "id": "026e1cd3", + "id": "9b06285f", "metadata": {}, "source": [ "You should be able to see that only one copy of `data` is created, and is\n", @@ -949,7 +1070,7 @@ { "cell_type": "code", "execution_count": null, - "id": "a04bc5de", + "id": "d7a8f363", "metadata": {}, "outputs": [], "source": [ @@ -977,7 +1098,7 @@ }, { "cell_type": "markdown", - "id": "518073ed", + "id": "4f0cbe28", "metadata": {}, "source": [ "Rather than passing the input and output data arrays in as arguments to the\n", @@ -1003,7 +1124,7 @@ { "cell_type": "code", "execution_count": null, - "id": "349cb770", + "id": "c3ff121f", "metadata": {}, "outputs": [], "source": [ @@ -1074,7 +1195,7 @@ }, { "cell_type": "markdown", - "id": "0abbf164", + "id": "09269b65", "metadata": {}, "source": [ "Now we can call our `process_data` function just like any other function:" @@ -1083,7 +1204,7 @@ { "cell_type": "code", "execution_count": null, - "id": "ccc4ea77", + "id": "27a07401", "metadata": {}, "outputs": [], "source": [ diff --git a/advanced_programming/threading.md b/advanced_programming/threading.md index 63cccd3..78cce47 100644 --- a/advanced_programming/threading.md +++ b/advanced_programming/threading.md @@ -6,7 +6,7 @@ The Python language has built-in support for multi-threading in the true parallelism in the [`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html) module. If you want to be impressed, skip straight to the section on -[`multiprocessing`](todo). +[`multiprocessing`](multiprocessing). > *Note*: This notebook covers features that are built-in to the Python @@ -39,6 +39,7 @@ module. If you want to be impressed, skip straight to the section on * [`Pool.map`](#pool-map) * [`Pool.apply_async`](#pool-apply-async) * [Sharing data between processes](#sharing-data-between-processes) + * [Memory-mapping](#memory-mapping) * [Read-only sharing](#read-only-sharing) * [Read/write sharing](#read-write-sharing) @@ -623,10 +624,101 @@ important - the performance bottleneck is typically going to be the computation time, rather than I/O between the parent and child processes. -However, if you are working with a large dataset, you have determined that -copying data between processes is having a substantial impact on your -performance, and instead wish to *share* a single copy of the data between -the processes, you will need to: +However, if you are working with a large data set, where copying it between +processes is not viable, you have a couple of options available to you. + + +<a class="anchor" id="memory-mapping"></a> +### Memory-mapping + + +One method for sharing a large `numpy` array between multiple processes is to +use a _memory-mapped_ array. This is a feature built into `numpy` which +stores your data in a regular file, instead of in memory. This allows your +data to be simultaneously read and written by multiple processes, and is fairly +straightforward to accomplish. + +For example, let's say you have some 4D fMRI data, and wish to fit a +complicated model to the time series at each voxel. 
First we will load our 4D +data, and pre-allocate another array to store the fitted model parameters: + + +``` +import time +import functools as ft +import multiprocessing as mp +import numpy as np + +# Store the parameters that are required +# to create our memory-mapped arrays, as +# we need to re-use them a couple of times. +# +# Note that in practice you would usually +# want to store these files in a temporary +# directory, and/or ensure that they are +# deleted once you are finished. +data_params = dict(filename='data.mmap', shape=(91, 109, 91, 50), dtype=np.float32) +model_params = dict(filename='model.mmap', shape=(91, 109, 91), dtype=np.float32) + +# Load our data as a memory-mapped array (we +# are using random data for this example) +data = np.memmap(**data_params, mode='w+') +data[:] = np.random.random((91, 109, 91, 50)).astype(np.float32) +data.flush() + +# Pre-allocate space to store the fitted +# model parameters +model = np.memmap(**model_params, mode='w+') +``` + + +> If your image files are uncompressed (i.e. `.nii` rather than `.nii.gz`), +> you can instruct `nibabel` and `fslpy` to load them as a memory-map by +> passing `mmap=True` to the `nibabel.load` function, and the +> `fsl.data.image.Image` constructor. + + +Now we will write our model fitting function so that it works on one slice at +a time - this will allow us to process multiple slices in parallel. Note +that, within this function, we have to _re-load_ the memory-mapped arrays. In +this example we have written this function so as to expect the arguments +required to create the two memory-maps to be passed in (the `data_params` and +`model_params` dictionaries that we created above): + +``` +def fit_model(indata, outdata, sliceidx): + + indata = np.memmap(**indata, mode='r') + outdata = np.memmap(**outdata, mode='r+') + + # sleep to simulate expensive model fitting + print(f'Fitting model at slice {sliceidx}') + time.sleep(1) + outdata[:, :, sliceidx] = indata[:, :, sliceidx, :].mean() + sliceidx +``` + +Now we can use `multiprocessing` to fit the model in parallel across all of the +image slices: + +``` +fit_function = ft.partial(fit_model, data_params, model_params) +slice_idxs = list(range(91)) + +with mp.Pool(processes=16) as pool: + pool.map(fit_function, slice_idxs) + +print(model) +``` + + +<a class="anchor" id="read-only-sharing"></a> +### Read-only sharing + + +If you are working with a large dataset, you have determined that copying data +between processes is having a substantial impact on your performance, and have +also decided that memory-mapping is not an option for you, and instead wish to +*share* a single copy of the data between the processes, you will need to: 1. Structure your code so that the data you want to share is accessible at the *module level*. @@ -645,10 +737,6 @@ any copying required. [wiki-fork]: https://en.wikipedia.org/wiki/Fork_(system_call) -<a class="anchor" id="read-only-sharing"></a> -### Read-only sharing - - Let's see this in action with a simple example. We'll start by defining a horrible little helper function which allows us to track the total memory usage: -- GitLab