Skip to content
Snippets Groups Projects
pipe.py 10.3 KiB
Newer Older
Michiel Cottaar's avatar
Michiel Cottaar committed
"""
Runs an example diffusion MRI pipeline created using fsl-pipe
"""

# First we import the `fsl_pipe.Pipeline` object, which will contain the actual pipeline
# The other imports (In/Out/Ref/Var) will be used to mark which files each recipe requires as input or output.
from fsl_pipe import Pipeline, In, Out, Ref, Var
# Next we require a FileTree describing the directory structure of the input and output files of the pipeline.
from file_tree import FileTree

# We use FSLpy wrappers to call the FSL tools required to run the pipeline
from fsl.wrappers import bet, fslreorient2std, fslroi, fslmerge, topup, eddy, dtifit
from fsl.utils.run import runfsl, run
Michiel Cottaar's avatar
Michiel Cottaar committed
import nibabel as nib
from shutil import copy

# Create an new, empty pipeline
pipe = Pipeline()


# Adds the first recipe to the pipeline using `@pipe`.
# Note that a recipe is simply a python function (`reorient_2_std` in this case).
# This job takes the "raw" files as input (as indicated by "raw: In") 
# and produced the "reoriented" files as output (as indicated by "reoriented: Out").
# You can check in the file-tree file ("data.tree") how these files are defined.
#
# Note that this single recipe will actually run 12 times (2 subjects x 2 phase encode directions x 3 scans).
# Each of these 12 jobs will run independently (and have its own dependency structure with other jobs).
# When on a cluster with "fsl_sub" installed, each of these jobs will be submitted with a runtime of 5 minutes.
@pipe(submit=dict(jobtime=5))
Michiel Cottaar's avatar
Michiel Cottaar committed
def reorient_2_std(raw: In, reoriented: Out):
    """Reorient raw data to standard orientations.
    
    WARNING: this step might create a discrepancy between the data and the bvecs. I would not recommend it in practice.
    """
    fslreorient2std(raw, reoriented)


# Adds a second recipe to the pipeline.
# This time a job taking at most 3 minutes ("jobtime=3") and will extract the B0 image.
# We are only interested in running this job for the first scan,
# which is indicated by overriding the default values for scan (namely: 95, 96, and 97) with just 95.
#
# Note that by using "reoriented" as input, we implicitly create a dependency with the previous recipe.
# When running the pipeline, the relevant jobs for the recipe `reorient_2_std` are guaranteed to run before these jobs.
@pipe(submit=dict(jobtime=3), placeholders=dict(scan="95"))
def extract_b0(reoriented: In, extracted_b0: Out):
    """Extract b0 volume from diffusion-weighted data.
    
    The first volume is assumed to be a b0 volume.
    """
    fslroi(reoriented, extracted_b0, 0, 1)


# In this recipy we see a novel "no_iter" parameter.
# This is used to indicate that we do not want to run this recipe separately for each value of "pe_dir".
# Instead we want to run this recipe once with all possible values of "pe_dir" passed on.
# For more information on "no_iter" see 
# https://open.win.ox.ac.uk/pages/ndcn0236/fsl-pipe/tutorial.html#concatenating-merging-across-subjects-or-other-placeholders
@pipe(submit=dict(jobtime=3), no_iter=["pe_dir"])
def merge_b0s(extracted_b0: In, merged_b0s: Out):
    """Merge the B0s, so that we can run topup.
    """
    fslmerge("t", merged_b0s, *extracted_b0.data.flatten())

# At this point we have already created a complicated dependency structure.
# If we ask the pipeline to produce "merged_b0" for a specific subject, the following jobs will be run:
# reorient_2_std(pe_dir=LR, scan=95)    reorient_2_std(pe_dir=RL, scan=95)
#                  ↓                                    ↓
#          extract_b0(pe_dir=LR)                extract_b0(pe_dir=RL)
#                              ⬊               ⬋           
#                                 merge_b0s()
# The arrows indicate the dependency structure.
# Note that "reorient_2_std" will only be run twice, because in this case we do not need the results for "scan=96" or "scan=97".


# To run topup we also need an "acqparams" file.
# Note that this recipe has no dependencies in common with the recipes above, 
# so "fsl-pipe" can run it in parallel.
#
# To produce the correct content for the "acqparams" file, 
# we need not just the filenames, but the actual values of "pe_dir".
# We indicate that we want these values using "pe_dir: Var()".
# Once again, we want a single run for both values of "pe_dir", 
# which we indicate by setting "no_iter" (in a different way than above).
@pipe(submit=dict(jobtime=1))
def gen_acqparams(acqparams: Out, pe_dir: Var(no_iter=True)):
    """Generates the topup acqparams file with a line for each pe_dir."""
    mapping = {
        "LR": "1 0 0 0.05",
        "RL": "-1 0 0 0.05",
        "AP": "0 1 0 0.05",
        "PA": "0 -1 0 0.05",
    }
    with open(acqparams, 'w') as f:
        for value in pe_dir.value:
            f.write(mapping[value])
            f.write("\n")

# Now, we finally have all the information to run topup.
# There are a three more tricks here regarding sub-trees:
# 1. In the file-tree file ("data.tree") we can see that the "topup" directory structure contains a weird line:
#        ->topup (fieldmap)
#    This line indicates that file-tree should insert here the templates defined in the topup file-tree
#    (https://git.fmrib.ox.ac.uk/ndcn0236/file-tree-fsl/-/blob/master/file_tree_fsl/trees/topup.tree).
#    These templates will be identifies by the prefix "fieldmap/".
#    Sub-trees are explained in detail at 
#    https://open.win.ox.ac.uk/pages/ndcn0236/file-tree/tutorial.html#sub-trees.
# 2. The basename is neither an input, nor an outpuf file, but we would still like to know its value.
#    To indicate you want to access a filename, but not make it part of the depenency structure, you can use "Ref".
# 3. Up untill this point the keyword arguments used in the python functions matched 
#    the template identifiers we wanted to use.
#    Here this is no longer the case, so we explicitly set the template identifier using In/Out/Ref(<template identifier>).
@pipe(submit=dict(jobtime=30))
def run_topup(
        merged_b0s: In, acqparams: In,
        topup_basename: Ref("fieldmap/basename"), 
        outfile: Out("fieldmap/fieldcoef"),
        unwarped_b0: Out,
        ):
    topup(merged_b0s, acqparams, out=topup_basename, iout=unwarped_b0, config="b02b0_1.cnf")
Michiel Cottaar's avatar
Michiel Cottaar committed


# Let's have some simple recipes again
@pipe(submit=dict(jobtime=3))
def run_bet(unwarped_b0: In, nodif_brain: Out, nodif_brain_mask: Out):
    """Extracts brain from the unwarped B0"""
    bet(unwarped_b0, nodif_brain, mask=True)


# Here the "reoriented" input is a 2x3 DataArray,
# because it depends on both "pe_dir" and "scan", which are both in "no_iter".
# There is an overview of what data types are passed into the recipes at
# https://open.win.ox.ac.uk/pages/ndcn0236/fsl-pipe/tutorial.html#what-is-passed-into-the-functions
@pipe(submit=dict(jobtime=3), no_iter=("pe_dir", "scan"))
def merge_data(reoriented: In, pe_dir: Var, eddy_input: Out, eddy_index: Out):
    """Merge all DWI data for eddy."""
    filenames = []
    ndata = []
    for value in pe_dir.value:
        row = reoriented.sel(dict(pe_dir=value)) # select the row corresponding to this phase encode direction
        filenames.extend(row.data)
        ndata.append(sum([nib.load(fn).shape[3] for fn in row.data])) # number of volumes with this phase encoding
    #fslmerge("t", eddy_input, *filenames)
    print(ndata)
Michiel Cottaar's avatar
Michiel Cottaar committed
    with open(eddy_index, "w") as f:
        for index, nvols in enumerate(ndata):
            f.write("\n".join([str(index + 1)] * nvols) + "\n")
Michiel Cottaar's avatar
Michiel Cottaar committed

# We add the following recipe twice.
# Once for the bvals and once for the bvecs.
@pipe(submit=dict(jobtime=3), kwargs=dict(in_files=In("raw_bvals"), out_file=Out("bvals")))
@pipe(submit=dict(jobtime=3), kwargs=dict(in_files=In("raw_bvecs"), out_file=Out("eddy_input_bvecs")))
def merge_text_files(in_files, out_file, pe_dir: Var(no_iter=True), scan: Var(no_iter=True)):
    """Merge bvals and bvecs."""
    filenames = []
    for value in pe_dir.value:
        row = in_files.sel(dict(pe_dir=value))
        filenames.extend(row.data)
    res = run("paste", *filenames)
    with open(out_file, "w") as f:
        f.write(res)
Michiel Cottaar's avatar
Michiel Cottaar committed

# Finally, we reached the stage to add the cuda recipe.
# We want to send this job to a cluster core with a GPU cuda core enable,
# which is indicated in fsl_sub using "coprocessor='cuda'".
# See https://git.fmrib.ox.ac.uk/fsl/fsl_sub#fsl_subsubmit for other available submit flags.
#
# Note that even though "fieldmap/fieldcoef" is not used in the call to eddy,
# we still need to include it here to create the correct dependency structure with topup.
@pipe(submit=dict(coprocessor="cuda", jobtime=60))
def run_eddy(
    eddy_input: In, eddy_index: In, 
    topup_basename: Ref("fieldmap/basename"),
    fieldcoef: In("fieldmap/fieldcoef"),
Michiel Cottaar's avatar
Michiel Cottaar committed
    nodif_brain_mask: In,
    acqparams: In,
    bvals: In,
    eddy_input_bvecs: In,
Michiel Cottaar's avatar
Michiel Cottaar committed
    basename: Ref("eddy/basename"),
    result: Out("eddy/image"),
    result_bvecs: Out("eddy/rotated_bvecs"),
):
    eddy(
        eddy_input, nodif_brain_mask, eddy_index, acqparams, 
        eddy_input_bvecs, bvals, basename, topup=topup_basename
Michiel Cottaar's avatar
Michiel Cottaar committed
    )

@pipe(submit=dict(jobtime=10))
def collect_data(
    eddy_input: In("eddy/image"), data: Out,
    raw_bvecs: In("eddy/rotated_bvecs"), bvecs: Out,
):
    """Collect all preprocessed data in single directory."""
    copy(eddy_input, data)
    copy(raw_bvecs, bvecs)


# We want to run this job separately for each value of "shell",
Michiel Cottaar's avatar
Michiel Cottaar committed
# so in this case we do not use "no_iter".
@pipe(submit=dict(jobtime=10))
def extract_shell(
    data: In, bvals: In, bvecs: In, shell: Var, shell_basename: Ref,
Michiel Cottaar's avatar
Michiel Cottaar committed
    data_shell: Out, bvals_shell: Out, bvecs_shell: Out,
):
    runfsl([
        "select_dwi_vols", data, bvals, shell_basename, shell.value,
Michiel Cottaar's avatar
Michiel Cottaar committed
        "-b", "0", "-obv", bvecs
    ])


# So far, we have added jobs at the point we defined the python functions using `@pipe`.
# However, we can also add already existing python functions as recipes.
@pipe(submit=dict(jobtime=10))
def run_dtifit(
    data_shell: In, bvals_shell: In, bvecs_shell: In, nodif_brain_mask: In,
    basename: Ref("dti/basename"), FA: Out("dti/FA"),
):
    dtifit(data_shell, basename, nodif_brain_mask, bvecs_shell, bvals_shell)


if __name__ == "__main__":
    # Actually load the file-tree
    # Note that we did not need to define the input or output directory structure to build the pipeline so far.
    tree = FileTree.read("data.tree")

    # Run the default pipeline command line interface
    # https://open.win.ox.ac.uk/pages/ndcn0236/fsl-pipe/tutorial.html#exploring-the-command-line-interface-cli
    pipe.cli(tree)