diff --git a/applications/parallel/parallel.ipynb b/applications/parallel/parallel.ipynb index 2ddabc9928a814210eb8ff41a89394e496562ce5..acfb2866d366bbd7347c36e42ed4e577e9bce185 100644 --- a/applications/parallel/parallel.ipynb +++ b/applications/parallel/parallel.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "markdown", - "id": "65545a03", + "id": "2b88a4ad", "metadata": {}, "source": [ "# Parallel processing in Python\n", @@ -46,7 +46,7 @@ { "cell_type": "code", "execution_count": null, - "id": "5c8c5a11", + "id": "5791f029", "metadata": {}, "outputs": [], "source": [ @@ -57,7 +57,7 @@ }, { "cell_type": "markdown", - "id": "5b128829", + "id": "ecf0ad57", "metadata": {}, "source": [ "Now, let's say that we have a collection of `numpy` arrays containing image data for a group of subjects:" @@ -66,7 +66,7 @@ { "cell_type": "code", "execution_count": null, - "id": "8727c53a", + "id": "53839192", "metadata": {}, "outputs": [], "source": [ @@ -75,7 +75,7 @@ }, { "cell_type": "markdown", - "id": "211fb083", + "id": "733f66cc", "metadata": {}, "source": [ "And we want to calculate some metric on each image. We simply need to define a function which contains the calculation we want to perform (the `time.sleep` call is there just to simulate a complex calculation):" @@ -84,7 +84,7 @@ { "cell_type": "code", "execution_count": null, - "id": "bcd89418", + "id": "049cd8dd", "metadata": {}, "outputs": [], "source": [ @@ -95,7 +95,7 @@ }, { "cell_type": "markdown", - "id": "4a499b91", + "id": "1266287d", "metadata": {}, "source": [ "We can now use `joblib.Parallel` and `joblib.delayed` to parallelise those calculations. `joblib.Parallel` sets up a pool of worker processes, and `joblib.delayed` schedules a single instantiation of the function, and associated data, which is to be executed. We can execute a sequence of `delayed` tasks by passing them to the `Parallel` pool:" @@ -104,7 +104,7 @@ { "cell_type": "code", "execution_count": null, - "id": "f96c07ac", + "id": "931cb5d7", "metadata": {}, "outputs": [], "source": [ @@ -116,7 +116,7 @@ }, { "cell_type": "markdown", - "id": "f0b73045", + "id": "5b499444", "metadata": {}, "source": [ "Just like with `multiprocessing`, JobLib is susceptible to the same problems with regard to sharing data between processes (these problems are\n", @@ -134,7 +134,7 @@ { "cell_type": "code", "execution_count": null, - "id": "b2966ebe", + "id": "6ec1fb3a", "metadata": {}, "outputs": [], "source": [ @@ -146,7 +146,7 @@ }, { "cell_type": "markdown", - "id": "29e18767", + "id": "314c22e5", "metadata": {}, "source": [ "Now we will load our 4D data, and pre-allocate another array to store the fitted model parameters." 
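The diff hunks above only show the surrounding markdown cells - the notebook's actual joblib code cells are collapsed. For reference, a minimal sketch of the `joblib.Parallel`/`joblib.delayed` pattern being described; the function name `calc_metric`, the array shapes, and the worker count are illustrative assumptions, not the notebook's own cells:

```
# Minimal sketch of the joblib pattern described above (hypothetical names -
# the notebook's own code cells are collapsed in the diff hunks shown here).
import time
import numpy as np
from joblib import Parallel, delayed

# a collection of "images" for a group of subjects (illustrative shape)
imagedata = [np.random.random((91, 109, 91)) for _ in range(10)]

def calc_metric(image):
    time.sleep(0.5)              # simulate a complex calculation
    return image.mean()

# one delayed task per image, executed by a pool of worker processes
# (n_jobs=-1 means "use all available cores")
results = Parallel(n_jobs=-1)(delayed(calc_metric)(img) for img in imagedata)
print(results)
```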
@@ -155,7 +155,7 @@ { "cell_type": "code", "execution_count": null, - "id": "b7f9b544", + "id": "d9f90609", "metadata": {}, "outputs": [], "source": [ @@ -177,7 +177,7 @@ }, { "cell_type": "markdown", - "id": "851e0392", + "id": "47dc9a3b", "metadata": {}, "source": [ "<a class=\"anchor\" id=\"dask\"></a>\n", @@ -201,30 +201,37 @@ "## Dask Numpy API\n", "\n", "\n", + "https://docs.dask.org/en/stable/array.html\n", + "\n", + "\n", + "> Dask also provides a Pandas-style API, accessible in the `dask.dataframe` package - you can read about it at https://docs.dask.org/en/stable/dataframe.html.\n", + "\n", + "\n", "To use the Dask Numpy API, simply import the `dask.array` package, and use it instead of the `numpy` package:" ] }, { "cell_type": "code", "execution_count": null, - "id": "83f8a8df", + "id": "7923a7be", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import dask.array as da\n", "\n", - "data = da.random.random((1000, 1000, 1000, 20)).astype(float32)" + "data = da.random.random((1000, 1000, 1000, 20)).astype(np.float32)\n", + "data" ] }, { "cell_type": "markdown", - "id": "ae58302e", + "id": "2bdd29e6", "metadata": {}, "source": [ "If you do the numbers, you will realise that the above call has created an array which requires **74 gigabytes** of memory - this is far more memory than what is available in most consumer level computers.\n", "\n", - "> The call would almost certainly fail if you made it using `np.random.random` instead of `da.random.random`, as Numpy will attempt to create the entire data set (and would actually temporarily require up to **150 gigabytes** of memory, as Numpy uses double-precision floating point (`float64`) values by default).\n", + "> The call would almost certainly fail if you made it using `np.random.random` instead of `da.random.random`, as Numpy will attempt to create the entire data set (and in fact would temporarily require up to **150 gigabytes** of memory, as Numpy uses double-precision floating point (`float64`) values by default).\n", "\n", "\n", "However:\n", @@ -239,7 +246,7 @@ { "cell_type": "code", "execution_count": null, - "id": "67fc4ec6", + "id": "b2605c92", "metadata": {}, "outputs": [], "source": [ @@ -249,7 +256,7 @@ }, { "cell_type": "markdown", - "id": "9a2d89c1", + "id": "08bf1c77", "metadata": {}, "source": [ "But again, this will not actually calculate the mean - it just defines a task which, when executed, will calculate the mean over the `data` array.\n", @@ -260,7 +267,7 @@ { "cell_type": "code", "execution_count": null, - "id": "0f60b5b3", + "id": "cecf0b61", "metadata": {}, "outputs": [], "source": [ @@ -269,23 +276,390 @@ }, { "cell_type": "markdown", - "id": "5e51a09e", + "id": "1472b53d", "metadata": {}, "source": [ + "Dask arrays support most of the functionality of Numpy arrays, but there are a few exceptions, most notably the `numpy.linalg` package.\n", + "\n", + "\n", "<a class=\"anchor\" id=\"low-level-dask-api\"></a>\n", "## Low-level Dask API\n", "\n", "\n", + "In addition to the Numpy and Pandas APIs, Dask also has a lower-level interface which allows you to create and lazily execute a pipeline made up of Python functions. 
This API is based around `dask.delayed`, which is a Python decorator that can be applied to any function, and which tells Dask that a function should be lazily executed.\n", + "\n", + "As a very simple example (taken from the [Dask documentation](https://docs.dask.org/en/stable/delayed.html#example)), consider this simple numerical task, where we have a set of numbers and, for each number `x`, we want to calculate `(x * x)`, and then add all of the results together (i.e. the sum of squares). We'll start by defining a function for each of the operations that we need to perform:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1f764830", + "metadata": {}, + "outputs": [], + "source": [ + "def square(x):\n", + " return x * x\n", + "\n", + "def sum(values):\n", + " total = 0\n", + " for v in values:\n", + " total = total + v\n", + " return total" + ] + }, + { + "cell_type": "markdown", + "id": "fd5252f6", + "metadata": {}, + "source": [ + "We could solve this problem without parallelism by using conventional Python code, which might look something like the following:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b0910aaf", + "metadata": {}, + "outputs": [], + "source": [ + "data = [1, 2, 3, 4, 5]\n", + "output = []\n", + "\n", + "for x in data:\n", + " s = square(x)\n", + " output.append(s)\n", "\n", + "total = sum(output)\n", + "print(total)" + ] + }, + { + "cell_type": "markdown", + "id": "8a7a45f5", + "metadata": {}, + "source": [ + "However, this problem is inherently parallelisable - we are independently performing the same series of steps to each of our inputs, before the final tallying. We can use the `dask.delayed` function to take advantage of this:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9031f120", + "metadata": {}, + "outputs": [], + "source": [ + "import dask\n", + "\n", + "output = []\n", + "for x in data:\n", + " a = dask.delayed(square)(x)\n", + " output.append(a)\n", + "\n", + "total = dask.delayed(sum)(output)" + ] + }, + { + "cell_type": "markdown", + "id": "6bc3ef4c", + "metadata": {}, + "source": [ + "We have not actually performed any computations yet. What we have done is built a _graph_ of operations that we need to perform. Dask refers to this as a **task graph** - Dask keeps track of the dependencies of each function that we add, and so is able to determine the order in which they need to be executed, and also which steps do not depend on each other and so can be executed in parallel. Dask can even generate an image of our task graph for us:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f0833d9", + "metadata": {}, + "outputs": [], + "source": [ + "total.visualize()" + ] + }, + { + "cell_type": "markdown", + "id": "0164c407", + "metadata": {}, + "source": [ + "And then when we are ready, we can call `compute()` to actually do the calculation:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8561997b", + "metadata": {}, + "outputs": [], + "source": [ + "total.compute()" + ] + }, + { + "cell_type": "markdown", + "id": "3590a9f9", + "metadata": {}, + "source": [ + "For a more realistic example, let's imagine that we have T1 MRI images for five subjects, and we want to perform basic structural preprocessing on each of them (reorientation, FOV reduction, and brain extraction). Here we create this example data set (all of the T1 images are just a copy of the `bighead.nii.gz` image, from the FSL course data)."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3879db5", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import shutil\n", + "\n", + "os.makedirs('braindata', exist_ok=True)\n", + "for i in range(1, 6):\n", + " shutil.copy('../../applications/fslpy/bighead.nii.gz', f'braindata/{i:02d}.nii.gz')" + ] + }, + { + "cell_type": "markdown", + "id": "77851c91", + "metadata": {}, + "source": [ + "And now we can build our pipeline. The [fslpy](https://open.win.ox.ac.uk/pages/fsl/fslpy/) library has a collection of functions which we can use to call the FSL commands that we need. We need to do a little work, as by default `dask.delayed` assumes that the dependencies of a task are passed in as arguments (there are other ways of dealing with this, but in this example it is easy to simply write a few small functions that define each of our tasks):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1f9d5f36", + "metadata": {}, + "outputs": [], + "source": [ + "import fsl.wrappers as fw\n", + "\n", + "def reorient(input, output):\n", + " fw.fslreorient2std(input, output)\n", + " return output\n", + "\n", + "def fov(input, output):\n", + " fw.robustfov(input, output)\n", + " return output\n", + "\n", + "def bet(input, output):\n", + " fw.bet(input, output)\n", + " return output" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4a762719", + "metadata": {}, + "outputs": [], + "source": [ + "import glob\n", + "import dask\n", + "import fsl.data.image as fslimage\n", + "\n", + "inputs = list(glob.glob('braindata/??.nii.gz'))\n", + "tasks = []\n", + "\n", + "for input in inputs:\n", + " basename = fslimage.removeExt(input)\n", + " r = dask.delayed(reorient)(input, f'{basename}_reorient.nii.gz')\n", + " f = dask.delayed(fov)(r, f'{basename}_fov.nii.gz')\n", + " b = dask.delayed(bet)(f, f'{basename}_brain.nii.gz')\n", + " tasks.append(b)" + ] + }, + { + "cell_type": "markdown", + "id": "1ed04761", + "metadata": {}, + "source": [ + "In the previous example we had a single output (the result of summing the squared input values) which we called `visualize()` on. We can also call `dask.visualize()` to visualise a collection of tasks:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f62c7c6d", + "metadata": {}, + "outputs": [], + "source": [ + "dask.visualize(*tasks)" + ] + }, + { + "cell_type": "markdown", + "id": "63017c42", + "metadata": {}, + "source": [ + "And, similarly, we can call `dask.compute` to run them all:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7b6e40a9", + "metadata": {}, + "outputs": [], + "source": [ + "outputs = dask.compute(*tasks)\n", + "print(outputs)" + ] + }, + { + "cell_type": "markdown", + "id": "5dbb82f6", + "metadata": {}, + "source": [ "<a class=\"anchor\" id=\"distributing-computation-with-dask\"></a>\n", "## Distributing computation with Dask\n", "\n", "\n", + "In the examples above, Dask was running its tasks in parallel on your local machine. But it is easy to instruct Dask to distribute its computations across multiple machines. 
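As a quick aside (not part of the original notebook): the client/worker mechanism that the cluster examples below rely on can also be tried out on a single machine with `dask.distributed.LocalCluster`, which is handy for testing a workflow before submitting it anywhere:

```
# A sketch, not from the original notebook: the same Client/worker mechanism
# used for cluster execution below, but with workers started locally.
import dask.array as da
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client  = Client(cluster)

data = da.random.random((100, 100, 100, 10))
print(data.mean().compute())   # executed by the local worker processes

client.close()
cluster.close()
```

Once a `Client` exists, subsequent `compute()` calls are routed through its workers rather than the default local scheduler.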
For example, Dask has support for executing tasks on an SGE or SLURM cluster, which we have available in Oxford at FMRIB and the BDI.\n", + "\n", + "To use this functionality, we need an additional library called `dask-jobqueue` which, at the moment, is not installed as part of FSL.\n", + "\n", + "> Note that the code cells below will not work if you are running this notebook on your own computer (unless you happen to have an SGE cluster system installed on your laptop).\n", + "\n", + "\n", + "The first step is to create an `SGECluster` (or `SLURMCluster`) object. You need to populate this object with information about a _single job_ in your workflow. At a minimum, you must specify the number of cores and memory that are available to a single job:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b6e7814e", + "metadata": {}, + "outputs": [], + "source": [ + "from dask_jobqueue import SGECluster\n", + "cluster = SGECluster(cores=2, memory='16GB')" + ] + }, + { + "cell_type": "markdown", + "id": "10dd646c", + "metadata": {}, + "source": [ + "You can also set a range of other options, including specifying a queue name (e.g. `queue='short.q'`), or specifying the total amount of time that your job will run for (e.g. `walltime='01:00:00'` for one hour).\n", + "\n", + "\n", + "The next step is to create some jobs by calling the `scale()` method:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bfa74705", + "metadata": {}, + "outputs": [], + "source": [ + "cluster.scale(jobs=5)" + ] + }, + { + "cell_type": "markdown", + "id": "7123a281", + "metadata": {}, + "source": [ + "Behind the scenes, the `scale()` method uses `qsub` to create five \"worker processes\", each running on a different cluster node. In the first instance, these worker processes will be sitting idle, doing nothing, and waiting for you to give them a task to run.\n", + "\n", + "The final step is to create a \"client\" object. This is an important step which configures Dask to use the cluster we have just set up:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0c8502aa", + "metadata": {}, + "outputs": [], + "source": [ + "client = cluster.get_client()" + ] + }, + { + "cell_type": "markdown", + "id": "9b43cee7", + "metadata": {}, + "source": [ + "After creating a client, any Dask computations that we request will be performed on the cluster by our worker processes:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c06367ee", + "metadata": {}, + "outputs": [], + "source": [ + "import dask.array as da\n", + "data = da.random.random((1000, 1000, 1000, 10))\n", + "print(data.mean().compute())" + ] + }, + { + "cell_type": "markdown", + "id": "f5c78ed3", + "metadata": {}, + "source": [ + "<a class=\"anchor\" id=\"fsl-pipe\"></a>\n", + "## fsl-pipe\n", + "\n", + "\n", + "https://open.win.ox.ac.uk/pages/fsl/fsl-pipe/\n", + "\n", + "\n", + "fsl-pipe is a Python library (written by our very own Michiel Cottaar) which builds upon another library called [file-tree](https://open.win.ox.ac.uk/pages/fsl/file-tree/), and allows you to write an analysis pipeline in a _declarative_ manner.
A pipeline is defined by:\n", + "\n", + " - A \"file tree\" which defines the directory structure of the input and output files of the pipeline.\n", + " - A set of recipes (Python functions) describing how each of the pipeline outputs should be produced.\n", + "\n", + "In a similar vein to Dask task graphs, fsl-pipe will automatically determine which recipes should be executed, and in what order, to produce whichever output file(s) you request. Recipes can either run locally, be distributed using Dask, or be executed on a cluster using `fsl_sub`.\n", + "\n", + "\n", + "For example, let's again imagine that we have some T1 images upon which we wish to perform basic structural preprocessing, and which are arranged in the following manner:\n", + "\n", + "> ```\n", + "> subjectA/\n", + "> T1w.nii.gz\n", + "> subjectB/\n", + "> T1w.nii.gz\n", + "> subjectC/\n", + "> T1w.nii.gz\n", + "> ```\n", + "\n", + "We must first describe the structure of our data, and save that description as a file-tree file (e.g. `mydata.tree`). This file contains a placeholder for the subject ID, and gives both the input and any desired output files unique identifiers:\n", + "\n", + "> ```\n", + "> subject{subject}\n", + "> T1w.nii.gz (t1)\n", + "> T1w_brain.nii.gz (t1_brain)\n", + "> T1w_fov.nii.gz (t1_fov)\n", + "> T1w_reorient.nii.gz (t1_reorient)\n", + "> ```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "07ff84b0", + "metadata": {}, + "outputs": [], + "source": [ + "#!/usr/bin/env python\n", + "\n", + "from fsl.wrappers import bet\n", + "\n", + "from file_tree import FileTree\n", + "from fsl_pipe import pipe, In, Out\n", "\n", + "@pipe\n", + "def brain_extract(t1 : In, t1_brain : Out):\n", + " bet(t1, t1_brain)\n", "\n", - "fsl-pipe\n", - "fsl_sub\n", - "ray" + "tree = FileTree.read('mydata.tree').update_glob('t1')\n", + "pipe.cli(tree)" ] } ], diff --git a/applications/parallel/parallel.md b/applications/parallel/parallel.md index b22f287126863b1981307f6f9de81faecf9dfcae..005c7aa569df5dc5d4167c510b59f9708f71ad73 100644 --- a/applications/parallel/parallel.md +++ b/applications/parallel/parallel.md @@ -126,18 +126,25 @@ We will introduce the Numpy API and will briefly cover the low-level API, and th ## Dask Numpy API +https://docs.dask.org/en/stable/array.html + + +> Dask also provides a Pandas-style API, accessible in the `dask.dataframe` package - you can read about it at https://docs.dask.org/en/stable/dataframe.html. + + To use the Dask Numpy API, simply import the `dask.array` package, and use it instead of the `numpy` package: ``` import numpy as np import dask.array as da -data = da.random.random((1000, 1000, 1000, 20)).astype(float32) +data = da.random.random((1000, 1000, 1000, 20)).astype(np.float32) +data ``` If you do the numbers, you will realise that the above call has created an array which requires **74 gigabytes** of memory - this is far more memory than what is available in most consumer level computers. -> The call would almost certainly fail if you made it using `np.random.random` instead of `da.random.random`, as Numpy will attempt to create the entire data set (and would actually temporarily require up to **150 gigabytes** of memory, as Numpy uses double-precision floating point (`float64`) values by default).
+> The call would almost certainly fail if you made it using `np.random.random` instead of `da.random.random`, as Numpy will attempt to create the entire data set (and in fact would temporarily require up to **150 gigabytes** of memory, as Numpy uses double-precision floating point (`float64`) values by default). However: @@ -163,17 +170,223 @@ print(m.compute()) ``` +Dask arrays support most of the functionality of Numpy arrays, but there are a few exceptions, most notably the `numpy.linalg` package. + + <a class="anchor" id="low-level-dask-api"></a> ## Low-level Dask API + +In addition to the Numpy and Pandas APIs, Dask also has a lower-level interface which allows you to create and lazily execute a pipeline made up of Python functions. This API is based around `dask.delayed`, which is a Python decorator that can be applied to any function, and which tells Dask that a function should be lazily executed. + +As a very simple example (taken from the [Dask documentation](https://docs.dask.org/en/stable/delayed.html#example)), consider this simple numerical task, where we have a set of numbers and, for each number `x`, we want to calculate `(x * x)`, and then add all of the results together (i.e. the sum of squares). We'll start by defining a function for each of the operations that we need to perform: + +``` +def square(x): + return x * x + +def sum(values): + total = 0 + for v in values: + total = total + v + return total +``` + +We could solve this problem without parallelism by using conventional Python code, which might look something like the following: + +``` +data = [1, 2, 3, 4, 5] +output = [] + +for x in data: + s = square(x) + output.append(s) + +total = sum(output) +print(total) +``` + +However, this problem is inherently parallelisable - we are independently performing the same series of steps to each of our inputs, before the final tallying. We can use the `dask.delayed` function to take advantage of this: + + +``` +import dask + +output = [] +for x in data: + a = dask.delayed(square)(x) + output.append(a) + +total = dask.delayed(sum)(output) +``` + +We have not actually performed any computations yet. What we have done is built a _graph_ of operations that we need to perform. Dask refers to this as a **task graph** - Dask keeps track of the dependencies of each function that we add, and so is able to determine the order in which they need to be executed, and also which steps do not depend on each other and so can be executed in parallel. Dask can even generate an image of our task graph for us: + +``` +total.visualize() +``` + +And then when we are ready, we can call `compute()` to actually do the calculation: + +``` +total.compute() +``` + + +For a more realistic example, let's imagine that we have T1 MRI images for five subjects, and we want to perform basic structural preprocessing on each of them (reorientation, FOV reduction, and brain extraction). Here we create this example data set (all of the T1 images are just a copy of the `bighead.nii.gz` image, from the FSL course data). + + +``` +import os +import shutil + +os.makedirs('braindata', exist_ok=True) +for i in range(1, 6): + shutil.copy('../../applications/fslpy/bighead.nii.gz', f'braindata/{i:02d}.nii.gz') +``` + +And now we can build our pipeline. The [fslpy](https://open.win.ox.ac.uk/pages/fsl/fslpy/) library has a collection of functions which we can use to call the FSL commands that we need.
We need to do a little work, as by default `dask.delayed` assumes that the dependencies of a task are passed in as arguments (there are other ways of dealing with this, but in this example it is easy to simply write a few small functions that define each of our tasks): + + +``` +import fsl.wrappers as fw + +def reorient(input, output): + fw.fslreorient2std(input, output) + return output + +def fov(input, output): + fw.robustfov(input, output) + return output + +def bet(input, output): + fw.bet(input, output) + return output +``` + + +``` +import glob +import dask +import fsl.data.image as fslimage + +inputs = list(glob.glob('braindata/??.nii.gz')) +tasks = [] + +for input in inputs: + basename = fslimage.removeExt(input) + r = dask.delayed(reorient)(input, f'{basename}_reorient.nii.gz') + f = dask.delayed(fov)(r, f'{basename}_fov.nii.gz') + b = dask.delayed(bet)(f, f'{basename}_brain.nii.gz') + tasks.append(b) +``` + +In the previous example we had a single output (the result of summing the squared input values) which we called `visualize()` on. We can also call `dask.visualize()` to visualise a collection of tasks: + +``` +dask.visualize(*tasks) +``` + +And, similarly, we can call `dask.compute` to run them all: + +``` +outputs = dask.compute(*tasks) +print(outputs) +``` + <a class="anchor" id="distributing-computation-with-dask"></a> ## Distributing computation with Dask +In the examples above, Dask was running its tasks in parallel on your local machine. But it is easy to instruct Dask to distribute its computations across multiple machines. For example, Dask has support for executing tasks on an SGE or SLURM cluster, which we have available in Oxford at FMRIB and the BDI. + +To use this functionality, we need an additional library called `dask-jobqueue` which, at the moment, is not installed as part of FSL. + +> Note that the code cells below will not work if you are running this notebook on your own computer (unless you happen to have an SGE cluster system installed on your laptop). + + +The first step is to create an `SGECluster` (or `SLURMCluster`) object. You need to populate this object with information about a _single job_ in your workflow. At a minimum, you must specify the number of cores and memory that are available to a single job: + + +``` +from dask_jobqueue import SGECluster +cluster = SGECluster(cores=2, memory='16GB') +``` + +You can also set a range of other options, including specifying a queue name (e.g. `queue='short.q'`), or specifying the total amount of time that your job will run for (e.g. `walltime='01:00:00'` for one hour). + + +The next step is to create some jobs by calling the `scale()` method: + +``` +cluster.scale(jobs=5) +``` + +Behind the scenes, the `scale()` method uses `qsub` to create five "worker processes", each running on a different cluster node. In the first instance, these worker processes will be sitting idle, doing nothing, and waiting for you to give them a task to run. + +The final step is to create a "client" object.
This is an important step which configures Dask to use the cluster we have just set up: + +``` +client = cluster.get_client() +``` + +After creating a client, any Dask computations that we request will be performed on the cluster by our worker processes: + +``` +import dask.array as da +data = da.random.random((1000, 1000, 1000, 10)) +print(data.mean().compute()) +``` + +<a class="anchor" id="fsl-pipe"></a> +## fsl-pipe + + +https://open.win.ox.ac.uk/pages/fsl/fsl-pipe/ + + +fsl-pipe is a Python library (written by our very own Michiel Cottaar) which builds upon another library called [file-tree](https://open.win.ox.ac.uk/pages/fsl/file-tree/), and allows you to write an analysis pipeline in a _declarative_ manner. A pipeline is defined by: + + - A "file tree" which defines the directory structure of the input and output files of the pipeline. + - A set of recipes (Python functions) describing how each of the pipeline outputs should be produced. -fsl-pipe -fsl_sub -ray +In a similar vein to Dask task graphs, fsl-pipe will automatically determine which recipes should be executed, and in what order, to produce whichever output file(s) you request. Recipes can either run locally, be distributed using Dask, or be executed on a cluster using `fsl_sub`. + + +For example, let's again imagine that we have some T1 images upon which we wish to perform basic structural preprocessing, and which are arranged in the following manner: + +> ``` +> subjectA/ +> T1w.nii.gz +> subjectB/ +> T1w.nii.gz +> subjectC/ +> T1w.nii.gz +> ``` + +We must first describe the structure of our data, and save that description as a file-tree file (e.g. `mydata.tree`). This file contains a placeholder for the subject ID, and gives both the input and any desired output files unique identifiers: + +> ``` +> subject{subject} +> T1w.nii.gz (t1) +> T1w_brain.nii.gz (t1_brain) +> T1w_fov.nii.gz (t1_fov) +> T1w_reorient.nii.gz (t1_reorient) +> ``` + +``` +#!/usr/bin/env python + +from fsl.wrappers import bet + +from file_tree import FileTree +from fsl_pipe import pipe, In, Out + +@pipe +def brain_extract(t1 : In, t1_brain : Out): + bet(t1, t1_brain) + +tree = FileTree.read('mydata.tree').update_glob('t1') +pipe.cli(tree) +```
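The script above only covers brain extraction. As a sketch (an assumed extension, not taken from the fsl-pipe documentation), the same pattern can give every output declared in `mydata.tree` its own recipe, reusing the `fsl.wrappers` calls from the Dask example earlier:

```
#!/usr/bin/env python
# Sketch (assumed extension, not from the original material): one recipe per
# output template declared in mydata.tree, chained via In/Out annotations.
from fsl.wrappers import fslreorient2std, robustfov, bet

from file_tree import FileTree
from fsl_pipe import pipe, In, Out

@pipe
def reorient(t1 : In, t1_reorient : Out):
    fslreorient2std(t1, t1_reorient)

@pipe
def fov(t1_reorient : In, t1_fov : Out):
    robustfov(t1_reorient, t1_fov)

@pipe
def brain_extract(t1_fov : In, t1_brain : Out):
    bet(t1_fov, t1_brain)

tree = FileTree.read('mydata.tree').update_glob('t1')
pipe.cli(tree)
```

From these annotations fsl-pipe can work out that, for each subject, `reorient` must run before `fov`, and `fov` before `brain_extract`.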