Skip to content
Snippets Groups Projects
Commit a6d99654 authored by Paul McCarthy's avatar Paul McCarthy :mountain_bicyclist:
Browse files

Finished dask section, started on fsl-pipe

parent c2854eb3
No related branches found
No related tags found
1 merge request!38Add practical which introduces some third-party parallelisation libraries
%% Cell type:markdown id:65545a03 tags: %% Cell type:markdown id:2b88a4ad tags:
# Parallel processing in Python # Parallel processing in Python
While Python has built-in support for threading and parallelising in the While Python has built-in support for threading and parallelising in the
[`threading`](https://docs.python.org/3/library/threading.html) and [`threading`](https://docs.python.org/3/library/threading.html) and
[`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html) [`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html)
modules, there are a range of third-party libraries which you can use to improve the performance of your code. modules, there are a range of third-party libraries which you can use to improve the performance of your code.
Contents: Contents:
* [JobLib](#joblib) * [JobLib](#joblib)
* [Dask](#dask) * [Dask](#dask)
* [Dask Numpy API](#dask-numpy-api) * [Dask Numpy API](#dask-numpy-api)
* [Low-level Dask API](#low-level-dask-api) * [Low-level Dask API](#low-level-dask-api)
* [Distributing computation with Dask](#distributing-computation-with-dask) * [Distributing computation with Dask](#distributing-computation-with-dask)
* [fsl-pipe](#fsl-pipe) * [fsl-pipe](#fsl-pipe)
<a class="anchor" id="joblib"></a> <a class="anchor" id="joblib"></a>
# JobLib # JobLib
https://joblib.readthedocs.io/en/stable/ https://joblib.readthedocs.io/en/stable/
JobLib has been around for a while, and is a simple and lightweight library with two main features: JobLib has been around for a while, and is a simple and lightweight library with two main features:
- An API for "embarrassingly parallel" tasks. - An API for "embarrassingly parallel" tasks.
- An API for caching/re-using the results of time-consuming computations. - An API for caching/re-using the results of time-consuming computations.
On the surface, JobLib does not provide any functionality that cannot already be accomplished with built-in libraries such as `multiprocessing` and `functools`. However, the JobLib developers have put a lot of effort into ensuring that JobLib: On the surface, JobLib does not provide any functionality that cannot already be accomplished with built-in libraries such as `multiprocessing` and `functools`. However, the JobLib developers have put a lot of effort into ensuring that JobLib:
- is easy to use, - is easy to use,
- works consistently across all of the major platforms, and - works consistently across all of the major platforms, and
- works as efficiently as possible with Numpy arrays - works as efficiently as possible with Numpy arrays
We can use the JobLib API by importing the `joblib` package (we'll also use `numpy` and `time` in this example): We can use the JobLib API by importing the `joblib` package (we'll also use `numpy` and `time` in this example):
%% Cell type:code id:5c8c5a11 tags: %% Cell type:code id:5791f029 tags:
``` ```
import numpy as np import numpy as np
import joblib import joblib
import time import time
``` ```
%% Cell type:markdown id:5b128829 tags: %% Cell type:markdown id:ecf0ad57 tags:
Now, let's say that we have a collection of `numpy` arrays containing image data for a group of subjects: Now, let's say that we have a collection of `numpy` arrays containing image data for a group of subjects:
%% Cell type:code id:8727c53a tags: %% Cell type:code id:53839192 tags:
``` ```
datasets = [np.random.random((91, 109, 91)) for i in range(10)] datasets = [np.random.random((91, 109, 91)) for i in range(10)]
``` ```
%% Cell type:markdown id:211fb083 tags: %% Cell type:markdown id:733f66cc tags:
And we want to calculate some metric on each image. We simply need to define a function which contains the calculation we want to perform (the `time.sleep` call is there just to simulate a complex calculation): And we want to calculate some metric on each image. We simply need to define a function which contains the calculation we want to perform (the `time.sleep` call is there just to simulate a complex calculation):
%% Cell type:code id:bcd89418 tags: %% Cell type:code id:049cd8dd tags:
``` ```
def do_calculation(input_data): def do_calculation(input_data):
time.sleep(2) time.sleep(2)
return input_data.mean() return input_data.mean()
``` ```
%% Cell type:markdown id:4a499b91 tags: %% Cell type:markdown id:1266287d tags:
We can now use `joblib.Parallel` and `joblib.delayed` to parallelise those calculations. `joblib.Parallel` sets up a pool of worker processes, and `joblib.delayed` schedules a single instantiation of the function, and associated data, which is to be executed. We can execute a sequence of `delayed` tasks by passing them to the `Parallel` pool: We can now use `joblib.Parallel` and `joblib.delayed` to parallelise those calculations. `joblib.Parallel` sets up a pool of worker processes, and `joblib.delayed` schedules a single instantiation of the function, and associated data, which is to be executed. We can execute a sequence of `delayed` tasks by passing them to the `Parallel` pool:
%% Cell type:code id:f96c07ac tags: %% Cell type:code id:931cb5d7 tags:
``` ```
with joblib.Parallel(n_jobs=-1) as pool: with joblib.Parallel(n_jobs=-1) as pool:
tasks = [joblib.delayed(do_calculation)(d) for d in datasets] tasks = [joblib.delayed(do_calculation)(d) for d in datasets]
results = pool(tasks) results = pool(tasks)
print(results) print(results)
``` ```
%% Cell type:markdown id:f0b73045 tags: %% Cell type:markdown id:5b499444 tags:
Just like with `multiprocessing`, JobLib is susceptible to the same problems with regard to sharing data between processes (these problems are Just like with `multiprocessing`, JobLib is susceptible to the same problems with regard to sharing data between processes (these problems are
discussed in the `advanced_programming/threading.ipynb` notebook). In the example above, each dataset is serialised and copied from the main process discussed in the `advanced_programming/threading.ipynb` notebook). In the example above, each dataset is serialised and copied from the main process
to the worker processes, and then the result copied back. This behaviour could be a performance bottleneck for your own task, or you may be working to the worker processes, and then the result copied back. This behaviour could be a performance bottleneck for your own task, or you may be working
on a system with limited memory which is incapable of storing several copies of the data. on a system with limited memory which is incapable of storing several copies of the data.
To deal with this, we can use memory-mapped Numpy arrays. This is a feature built into Numpy, and supported by JobLib, which stores your data in your file system instead of in memory. This allows your data to be simultaneously read and written by multiple processes. To deal with this, we can use memory-mapped Numpy arrays. This is a feature built into Numpy, and supported by JobLib, which stores your data in your file system instead of in memory. This allows your data to be simultaneously read and written by multiple processes.
You can create `numpy.memmap` arrays directly, but JobLib will also automatically convert any Numpy arrays which are larger than a specified threshold into memory-mapped arrays. This threshold defaults to 1MB, and you can change it via the `n_bytes` option when you create a `joblib.Parallel` pool. You can create `numpy.memmap` arrays directly, but JobLib will also automatically convert any Numpy arrays which are larger than a specified threshold into memory-mapped arrays. This threshold defaults to 1MB, and you can change it via the `n_bytes` option when you create a `joblib.Parallel` pool.
To see this in action, imagine that you have some 4D fMRI data, and wish to fit a complicated model to the time series at each voxel. We will write our model fitting function so that it works on one slice at a time: To see this in action, imagine that you have some 4D fMRI data, and wish to fit a complicated model to the time series at each voxel. We will write our model fitting function so that it works on one slice at a time:
%% Cell type:code id:b2966ebe tags: %% Cell type:code id:6ec1fb3a tags:
``` ```
def fit_model(indata, outdata, sliceidx): def fit_model(indata, outdata, sliceidx):
print(f'Fitting model at slice {sliceidx}') print(f'Fitting model at slice {sliceidx}')
time.sleep(1) time.sleep(1)
outdata[:, :, sliceidx] = indata[:, :, sliceidx, :].mean() + sliceidx outdata[:, :, sliceidx] = indata[:, :, sliceidx, :].mean() + sliceidx
``` ```
%% Cell type:markdown id:29e18767 tags: %% Cell type:markdown id:314c22e5 tags:
Now we will load our 4D data, and pre-allocate another array to store the fitted model parameters. Now we will load our 4D data, and pre-allocate another array to store the fitted model parameters.
%% Cell type:code id:b7f9b544 tags: %% Cell type:code id:d9f90609 tags:
``` ```
# Imagine that we have loaded this data from a file # Imagine that we have loaded this data from a file
data = np.random.random((91, 109, 91, 50)).astype(np.float32) data = np.random.random((91, 109, 91, 50)).astype(np.float32)
# Pre-allocate space to store the fitted model parameters # Pre-allocate space to store the fitted model parameters
model = np.memmap('model.mmap', model = np.memmap('model.mmap',
shape=(91, 109, 91), shape=(91, 109, 91),
dtype=np.float32, dtype=np.float32,
mode='w+') mode='w+')
# Fit our model, processing slices in parallel # Fit our model, processing slices in parallel
with joblib.Parallel(n_jobs=-1) as pool: with joblib.Parallel(n_jobs=-1) as pool:
pool(joblib.delayed(fit_model)(data, model, slc) for slc in range(91)) pool(joblib.delayed(fit_model)(data, model, slc) for slc in range(91))
print(model) print(model)
``` ```
%% Cell type:markdown id:851e0392 tags: %% Cell type:markdown id:47dc9a3b tags:
<a class="anchor" id="dask"></a> <a class="anchor" id="dask"></a>
# Dask # Dask
https://www.dask.org/ https://www.dask.org/
Dask is a very powerful library which you can use to parallelise your code, and to distribute computation across multiple machines. You can use Dask locally on your own computer, on HPC clusters (e.g. SLURM and SGE) and with cloud compute platforms (e.g. AWS, Azure). Dask is a very powerful library which you can use to parallelise your code, and to distribute computation across multiple machines. You can use Dask locally on your own computer, on HPC clusters (e.g. SLURM and SGE) and with cloud compute platforms (e.g. AWS, Azure).
Dask has two main components: Dask has two main components:
- APIs for defining tasks/jobs - there is a low-level API similar to that provided by JobLib, but Dask also has sophisticated high-level APIs for working with Pandas and Numpy-style data. - APIs for defining tasks/jobs - there is a low-level API similar to that provided by JobLib, but Dask also has sophisticated high-level APIs for working with Pandas and Numpy-style data.
- A task scheduler which builds a graph of all the tasks that need to be executed, and which manges their execution, either locally or remotely. - A task scheduler which builds a graph of all the tasks that need to be executed, and which manges their execution, either locally or remotely.
We will introduce the Numpy API and will briefly cover the low-level API, and then demonstrate how to use Dask to perform calculations on a SGE cluster. We will introduce the Numpy API and will briefly cover the low-level API, and then demonstrate how to use Dask to perform calculations on a SGE cluster.
<a class="anchor" id="dask-numpy-api"></a> <a class="anchor" id="dask-numpy-api"></a>
## Dask Numpy API ## Dask Numpy API
https://docs.dask.org/en/stable/array.html
> Dask also provides a Pandas-style API, accessible in the `dask.dataframe` package - you can read about it at https://docs.dask.org/en/stable/dataframe.html.
To use the Dask Numpy API, simply import the `dask.array` package, and use it instead of the `numpy` package: To use the Dask Numpy API, simply import the `dask.array` package, and use it instead of the `numpy` package:
%% Cell type:code id:83f8a8df tags: %% Cell type:code id:7923a7be tags:
``` ```
import numpy as np import numpy as np
import dask.array as da import dask.array as da
data = da.random.random((1000, 1000, 1000, 20)).astype(float32) data = da.random.random((1000, 1000, 1000, 20)).astype(np.float32)
data
``` ```
%% Cell type:markdown id:ae58302e tags: %% Cell type:markdown id:2bdd29e6 tags:
If you do the numbers, you will realise that the above call has created an array which requires **74 gigabytes** of memory - this is far more memory than what is available in most consumer level computers. If you do the numbers, you will realise that the above call has created an array which requires **74 gigabytes** of memory - this is far more memory than what is available in most consumer level computers.
> The call would almost certainly fail if you made it using `np.random.random` instead of `da.random.random`, as Numpy will attempt to create the entire data set (and would actually temporarily require up to **150 gigabytes** of memory, as Numpy uses double-precision floating point (`float64`) values by default). > The call would almost certainly fail if you made it using `np.random.random` instead of `da.random.random`, as Numpy will attempt to create the entire data set (and in fact would temporarily require up to **150 gigabytes** of memory, as Numpy uses double-precision floating point (`float64`) values by default).
However: However:
- Dask automatically splits arrays into smaller chunks - in the above code, `data` has been split into 1331 smaller arrays, each of which has shape `(94, 94, 94, 10)`, and requires only 64 megabytes. - Dask automatically splits arrays into smaller chunks - in the above code, `data` has been split into 1331 smaller arrays, each of which has shape `(94, 94, 94, 10)`, and requires only 64 megabytes.
- Most operations in the `dask.array` package are _lazily evaluated_. The above call has not actually created any data - Dask has just created a set of jobs which describe how to create a large array of random values. - Most operations in the `dask.array` package are _lazily evaluated_. The above call has not actually created any data - Dask has just created a set of jobs which describe how to create a large array of random values.
When using `dask.array`, nothing is actually executed until you call the `compute()` function. For example, we can try calculating the mean of our large array: When using `dask.array`, nothing is actually executed until you call the `compute()` function. For example, we can try calculating the mean of our large array:
%% Cell type:code id:67fc4ec6 tags: %% Cell type:code id:b2605c92 tags:
``` ```
m = data.mean() m = data.mean()
m m
``` ```
%% Cell type:markdown id:9a2d89c1 tags: %% Cell type:markdown id:08bf1c77 tags:
But again, this will not actually calculate the mean - it just defines a task which, when executed, will calculate the mean over the `data` array. But again, this will not actually calculate the mean - it just defines a task which, when executed, will calculate the mean over the `data` array.
We can _execute_ that task by callintg `compute()` on the result: We can _execute_ that task by callintg `compute()` on the result:
%% Cell type:code id:0f60b5b3 tags: %% Cell type:code id:cecf0b61 tags:
``` ```
print(m.compute()) print(m.compute())
``` ```
%% Cell type:markdown id:5e51a09e tags: %% Cell type:markdown id:1472b53d tags:
Dask arrays support most of the functionality of Numpy arrays, but there are a few exceptions, most notably the `numpy.linalg` package.
<a class="anchor" id="low-level-dask-api"></a> <a class="anchor" id="low-level-dask-api"></a>
## Low-level Dask API ## Low-level Dask API
In addition to the Numpy and Pandas APIs, Dask also has a lower-level interface which allows you to create and lazily execute a pipeline made up of Python functions. This API is based around `dask.delayed`, which is a Python decorator that can be applied to any function, and which tells Dask that a function should be lazily executed.
As a very simple example (taken from the [Dask documentation](https://docs.dask.org/en/stable/delayed.html#example)), consider this simple numerical task, where we have a set of numbers and, for each number `x`, we want to calculate `(x * x)`, and then add all of the results together (i.e. the sum of squares). We'll start by defining a function for each of the operations that we need to perform:
%% Cell type:code id:1f764830 tags:
```
def square(x):
return x * x
def sum(values):
total = 0
for v in values:
total = total + v
return total
```
%% Cell type:markdown id:fd5252f6 tags:
We could solve this problem without parallelism by using conventional Python code, which might look something like the following:
%% Cell type:code id:b0910aaf tags:
```
data = [1, 2, 3, 4, 5]
output = []
for x in data:
s = square(x)
output.append(s)
total = sum(output)
print(total)
```
%% Cell type:markdown id:8a7a45f5 tags:
However, this problem is inherently parallelisable - we are independently performing the same series of steps to each of our inputs, before the final tallying. We can use the `dask.delayed` function to take advantage of this:
%% Cell type:code id:9031f120 tags:
```
import dask
output = []
for x in data:
a = dask.delayed(square)(x)
output.append(a)
total = dask.delayed(sum)(output)
```
%% Cell type:markdown id:6bc3ef4c tags:
We have not actually performed any computations yet. What we have done is built a _graph_ of operations that we need to perform. Dask refers to this as a **task graph** - Dask keeps track of the dependencies of each function that we add, and so is able to determine the order in they need to be executed, and also which steps do not depend on each other and so can be executed in parallel. Dask can even generate an image of our task graph for us:
%% Cell type:code id:9f0833d9 tags:
```
total.visualize()
```
%% Cell type:markdown id:0164c407 tags:
And then when we are ready, we can call `compute()` to actually do the calculation:
%% Cell type:code id:8561997b tags:
```
total.compute()
```
%% Cell type:markdown id:3590a9f9 tags:
For a more realistic example, let's imagine that we have T1 MRI images for five subjects, and we want to perform basic structural preprocessing on each of them (reorientation, FOV reduction, and brain extraction). He're creating this example data set (all of the T1 images are just a copy of the `bighead.nii.gz` image, from the FSL course data).
%% Cell type:code id:c3879db5 tags:
```
import os
import shutil
os.makedirs('braindata', exist_ok=True)
for i in range(1, 6):
shutil.copy('../../applications/fslpy/bighead.nii.gz', f'braindata/{i:02d}.nii.gz')
```
%% Cell type:markdown id:77851c91 tags:
And now we can build our pipeline. The [fslpy](https://open.win.ox.ac.uk/pages/fsl/fslpy/) library has a collection of functions which we can use to call the FSL commands that we need. We need to do a little work, as by default `dask.delayed` assumes that the dependencies of a task are passed in as arguments (there are other ways of dealing with this, but in this example it is easy to simply write a few small functions that define each of our tasks):
%% Cell type:code id:1f9d5f36 tags:
```
import fsl.wrappers as fw
def reorient(input, output):
fw.fslreorient2std(input, output)
return output
def fov(input, output):
fw.robustfov(input, output)
return output
def bet(input, output):
fw.bet(input, output)
return output
```
%% Cell type:code id:4a762719 tags:
```
import glob
import dask
import fsl.data.image as fslimage
inputs = list(glob.glob('braindata/??.nii.gz'))
tasks = []
for input in inputs:
basename = fslimage.removeExt(input)
r = dask.delayed(reorient)(input, f'{basename}_reorient.nii.gz')
f = dask.delayed(fov)(r, f'{basename}_fov.nii.gz')
b = dask.delayed(bet)(f, f'{basename}_brain.nii.gz')
tasks.append(b)
```
%% Cell type:markdown id:1ed04761 tags:
In the previous example we had a single output (the result of summing the squared input values) which we called `visualize()` on. We can also call `dask.visualize()` to visualise a collection of tasks:
%% Cell type:code id:f62c7c6d tags:
```
dask.visualize(*tasks)
```
%% Cell type:markdown id:63017c42 tags:
And, similarly, we can call `dask.compute` to run them all:
%% Cell type:code id:7b6e40a9 tags:
```
outputs = dask.compute(*tasks)
print(outputs)
```
%% Cell type:markdown id:5dbb82f6 tags:
<a class="anchor" id="distributing-computation-with-dask"></a> <a class="anchor" id="distributing-computation-with-dask"></a>
## Distributing computation with Dask ## Distributing computation with Dask
In the examples above, Dask was running its tasks in parallel on your local machine. But it is easy to instruct Dask to distribute its computations across multiple machines. For example, Dask has support for executing tasks on a SGE or SLURM cluster, which we have available in Oxford at FMRIB and the BDI.
To use this functionality, we need an additional library called `dask-jobqueue` which, at the moment, is not installed as part of FSL.
> Note that the code cells below will not work if you are running this notebook on your own computer (unless you happen to have a SGE cluster system installed on your laptop).
The first step is to create a `SGECluster` (or `SLURMCluster`) object. You need to populate this object with information about a _single job_ in your workflow. At a minumum, you must specify the number of cores and memory that are available to a single job:
%% Cell type:code id:b6e7814e tags:
```
from dask_jobqueue import SGECluster
cluster = SGECluster(cores=2, memory='16GB')
```
%% Cell type:markdown id:10dd646c tags:
You can also set a range of other options, including specifying a queue name (e.g. `queue='short.q'`), or specifying the total amount of time that your job will run for (e.g. `walltime='01:00:00'` for one hour).
The next step is to create some jobs by calling the `scale()` method:
%% Cell type:code id:bfa74705 tags:
```
cluster.scale(jobs=5)
```
%% Cell type:markdown id:7123a281 tags:
Behind the scenes, the `scale()` method uses `qsub` to create five "worker processes", each running on a different cluster node. In the first instance, these worker processes will be sitting idle, doing nothing, and waiting for you to give them a task to run.
The final step is to create "client" object. This is an important step which configures Dask to use the cluster we have just set up:
%% Cell type:code id:0c8502aa tags:
fsl-pipe ```
fsl_sub client = cluster.get_client()
ray ```
%% Cell type:markdown id:9b43cee7 tags:
After creating a client, any Dask computations that we request will be performed on the cluster by our worker processes:
%% Cell type:code id:c06367ee tags:
```
import dask.array as da
data = da.random.random((1000, 1000, 1000, 10))
print(data.mean().compute())
```
%% Cell type:markdown id:f5c78ed3 tags:
<a class="anchor" id="fsl-pipe"></a>
## fsl-pipe
https://open.win.ox.ac.uk/pages/fsl/fsl-pipe/
fsl-pipe is a Python library (written by our very own Michiel Cottaar) which builds upon another library called [file-tree](https://open.win.ox.ac.uk/pages/fsl/file-tree/), and allows you to write an analysis pipeline in a _declarative_ manner. A pipeline is defined by:
- A "file tree" which defines the directory structure of the input and output files of the pipeline.
- A set of recipes (Python functions) describing how each of the pipeline outputs should be produced.
In a similar vein to Dask task graphs, fsl-pipe will automatically determine which recipes should be executed, and in what order, to produce whichever output file(s) you request. Recipes can either run locally, be distributed using Dask, or be executed on a cluster using `fsl_sub`.
For example, let's again imagine that we have some T1 images upon which we wish to perform basic structural preprocessing, and which arranged in the following manner:
> ```
> subjectA/
> T1w.nii.gz
> subjectB/
> T1w.nii.gz
> subjectC/
> T1w.nii.gz
> ```
We must first describe the structure of our data, and save that description as a file-tree file (e.g. `mydata.tree`). This file contains a placeholder for the subject ID, and gives both the input and any desired output files unique identifiers:
> ```
> subject{subject}
> T1w.nii.gz (t1)
> T1w_brain.nii.gz (t1_brain)
> T1w_fov.nii.gz (t1_fov)
> T1w_reorient.nii.gz (t1_reorient)
> ```
%% Cell type:code id:07ff84b0 tags:
```
#!/usr/bin/env python
from fsl.wrappers import bet
from file_tree import FileTree
from fsl_pipe import pipe, In, Out
@pipe
def brain_extract(t1 : In, bet_output : Out, bet_mask : Out):
bet(t1, bet_output, mask=True)
tree = FileTree.read('data.tree').update_glob('t1')
pipe.cli(tree)
```
......
...@@ -126,18 +126,25 @@ We will introduce the Numpy API and will briefly cover the low-level API, and th ...@@ -126,18 +126,25 @@ We will introduce the Numpy API and will briefly cover the low-level API, and th
## Dask Numpy API ## Dask Numpy API
https://docs.dask.org/en/stable/array.html
> Dask also provides a Pandas-style API, accessible in the `dask.dataframe` package - you can read about it at https://docs.dask.org/en/stable/dataframe.html.
To use the Dask Numpy API, simply import the `dask.array` package, and use it instead of the `numpy` package: To use the Dask Numpy API, simply import the `dask.array` package, and use it instead of the `numpy` package:
``` ```
import numpy as np import numpy as np
import dask.array as da import dask.array as da
data = da.random.random((1000, 1000, 1000, 20)).astype(float32) data = da.random.random((1000, 1000, 1000, 20)).astype(np.float32)
data
``` ```
If you do the numbers, you will realise that the above call has created an array which requires **74 gigabytes** of memory - this is far more memory than what is available in most consumer level computers. If you do the numbers, you will realise that the above call has created an array which requires **74 gigabytes** of memory - this is far more memory than what is available in most consumer level computers.
> The call would almost certainly fail if you made it using `np.random.random` instead of `da.random.random`, as Numpy will attempt to create the entire data set (and would actually temporarily require up to **150 gigabytes** of memory, as Numpy uses double-precision floating point (`float64`) values by default). > The call would almost certainly fail if you made it using `np.random.random` instead of `da.random.random`, as Numpy will attempt to create the entire data set (and in fact would temporarily require up to **150 gigabytes** of memory, as Numpy uses double-precision floating point (`float64`) values by default).
However: However:
...@@ -163,17 +170,223 @@ print(m.compute()) ...@@ -163,17 +170,223 @@ print(m.compute())
``` ```
Dask arrays support most of the functionality of Numpy arrays, but there are a few exceptions, most notably the `numpy.linalg` package.
<a class="anchor" id="low-level-dask-api"></a> <a class="anchor" id="low-level-dask-api"></a>
## Low-level Dask API ## Low-level Dask API
In addition to the Numpy and Pandas APIs, Dask also has a lower-level interface which allows you to create and lazily execute a pipeline made up of Python functions. This API is based around `dask.delayed`, which is a Python decorator that can be applied to any function, and which tells Dask that a function should be lazily executed.
As a very simple example (taken from the [Dask documentation](https://docs.dask.org/en/stable/delayed.html#example)), consider this simple numerical task, where we have a set of numbers and, for each number `x`, we want to calculate `(x * x)`, and then add all of the results together (i.e. the sum of squares). We'll start by defining a function for each of the operations that we need to perform:
```
def square(x):
return x * x
def sum(values):
total = 0
for v in values:
total = total + v
return total
```
We could solve this problem without parallelism by using conventional Python code, which might look something like the following:
```
data = [1, 2, 3, 4, 5]
output = []
for x in data:
s = square(x)
output.append(s)
total = sum(output)
print(total)
```
However, this problem is inherently parallelisable - we are independently performing the same series of steps to each of our inputs, before the final tallying. We can use the `dask.delayed` function to take advantage of this:
```
import dask
output = []
for x in data:
a = dask.delayed(square)(x)
output.append(a)
total = dask.delayed(sum)(output)
```
We have not actually performed any computations yet. What we have done is built a _graph_ of operations that we need to perform. Dask refers to this as a **task graph** - Dask keeps track of the dependencies of each function that we add, and so is able to determine the order in they need to be executed, and also which steps do not depend on each other and so can be executed in parallel. Dask can even generate an image of our task graph for us:
```
total.visualize()
```
And then when we are ready, we can call `compute()` to actually do the calculation:
```
total.compute()
```
For a more realistic example, let's imagine that we have T1 MRI images for five subjects, and we want to perform basic structural preprocessing on each of them (reorientation, FOV reduction, and brain extraction). He're creating this example data set (all of the T1 images are just a copy of the `bighead.nii.gz` image, from the FSL course data).
```
import os
import shutil
os.makedirs('braindata', exist_ok=True)
for i in range(1, 6):
shutil.copy('../../applications/fslpy/bighead.nii.gz', f'braindata/{i:02d}.nii.gz')
```
And now we can build our pipeline. The [fslpy](https://open.win.ox.ac.uk/pages/fsl/fslpy/) library has a collection of functions which we can use to call the FSL commands that we need. We need to do a little work, as by default `dask.delayed` assumes that the dependencies of a task are passed in as arguments (there are other ways of dealing with this, but in this example it is easy to simply write a few small functions that define each of our tasks):
```
import fsl.wrappers as fw
def reorient(input, output):
fw.fslreorient2std(input, output)
return output
def fov(input, output):
fw.robustfov(input, output)
return output
def bet(input, output):
fw.bet(input, output)
return output
```
```
import glob
import dask
import fsl.data.image as fslimage
inputs = list(glob.glob('braindata/??.nii.gz'))
tasks = []
for input in inputs:
basename = fslimage.removeExt(input)
r = dask.delayed(reorient)(input, f'{basename}_reorient.nii.gz')
f = dask.delayed(fov)(r, f'{basename}_fov.nii.gz')
b = dask.delayed(bet)(f, f'{basename}_brain.nii.gz')
tasks.append(b)
```
In the previous example we had a single output (the result of summing the squared input values) which we called `visualize()` on. We can also call `dask.visualize()` to visualise a collection of tasks:
```
dask.visualize(*tasks)
```
And, similarly, we can call `dask.compute` to run them all:
```
outputs = dask.compute(*tasks)
print(outputs)
```
<a class="anchor" id="distributing-computation-with-dask"></a> <a class="anchor" id="distributing-computation-with-dask"></a>
## Distributing computation with Dask ## Distributing computation with Dask
In the examples above, Dask was running its tasks in parallel on your local machine. But it is easy to instruct Dask to distribute its computations across multiple machines. For example, Dask has support for executing tasks on a SGE or SLURM cluster, which we have available in Oxford at FMRIB and the BDI.
To use this functionality, we need an additional library called `dask-jobqueue` which, at the moment, is not installed as part of FSL.
> Note that the code cells below will not work if you are running this notebook on your own computer (unless you happen to have a SGE cluster system installed on your laptop).
The first step is to create a `SGECluster` (or `SLURMCluster`) object. You need to populate this object with information about a _single job_ in your workflow. At a minumum, you must specify the number of cores and memory that are available to a single job:
```
from dask_jobqueue import SGECluster
cluster = SGECluster(cores=2, memory='16GB')
```
You can also set a range of other options, including specifying a queue name (e.g. `queue='short.q'`), or specifying the total amount of time that your job will run for (e.g. `walltime='01:00:00'` for one hour).
The next step is to create some jobs by calling the `scale()` method:
```
cluster.scale(jobs=5)
```
Behind the scenes, the `scale()` method uses `qsub` to create five "worker processes", each running on a different cluster node. In the first instance, these worker processes will be sitting idle, doing nothing, and waiting for you to give them a task to run.
The final step is to create "client" object. This is an important step which configures Dask to use the cluster we have just set up:
```
client = cluster.get_client()
```
After creating a client, any Dask computations that we request will be performed on the cluster by our worker processes:
```
import dask.array as da
data = da.random.random((1000, 1000, 1000, 10))
print(data.mean().compute())
```
<a class="anchor" id="fsl-pipe"></a>
## fsl-pipe
https://open.win.ox.ac.uk/pages/fsl/fsl-pipe/
fsl-pipe is a Python library (written by our very own Michiel Cottaar) which builds upon another library called [file-tree](https://open.win.ox.ac.uk/pages/fsl/file-tree/), and allows you to write an analysis pipeline in a _declarative_ manner. A pipeline is defined by:
- A "file tree" which defines the directory structure of the input and output files of the pipeline.
- A set of recipes (Python functions) describing how each of the pipeline outputs should be produced.
fsl-pipe In a similar vein to Dask task graphs, fsl-pipe will automatically determine which recipes should be executed, and in what order, to produce whichever output file(s) you request. Recipes can either run locally, be distributed using Dask, or be executed on a cluster using `fsl_sub`.
fsl_sub
ray
For example, let's again imagine that we have some T1 images upon which we wish to perform basic structural preprocessing, and which arranged in the following manner:
> ```
> subjectA/
> T1w.nii.gz
> subjectB/
> T1w.nii.gz
> subjectC/
> T1w.nii.gz
> ```
We must first describe the structure of our data, and save that description as a file-tree file (e.g. `mydata.tree`). This file contains a placeholder for the subject ID, and gives both the input and any desired output files unique identifiers:
> ```
> subject{subject}
> T1w.nii.gz (t1)
> T1w_brain.nii.gz (t1_brain)
> T1w_fov.nii.gz (t1_fov)
> T1w_reorient.nii.gz (t1_reorient)
> ```
```
#!/usr/bin/env python
from fsl.wrappers import bet
from file_tree import FileTree
from fsl_pipe import pipe, In, Out
@pipe
def brain_extract(t1 : In, bet_output : Out, bet_mask : Out):
bet(t1, bet_output, mask=True)
tree = FileTree.read('data.tree').update_glob('t1')
pipe.cli(tree)
```
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment