From c2854eb3fccb33d5953d8f8852407871efd8a334 Mon Sep 17 00:00:00 2001
From: manifest-rules <manifest-rules@git.fmrib.ox.ac.uk>
Date: Mon, 4 Mar 2024 16:52:08 +0000
Subject: [PATCH] New notebook on third-party parallelisation libs (WIP)

---
 applications/parallel/parallel.ipynb | 295 +++++++++++++++++++++++++++
 applications/parallel/parallel.md    | 179 ++++++++++++++++
 2 files changed, 474 insertions(+)
 create mode 100644 applications/parallel/parallel.ipynb
 create mode 100644 applications/parallel/parallel.md

diff --git a/applications/parallel/parallel.ipynb b/applications/parallel/parallel.ipynb
new file mode 100644
index 0000000..2ddabc9
--- /dev/null
+++ b/applications/parallel/parallel.ipynb
@@ -0,0 +1,295 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "65545a03",
+   "metadata": {},
+   "source": [
+    "# Parallel processing in Python\n",
+    "\n",
+    "\n",
+    "While Python has built-in support for threading and parallelising in the\n",
+    "[`threading`](https://docs.python.org/3/library/threading.html) and\n",
+    "[`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html)\n",
+    "modules, there are a range of third-party libraries which you can use to improve the performance of your code.\n",
+    "\n",
+    "Contents:\n",
+    "\n",
+    "* [JobLib](#joblib)\n",
+    "* [Dask](#dask)\n",
+    "    * [Dask Numpy API](#dask-numpy-api)\n",
+    "    * [Low-level Dask API](#low-level-dask-api)\n",
+    "    * [Distributing computation with Dask](#distributing-computation-with-dask)\n",
+    "* [fsl-pipe](#fsl-pipe)\n",
+    "\n",
+    "\n",
+    "<a class=\"anchor\" id=\"joblib\"></a>\n",
+    "# JobLib\n",
+    "\n",
+    "https://joblib.readthedocs.io/en/stable/\n",
+    "\n",
+    "\n",
+    "JobLib has been around for a while, and is a simple and lightweight library with two main features:\n",
+    "\n",
+    " - An API for \"embarrassingly parallel\" tasks.\n",
+    " - An API for caching/re-using the results of time-consuming computations.\n",
+    "\n",
+    "On the surface, JobLib does not provide any functionality that cannot already be accomplished with built-in libraries such as `multiprocessing` and `functools`. However, the JobLib developers have put a lot of effort into ensuring that JobLib:\n",
+    "\n",
+    " - is easy to use,\n",
+    " - works consistently across all of the major platforms, and\n",
+    " - works as efficiently as possible with Numpy arrays\n",
+    "\n",
+    "We can use the JobLib API by importing the `joblib` package (we'll also use `numpy` and `time` in this example):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "5c8c5a11",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import numpy as np\n",
+    "import joblib\n",
+    "import time"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "5b128829",
+   "metadata": {},
+   "source": [
+    "Now, let's say that we have a collection of `numpy` arrays containing image data for a group of subjects:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8727c53a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "datasets = [np.random.random((91, 109, 91)) for i in range(10)]"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "211fb083",
+   "metadata": {},
+   "source": [
+    "And we want to calculate some metric on each image. We simply need to define a function which contains the calculation we want to perform (the `time.sleep` call is there just to simulate a complex calculation):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "bcd89418",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def do_calculation(input_data):\n",
+    "    time.sleep(2)\n",
+    "    return input_data.mean()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "4a499b91",
+   "metadata": {},
+   "source": [
+    "We can now use `joblib.Parallel` and `joblib.delayed` to parallelise those calculations. `joblib.Parallel` sets up a pool of worker processes, and `joblib.delayed` wraps a single call to the function, along with the data to pass to it. We can execute a sequence of `delayed` tasks by passing them to the `Parallel` pool:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "f96c07ac",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "with joblib.Parallel(n_jobs=-1) as pool:\n",
+    "    tasks = [joblib.delayed(do_calculation)(d) for d in datasets]\n",
+    "    results = pool(tasks)\n",
+    "print(results)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "f0b73045",
+   "metadata": {},
+   "source": [
+    "JobLib is susceptible to the same problems as `multiprocessing` with regard to sharing data between processes (these problems are\n",
+    "discussed in the `advanced_programming/threading.ipynb` notebook). In the example above, each dataset is serialised and copied from the main process\n",
+    "to the worker processes, and then the result copied back. This behaviour could be a performance bottleneck for your own task, or you may be working\n",
+    "on a system with limited memory which is incapable of storing several copies of the data.\n",
+    "\n",
+    "To deal with this, we can use memory-mapped Numpy arrays. This is a feature built into Numpy, and supported by JobLib, which stores your data in your file system instead of in memory. This allows your data to be simultaneously read and written by multiple processes.\n",
+    "\n",
+    "You can create `numpy.memmap` arrays directly, but JobLib will also automatically convert any Numpy arrays which are larger than a specified threshold into memory-mapped arrays. This threshold defaults to 1MB, and you can change it via the `max_nbytes` option when you create a `joblib.Parallel` pool.\n",
+    "\n",
+    "To see this in action, imagine that you have some 4D fMRI data, and wish to fit a complicated model to the time series at each voxel. We will write our model fitting function so that it works on one slice at a time:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b2966ebe",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def fit_model(indata, outdata, sliceidx):\n",
+    "    print(f'Fitting model at slice {sliceidx}')\n",
+    "    time.sleep(1)\n",
+    "    outdata[:, :, sliceidx] = indata[:, :, sliceidx, :].mean() + sliceidx"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "29e18767",
+   "metadata": {},
+   "source": [
+    "Now we will load our 4D data, and pre-allocate another array to store the fitted model parameters."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b7f9b544",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Imagine that we have loaded this data from a file\n",
+    "data = np.random.random((91, 109, 91, 50)).astype(np.float32)\n",
+    "\n",
+    "# Pre-allocate space to store the fitted model parameters\n",
+    "model = np.memmap('model.mmap',\n",
+    "                  shape=(91, 109, 91),\n",
+    "                  dtype=np.float32,\n",
+    "                  mode='w+')\n",
+    "\n",
+    "# Fit our model, processing slices in parallel\n",
+    "with joblib.Parallel(n_jobs=-1) as pool:\n",
+    "    pool(joblib.delayed(fit_model)(data, model, slc) for slc in range(91))\n",
+    "\n",
+    "print(model)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "851e0392",
+   "metadata": {},
+   "source": [
+    "<a class=\"anchor\" id=\"dask\"></a>\n",
+    "# Dask\n",
+    "\n",
+    "https://www.dask.org/\n",
+    "\n",
+    "\n",
+    "Dask is a very powerful library which you can use to parallelise your code, and to distribute computation across multiple machines. You can use Dask locally on your own computer, on HPC clusters (e.g. SLURM and SGE) and with cloud compute platforms (e.g. AWS, Azure).\n",
+    "\n",
+    "Dask has two main components:\n",
+    "\n",
+    " - APIs for defining tasks/jobs - there is a low-level API similar to that provided by JobLib, but Dask also has sophisticated high-level APIs for working with Pandas and Numpy-style data.\n",
+    "\n",
+    " - A task scheduler which builds a graph of all the tasks that need to be executed, and which manages their execution, either locally or remotely.\n",
+    "\n",
+    "We will introduce the Numpy API, briefly cover the low-level API, and then demonstrate how to use Dask to perform calculations on an SGE cluster.\n",
+    "\n",
+    "\n",
+    "<a class=\"anchor\" id=\"dask-numpy-api\"></a>\n",
+    "## Dask Numpy API\n",
+    "\n",
+    "\n",
+    "To use the Dask Numpy API, simply import the `dask.array` package, and use it instead of the `numpy` package:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "83f8a8df",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import numpy as np\n",
+    "import dask.array as da\n",
+    "\n",
+    "data = da.random.random((1000, 1000, 1000, 20)).astype(np.float32)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "ae58302e",
+   "metadata": {},
+   "source": [
+    "If you do the numbers, you will realise that the above call has created an array which requires **74 gigabytes** of memory - this is far more memory than is available in most consumer-level computers.\n",
+    "\n",
+    "> The call would almost certainly fail if you made it using `np.random.random` instead of `da.random.random`, as Numpy will attempt to create the entire data set (and would actually temporarily require up to **150 gigabytes** of memory, as Numpy uses double-precision floating point (`float64`) values by default).\n",
+    "\n",
+    "\n",
+    "However:\n",
+    "\n",
+    " - Dask automatically splits arrays into smaller chunks - in the above code, `data` has been split into 1331 smaller arrays, each of which has shape `(94, 94, 94, 10)`, and requires only 64 megabytes.\n",
+    "\n",
+    " - Most operations in the `dask.array` package are _lazily evaluated_. The above call has not actually created any data - Dask has just created a set of jobs which describe how to create a large array of random values.\n",
+    "\n",
+    "When using `dask.array`, nothing is actually executed until you call the `compute()` function. For example, we can try calculating the mean of our large array:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "67fc4ec6",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "m = data.mean()\n",
+    "m"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "9a2d89c1",
+   "metadata": {},
+   "source": [
+    "But again, this will not actually calculate the mean - it just defines a task which, when executed, will calculate the mean over the `data` array.\n",
+    "\n",
+    "We can _execute_ that task by calling `compute()` on the result:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0f60b5b3",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print(m.compute())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "5e51a09e",
+   "metadata": {},
+   "source": [
+    "<a class=\"anchor\" id=\"low-level-dask-api\"></a>\n",
+    "## Low-level Dask API\n",
+    "\n",
+    "\n",
+    "\n",
+    "<a class=\"anchor\" id=\"distributing-computation-with-dask\"></a>\n",
+    "## Distributing computation with Dask\n",
+    "\n",
+    "\n",
+    "\n",
+    "\n",
+    "fsl-pipe\n",
+    "fsl_sub\n",
+    "ray"
+   ]
+  }
+ ],
+ "metadata": {},
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/applications/parallel/parallel.md b/applications/parallel/parallel.md
new file mode 100644
index 0000000..b22f287
--- /dev/null
+++ b/applications/parallel/parallel.md
@@ -0,0 +1,179 @@
+# Parallel processing in Python
+
+
+While Python has built-in support for threading and parallelising in the
+[`threading`](https://docs.python.org/3/library/threading.html) and
+[`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html)
+modules, there are a range of third-party libraries which you can use to improve the performance of your code.
+
+Contents:
+
+* [JobLib](#joblib)
+* [Dask](#dask)
+    * [Dask Numpy API](#dask-numpy-api)
+    * [Low-level Dask API](#low-level-dask-api)
+    * [Distributing computation with Dask](#distributing-computation-with-dask)
+* [fsl-pipe](#fsl-pipe)
+
+
+<a class="anchor" id="joblib"></a>
+# JobLib
+
+https://joblib.readthedocs.io/en/stable/
+
+
+JobLib has been around for a while, and is a simple and lightweight library with two main features:
+
+ - An API for "embarrassingly parallel" tasks.
+ - An API for caching/re-using the results of time-consuming computations.
+
+On the surface, JobLib does not provide any functionality that cannot already be accomplished with built-in libraries such as `multiprocessing` and `functools`. However, the JobLib developers have put a lot of effort into ensuring that JobLib:
+
+ - is easy to use,
+ - works consistently across all of the major platforms, and
+ - works as efficiently as possible with Numpy arrays
+
+We can use the JobLib API by importing the `joblib` package (we'll also use `numpy` and `time` in this example):
+
+```
+import numpy as np
+import joblib
+import time
+```
+
+Now, let's say that we have a collection of `numpy` arrays containing image data for a group of subjects:
+
+
+```
+datasets = [np.random.random((91, 109, 91)) for i in range(10)]
+```
+
+And we want to calculate some metric on each image. We simply need to define a function which contains the calculation we want to perform (the `time.sleep` call is there just to simulate a complex calculation):
+
+```
+def do_calculation(input_data):
+    time.sleep(2)
+    return input_data.mean()
+```
+
+We can now use `joblib.Parallel` and `joblib.delayed` to parallelise those calculations. `joblib.Parallel` sets up a pool of worker processes, and `joblib.delayed` wraps a single call to the function, along with the data to pass to it. We can execute a sequence of `delayed` tasks by passing them to the `Parallel` pool:
+
+
+```
+with joblib.Parallel(n_jobs=-1) as pool:
+    tasks = [joblib.delayed(do_calculation)(d) for d in datasets]
+    results = pool(tasks)
+print(results)
+```
+
+JobLib is susceptible to the same problems as `multiprocessing` with regard to sharing data between processes (these problems are
+discussed in the `advanced_programming/threading.ipynb` notebook). In the example above, each dataset is serialised and copied from the main process
+to the worker processes, and then the result copied back. This behaviour could be a performance bottleneck for your own task, or you may be working
+on a system with limited memory which is incapable of storing several copies of the data.
+
+To deal with this, we can use memory-mapped Numpy arrays. This is a feature built into Numpy, and supported by JobLib, which stores your data in your file system instead of in memory. This allows your data to be simultaneously read and written by multiple processes.
+
+You can create `numpy.memmap` arrays directly, but JobLib will also automatically convert any Numpy arrays which are larger than a specified threshold into memory-mapped arrays. This threshold defaults to 1MB, and you can change it via the `max_nbytes` option when you create a `joblib.Parallel` pool.
+
+To see this in action, imagine that you have some 4D fMRI data, and wish to fit a complicated model to the time series at each voxel. We will write our model fitting function so that it works on one slice at a time:
+
+
+```
+def fit_model(indata, outdata, sliceidx):
+    print(f'Fitting model at slice {sliceidx}')
+    time.sleep(1)
+    outdata[:, :, sliceidx] = indata[:, :, sliceidx, :].mean() + sliceidx
+```
+
+Now we will load our 4D data, and pre-allocate another array to store the fitted model parameters.
+
+
+```
+# Imagine that we have loaded this data from a file
+data = np.random.random((91, 109, 91, 50)).astype(np.float32)
+
+# Pre-allocate space to store the fitted model parameters
+model = np.memmap('model.mmap',
+                  shape=(91, 109, 91),
+                  dtype=np.float32,
+                  mode='w+')
+
+# Fit our model, processing slices in parallel
+with joblib.Parallel(n_jobs=-1) as pool:
+    pool(joblib.delayed(fit_model)(data, model, slc) for slc in range(91))
+
+print(model)
+```
+
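+JobLib's other main feature - caching/re-using the results of time-consuming computations - is provided by the `joblib.Memory` class. The following is a minimal sketch of how it can be used; the cache directory `./joblib_cache` and the `slow_metric` function are arbitrary names chosen for illustration:
+
+```
+import time
+import numpy as np
+import joblib
+
+# Cached results are stored in this directory, and are
+# re-used across processes and across Python sessions
+memory = joblib.Memory('./joblib_cache', verbose=0)
+
+@memory.cache
+def slow_metric(input_data):
+    time.sleep(2)
+    return input_data.mean()
+
+data = np.random.random((91, 109, 91))
+
+print(slow_metric(data))  # slow - the result is computed and cached
+print(slow_metric(data))  # fast - the result is loaded from the cache
+```
+
+Subsequent calls with the same arguments load the cached result from disk rather than re-running the function, which can be useful for expensive preprocessing steps that are shared between analyses.
+
+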
+<a class="anchor" id="dask"></a>
+# Dask
+
+https://www.dask.org/
+
+
+Dask is a very powerful library which you can use to parallelise your code, and to distribute computation across multiple machines. You can use Dask locally on your own computer, on HPC clusters (e.g. SLURM and SGE) and with cloud compute platforms (e.g. AWS, Azure).
+
+Dask has two main components:
+
+ - APIs for defining tasks/jobs - there is a low-level API similar to that provided by JobLib, but Dask also has sophisticated high-level APIs for working with Pandas and Numpy-style data.
+
+ - A task scheduler which builds a graph of all the tasks that need to be executed, and which manages their execution, either locally or remotely.
+
+We will introduce the Numpy API, briefly cover the low-level API, and then demonstrate how to use Dask to perform calculations on an SGE cluster.
+
+
+<a class="anchor" id="dask-numpy-api"></a>
+## Dask Numpy API
+
+
+To use the Dask Numpy API, simply import the `dask.array` package, and use it instead of the `numpy` package:
+
+```
+import numpy as np
+import dask.array as da
+
+data = da.random.random((1000, 1000, 1000, 20)).astype(np.float32)
+```
+
+If you do the numbers, you will realise that the above call has created an array which requires **74 gigabytes** of memory - this is far more memory than is available in most consumer-level computers.
+
+> The call would almost certainly fail if you made it using `np.random.random` instead of `da.random.random`, as Numpy will attempt to create the entire data set (and would actually temporarily require up to **150 gigabytes** of memory, as Numpy uses double-precision floating point (`float64`) values by default).
+
+
+However:
+
+ - Dask automatically splits arrays into smaller chunks - in the above code, `data` has been split into 1331 smaller arrays, each of which has shape `(94, 94, 94, 10)`, and requires only 64 megabytes.
+
+ - Most operations in the `dask.array` package are _lazily evaluated_. The above call has not actually created any data - Dask has just created a set of jobs which describe how to create a large array of random values.
+
+When using `dask.array`, nothing is actually executed until you call the `compute()` function. For example, we can try calculating the mean of our large array:
+
+
+```
+m = data.mean()
+m
+```
+
+But again, this will not actually calculate the mean - it just defines a task which, when executed, will calculate the mean over the `data` array.
+
+We can _execute_ that task by calling `compute()` on the result:
+
+```
+print(m.compute())
+```
+
+
+<a class="anchor" id="low-level-dask-api"></a>
+## Low-level Dask API
+
+
+
+<a class="anchor" id="distributing-computation-with-dask"></a>
+## Distributing computation with Dask
+
+
+
+
+fsl-pipe
+fsl_sub
+ray
-- 
GitLab