Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • paulmc/fslpy
  • ndcn0236/fslpy
  • seanf/fslpy
3 results
Show changes
Showing
with 1818 additions and 626 deletions
"""
Easy format to define input/output files in a python pipeline.
"""Easy format to define input/output files in a python pipeline.
.. warning::
File-tree is now an independent Python library, and will eventually be
removed from ``fslpy`` - visit the `file-tree` :ref:`API documentation
<https://open.win.ox.ac.uk/pages/fsl/file-tree/>`_ for more details.
The goal is to separate the definition of the input/output filenames from the actual code
by defining a directory tree (i.e., FileTree) in a separate file from the code.
......@@ -177,7 +181,7 @@ which amongst others refers to
Example pipeline
----------------
A very simple pipeline to run BET on every subject can start with a simply FileTree like
A very simple pipeline to run BET on every subject can start with a FileTree like
::
{subject}
......@@ -200,6 +204,12 @@ Assuming that the input T1w's already exist, we can then simply run BET for ever
# make_dir=True ensures that the output directory containing the "bet_output" actually exists
bet(input=T1w_tree.get('T1w'), output=T1w_tree.get('bet_output', make_dir=True), mask=True)
Useful tips
-----------
Changing directory structure
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If later on in our input files change, because for some subjects we added a second session, we could keep our script
and simply update the FileTree:
::
......@@ -207,8 +217,8 @@ and simply update the FileTree:
{subject}
[ses-{session}]
T1w.nii.gz
T1w_brain.nii.gz (bed_output)
T1w_brain_mask.nii.gz (bed_mask)
T1w_brain.nii.gz (bet_output)
T1w_brain_mask.nii.gz (bet_mask)
Note the square brackets around the session sub-directory. This indicates that this sub-directory is optional and
will only be present if the "session" variable is defined (see `Optional variables`_).
......@@ -230,17 +240,24 @@ altering this behaviour is again as simple as altering the FileTree to something
::
raw_data
{subject}
[ses-{session}]
{subject} (input_subject_dir)
[ses-{session}] (input_session_dir)
T1w.nii.gz
processed_data
{subject}
[ses-{session}]
{subject} (output_subject_dir)
[ses-{session}] (output_session_dir)
bet
{subject}[_{session}]_T1w_brain.nii.gz (bet_output)
{subject}[_{session}]_T1w_brain_mask.nii.gz (bet_mask)
Note that we also encoded the subject and session ID in the output filename.
We also have to explicitly assign short names to the subject and session directories,
even though we don't explicitly reference these in the script.
The reason for this is that each directory and filename template must have a unique short name and
in this case the default short names (respectively, "{subject}" and "[ses-{session}]") would not have been unique.
Output "basenames"
^^^^^^^^^^^^^^^^^^
Some tools like FSL's FAST produce many output files. Rather than entering all
of these files in our FileTree by hand you can include them all at once by including `Sub-trees`_:
......@@ -248,12 +265,12 @@ of these files in our FileTree by hand you can include them all at once by inclu
::
raw_data
{subject}
[ses-{session}]
{subject} (input_subject_dir)
[ses-{session}] (input_session_dir)
T1w.nii.gz
processed_data
{subject}
[ses-{session}]
{subject} (output_subject_dir)
[ses-{session}] (output_session_dir)
bet
{subject}[_{session}]_T1w_brain.nii.gz (bet_output)
{subject}[_{session}]_T1w_brain_mask.nii.gz (bet_mask)
......@@ -272,6 +289,35 @@ Within the script we can generate the fast output by running
The output files will be available as `T1w_tree.get('segment/<variable name>')`, where `<variable name>` is one
of the short variable names defined in the
`FAST FileTree <https://git.fmrib.ox.ac.uk/fsl/fslpy/blob/master/fsl/utils/filetree/trees/fast.tree>`_.
Running a pipeline on a subset of participants/sessions/runs
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Suppose you want to run your pipeline on a subset of your data while testing.
You may want to do this if your data has a a hierarchy of variables (e.g. participant, session, run) as in the example below.
::
sub-001
ses-01
sub-001_ses-01_run-1.feat
sub-001_ses-01_run-2.feat
ses-02
sub-{participant}_ses-{session}_run-{run}.feat (feat_dir)
...
sub-002
sub-003
...
You can update the FileTree with one or more variables before calling `get_all_trees` as follows:
.. code-block:: python
for participant in ("001", "002"):
for t in tree.update(participant=participant, run="1").get_all_trees("feat_dir", glob_vars="all"):
my_pipeline(t)
This code will iterate over all sessions that have a run="1" for participants "001" and "002".
"""
__author__ = 'Michiel Cottaar <Michiel.Cottaar@ndcn.ox.ac.uk>'
......@@ -279,3 +325,13 @@ __author__ = 'Michiel Cottaar <Michiel.Cottaar@ndcn.ox.ac.uk>'
from .filetree import FileTree, register_tree, MissingVariable
from .parse import tree_directories, list_all_trees
from .query import FileTreeQuery
import fsl.utils.deprecated as deprecated
deprecated.warn('fsl.utils.filetree',
stacklevel=2,
vin='3.6.0',
rin='4.0.0',
msg='The filetree package is now released as a separate '
'Python library ("file-tree" on PyPi), and will be '
'removed in a future version of fslpy.')
from pathlib import Path, PurePath
from typing import Tuple, Optional, Dict, Any, Set
from copy import deepcopy
from . import parse
import pickle
import json
import os.path as op
from . import utils
from fsl.utils.deprecated import deprecated
class MissingVariable(KeyError):
......@@ -28,11 +27,11 @@ class FileTree(object):
- ``name``: descriptive name of the tree
"""
def __init__(self,
templates: Dict[str, str],
variables: Dict[str, Any],
sub_trees: Dict[str, "FileTree"] = None,
parent: Optional["FileTree"] = None,
name: str = None):
templates: Dict[str, str],
variables: Dict[str, Any],
sub_trees: Dict[str, "FileTree"] = None,
parent: Optional["FileTree"] = None,
name: str = None):
"""
Creates a new filename tree.
"""
......@@ -51,7 +50,6 @@ class FileTree(object):
"""
return self._parent
@property
def name(self, ):
"""
......@@ -59,7 +57,6 @@ class FileTree(object):
"""
return self._name
@property
def all_variables(self, ):
"""
......@@ -181,10 +178,10 @@ class FileTree(object):
:param glob_vars: sequence of undefined variables that can take any possible values when looking for matches on the disk.
Any defined variables in `glob_vars` will be ignored.
If glob_vars is set to 'all', all undefined variables will be used to look up matches.
:return: sorted sequence of paths
:return: sequence of paths
"""
text, variables = self.get_template(short_name)
return tuple(str(Path(fn)) for fn in utils.get_all(text, variables, glob_vars=glob_vars))
return tuple([self.update(**vars).get(short_name)
for vars in self.get_all_vars(short_name, glob_vars=glob_vars)])
def get_all_vars(self, short_name: str, glob_vars=()) -> Tuple[Dict[str, str]]:
"""
......@@ -196,7 +193,8 @@ class FileTree(object):
If glob_vars is set to 'all', all undefined variables will be used to look up matches.
:return: sequence of dictionaries with the variables settings used to generate each filename
"""
return tuple(self.extract_variables(short_name, fn) for fn in self.get_all(short_name, glob_vars=glob_vars))
text, variables = self.get_template(short_name)
return utils.get_all(text, variables, glob_vars=glob_vars)
def get_all_trees(self, short_name: str, glob_vars=(), set_parent=True) -> Tuple["FileTree"]:
"""
......@@ -225,7 +223,7 @@ class FileTree(object):
Setting a variable to None will cause the variable to be unset
:return: New FileTree with same templates for directory names and filenames, but updated variables
"""
new_tree = deepcopy(self)
new_tree = self.copy()
set_tree = new_tree
while set_parent and set_tree.parent is not None:
set_tree = set_tree.parent
......@@ -256,12 +254,28 @@ class FileTree(object):
with open(filename, 'wb') as f:
pickle.dump(self, f)
def save_json(self, filename):
"""
Saves the Filetree to a JSON file
:param filename: filename to store the file tree in
"""
def default(obj):
if isinstance(obj, FileTree):
res = dict(obj.__dict__)
del res['_parent']
return res
return obj
with open(filename, 'w') as f:
json.dump(self, f, default=default, indent=2)
@classmethod
def load_pickle(cls, filename):
"""
Loads the Filetree from a pickle file
:param filename: filename produced from Filetree.save
:param filename: filename produced from Filetree.save_pickle
:return: stored Filetree
"""
with open(filename, 'rb') as f:
......@@ -270,6 +284,29 @@ class FileTree(object):
raise IOError("Pickle file did not contain %s object" % cls)
return res
@classmethod
def load_json(cls, filename):
"""
Loads the FileTree from a JSON file
:param filename: filename produced by FileTree.save_json
:return: stored FileTree
"""
def from_dict(input_dict):
res_tree = FileTree(
templates=input_dict['templates'],
variables=input_dict['variables'],
sub_trees={name: from_dict(value) for name, value in input_dict['sub_trees'].items()},
name=input_dict['_name'],
)
for sub_tree in res_tree.sub_trees.values():
sub_tree._parent = res_tree
return res_tree
with open(filename, 'r') as f:
as_dict = json.load(f)
return from_dict(as_dict)
def defines(self, short_names, error=False):
"""
Checks whether templates are defined for all the `short_names`
......@@ -327,8 +364,68 @@ class FileTree(object):
return False
return True
def partial_fill(self, ) -> "FileTree":
"""
Fills in known variables into the templates
:return: The resulting tree will have empty `variables` dictionaries and updated templates
"""
new_tree = self.copy()
to_update = new_tree
while to_update.parent is not None:
to_update = to_update.parent
to_update._update_partial_fill()
return new_tree
def _update_partial_fill(self, ):
"""
Helper function for `partial_fill` that updates the templates in place
"""
new_templates = {}
for short_name in self.templates:
template, variables = self.get_template(short_name)
new_templates[short_name] = str(utils.Template.parse(template).fill_known(variables))
self.templates = new_templates
for tree in self.sub_trees.values():
tree._update_partial_fill()
self.variables = {}
def copy(self, ):
"""
Copies the FileTree
Copies the templates, variables, sub_trees, and parent
:return: a copy of the FileTree
"""
return self._copy()
def _copy(self, new_parent=None, new_sub_tree=None):
"""
Helper function for copying a FileTree
"""
if new_sub_tree is None:
new_sub_tree = (None, None)
new_copy = type(self)(
templates=self.templates.copy(),
variables=self.variables.copy(),
name=self.name,
parent=new_parent
)
new_copy.sub_trees = {name: new_sub_tree[1] if new_sub_tree[0] == name else tree._copy(new_parent=new_copy)
for name, tree in self.sub_trees.items()}
if self.parent is not None and new_parent is None:
for my_key, ref_tree in self.parent.sub_trees.items():
if self is ref_tree:
break
else:
raise ValueError(f"Sub-tree {self} not found in parent tree")
new_copy._parent = self.parent._copy(new_sub_tree=(my_key, new_copy))
return new_copy
@classmethod
def read(cls, tree_name: str, directory='.', **variables) -> "FileTree":
def read(cls, tree_name: str, directory='.', partial_fill=False, **variables) -> "FileTree":
"""
Reads a FileTree from a specific file
......@@ -338,6 +435,7 @@ class FileTree(object):
:param tree_name: file containing the filename tree.
Can provide the filename of a tree file or the name for a tree in the ``filetree.tree_directories``.
:param directory: parent directory of the full tree (defaults to current directory)
:param partial_fill: By default any known `variables` are filled into the `template` immediately
:param variables: variable settings
:return: dictionary from specifier to filename
"""
......@@ -421,6 +519,8 @@ class FileTree(object):
res = get_registered(tree_name, cls)(templates, variables=file_variables, sub_trees=sub_trees, name=tree_name)
for tree in sub_trees.values():
tree._parent = res
if partial_fill:
res = res.partial_fill()
return res
......
......@@ -31,8 +31,7 @@ from typing import Dict, List, Tuple
import numpy as np
from fsl.utils.deprecated import deprecated
from . import FileTree
from . import FileTree
log = logging.getLogger(__name__)
......@@ -89,6 +88,8 @@ class FileTreeQuery(object):
:arg tree: The :class:`.FileTree` object
"""
# Hard-code into the templates any pre-defined variables
tree = tree.partial_fill()
# Find all files present in the directory
# (as Match objects), and find all variables,
......@@ -130,7 +131,7 @@ class FileTreeQuery(object):
# An ND array for this short
# name. Each element is a
# Match object, or nan.
matcharray = np.zeros(tvarlens, dtype=np.object)
matcharray = np.zeros(tvarlens, dtype=object)
matcharray[:] = np.nan
# indices into the match array
......@@ -205,15 +206,6 @@ class FileTreeQuery(object):
return list(self.__templatevars.keys())
@property
@deprecated('2.6.0', '3.0.0', 'Use templates instead')
def short_names(self) -> List[str]:
"""Returns a list containing all templates of the ``FileTree`` that
are present in the directory.
"""
return self.templates
def query(self, template, asarray=False, **variables):
"""Search for files of the given ``template``, which match
the specified ``variables``. All hits are returned for variables
......@@ -290,12 +282,6 @@ class Match(object):
return self.__filename
@property
@deprecated('2.6.0', '3.0.0', 'Use template instead')
def short_name(self):
return self.template
@property
def template(self):
return self.__template
......@@ -369,13 +355,13 @@ def scan(tree : FileTree) -> List[Match]:
matches = []
for template in tree.templates:
for filename in tree.get_all(template, glob_vars='all'):
for variables in tree.get_all_vars(template, glob_vars='all'):
filename = tree.update(**variables).get(template)
if not op.isfile(filename):
continue
variables = dict(tree.extract_variables(template, filename))
matches.append(Match(filename, template, tree, variables))
for tree_name, sub_tree in tree.sub_trees.items():
......
......@@ -2,19 +2,22 @@ ext=.nii.gz
dataset_description.json
participants.tsv
README
CHANGES
README (readme)
CHANGES (changes)
LICENSE (license)
genetic_info.json
sub-{participant}
[ses-{session}]
sub-{participant}_sessions.tsv (sessions_tsv)
anat (anat_dir)
sub-{participant}[_ses-{session}][_acq-{acq}][_rec-{rec}][_run-{run_index}]_{modality}{ext} (anat_image)
sub-{participant}[_ses-{session}][_acq-{acq}][_ce-{ce}][_rec-{rec}][_run-{run_index}]_{modality}{ext} (anat_image)
sub-{participant}[_ses-{session}][_acq-{acq}][_ce-{ce}][_rec-{rec}][_run-{run_index}][_mod-{modality}]_defacemask{ext} (anat_deface)
func (func_dir)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_rec-{rec}][_run-{run_index}]_bold{ext} (task_image)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_rec-{rec}][_run-{run_index}]_sbref{ext} (task_sbref)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_rec-{rec}][_run-{run_index}]_events.tsv (task_events)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_rec-{rec}][_run-{run_index}][_recording-{recording}]_physio{ext} (task_physio)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_rec-{rec}][_run-{run_index}][_recording-{recording}]_stim{ext} (task_stim)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_ce-{ce}][_dir-{dir}][_rec-{rec}][_run-{run_index}][_echo-{echo}]_bold{ext} (task_image)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_ce-{ce}][_dir-{dir}][_rec-{rec}][_run-{run_index}][_echo-{echo}]_sbref{ext} (task_sbref)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_ce-{ce}][_dir-{dir}][_rec-{rec}][_run-{run_index}][_echo-{echo}]_events.tsv (task_events)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_ce-{ce}][_dir-{dir}][_rec-{rec}][_run-{run_index}][_echo-{echo}][_recording-{recording}]_physio.tsv.gz (task_physio)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_ce-{ce}][_dir-{dir}][_rec-{rec}][_run-{run_index}][_echo-{echo}][_recording-{recording}]_stim.tsv.gz (task_stim)
dwi (dwi_dir)
sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_dwi{ext} (dwi_image)
sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_dwi.bval (bval)
......@@ -28,3 +31,15 @@ sub-{participant}
sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_phase2{ext} (fmap_phase2)
sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_fieldmap{ext} (fmap)
sub-{participant}[_ses-{session}][_acq-{acq}]_dir-{dir}[_run-{run_index}]_epi{ext} (fmap_epi)
meg (meg_dir)
sub-{participant}[_ses-{session}]_task-{task}[_run-{run}][_proc-{proc}]_meg.{meg_ext} (meg)
eeg (eeg_dir)
sub-{participant}[_ses-{session}]_task-{task}[_run-{run}][_proc-{proc}]_eeg.{eeg_ext} (eeg)
ieeg (ieeg_dir)
sub-{participant}[_ses-{session}]_task-{task}[_run-{run}][_proc-{proc}]_ieeg.{ieeg_ext} (ieeg)
beh (behavioral_dir)
sub-{participant}[_ses-{session}]_task-{task}_events.tsv (behavioural_events)
sub-{participant}[_ses-{session}]_task-{task}_beh.tsv (behavioural)
sub-{participant}[_ses-{session}]_task-{task}_physio.tsv.gz (behavioural_physio)
sub-{participant}[_ses-{session}]_task-{task}_stim.tsv.gz (behavioral_stim)
......@@ -12,3 +12,10 @@ basename = dti
{basename}_L3.nii.gz (L3)
{basename}_kurt.nii.gz (kurt)
{basename}_kurt1.nii.gz (kurt1)
{basename}_kurt2.nii.gz (kurt2)
{basename}_kurt3.nii.gz (kurt3)
{basename}_sse.nii.gz (sse)
{basename}_cnicope.nii.gz (cnicope)
{basename}_tensor.nii.gz (tensor)
struct = T1
basename = fsl_anat
{basename} (basename)
{basename}.anat (fsl_anat_dir)
lesionmaskinv.nii.gz
lesionmask.nii.gz
log.txt
MNI152_{struct}_2mm_brain_mask_dil1.nii.gz
MNI_to_{struct}_nonlin_field.nii.gz
{struct}2std_skullcon.mat
{struct}_biascorr_bet_skull.nii.gz (biascorr_bet_skull)
{struct}_biascorr_brain_mask.nii.gz (biascorr_brain_mask)
{struct}_biascorr_brain.nii.gz (biascorr_brain)
{struct}_biascorr.nii.gz (biascorr)
{struct}_fast_bias_idxmask.nii.gz
{struct}_fast_bias_init.nii.gz
{struct}_fast_bias.nii.gz
{struct}_fast_bias_vol2.nii.gz
{struct}_fast_bias_vol32.nii.gz
{struct}_fast_restore.nii.gz
{struct}_fast_seg.nii.gz (fast_seg)
{struct}_fast_pve_0.nii.gz
{struct}_fast_pve_1.nii.gz
{struct}_fast_pve_2.nii.gz
{struct}_fast_pveseg.nii.gz (fast_pveseg)
{struct}_fast_totbias.nii.gz
{struct}_fullfov.nii.gz
{struct}_initfast2_brain_mask2.nii.gz
{struct}_initfast2_brain_mask.nii.gz
{struct}_initfast2_brain.nii.gz
{struct}_initfast2_maskedrestore.nii.gz
{struct}_initfast2_restore.nii.gz
{struct}.nii.gz
{struct}_nonroi2roi.mat
{struct}_orig2roi.mat
{struct}_orig2std.mat
{struct}_orig.nii.gz
{struct}_roi2nonroi.mat
{struct}_roi2orig.mat
{struct}_roi.log
{struct}_std2orig.mat
{struct}_to_MNI_lin.mat (MNI_lin_mat)
{struct}_to_MNI_lin.nii.gz (MNI_lin_nii)
{struct}_to_MNI_nonlin_coeff.nii.gz
{struct}_to_MNI_nonlin_field.nii.gz
{struct}_to_MNI_nonlin_jac.nii.gz
{struct}_to_MNI_nonlin.nii.gz (MNI_nonlin_nii)
{struct}_to_MNI_nonlin.txt (MNI_nonlin_txt)
import re
import itertools
import glob
from . import filetree
from typing import List, Sequence, Set, Tuple, Dict, Iterator
def resolve(template, variables):
class Part:
"""
Resolves the template given a set of variables
Individual part of a template
:param template: template
:param variables: mapping of variable names to values
:return: cleaned string
3 subclasses are defined:
- :class:`Literal`: piece of text
- :class:`Required`: required variable to fill in (between curly brackets)
- :class:`Optional`: part of text containing optional variables (between square brackets)
"""
filled = fill_known(template, variables)
filename = resolve_optionals(filled)
remaining = find_variables(filename)
if len(remaining) > 0:
raise filetree.MissingVariable('Variables %s not defined' % set(remaining))
return filename
def fill_known(self, variables) -> Sequence["Part"]:
"""
Fills in the given variables
"""
return [self]
def optional_variables(self, ) -> Set["Part"]:
"""
Returns all variables in optional parts
"""
return set()
def get_all(template, variables, glob_vars=()):
"""
Gets all variables matching the templates given the variables
def required_variables(self, ) -> Set["Part"]:
"""
Returns all required variables
"""
return set()
:param template: template
:param variables: (incomplete) mapping of variable names to values
:param glob_vars: sequence of undefined variables that can take any possible values when looking for matches on the disk
If `glob_vars` contains any defined variables, it will be ignored.
:return: sequence of filenames
def contains_optionals(self, variables: Set["Part"]=None):
"""
Returns True if this part contains the optional variables
"""
return False
def append_variables(self, variables: List[str]):
"""
Appends the variables in this part to the provided list in order
"""
pass
class Literal(Part):
def __init__(self, text: str):
"""
Literal part is defined purely by the text it contains
:param text: part of the template
"""
self.text = text
def __str__(self):
"""
Returns this part of the template as a string
"""
return self.text
class Required(Part):
def __init__(self, var_name, var_formatting=None):
"""
Required part of template (between curly brackets)
Required variable part of template is defined by variable name and its format
:param var_name: name of variable
:param var_formatting: how to format the variable
"""
self.var_name = var_name
self.var_formatting = var_formatting
def __str__(self):
"""
Returns this part of the template as a string
"""
if self.var_formatting is None:
return '{' + self.var_name + '}'
else:
return '{' + self.var_name + ':' + self.var_formatting + '}'
def fill_known(self, variables):
if self.var_name in variables:
return Template.parse(str(self).format(**variables)).parts
return [self]
def required_variables(self, ):
return {self.var_name}
def append_variables(self, variables):
variables.append(self.var_name)
class Optional(Part):
def __init__(self, sub_template: "Template"):
"""
Optional part of template (between square brackets)
Optional part can contain literal and required parts
:param sub_template: part of the template within square brackets
"""
self.sub_template = sub_template
def __str__(self):
return '[' + str(self.sub_template) + ']'
def fill_known(self, variables):
new_opt = self.sub_template.fill_known(variables)
if len(new_opt.required_variables()) == 0:
return Template.parse(str(new_opt)).parts
return [Optional(new_opt)]
def optional_variables(self, ):
return self.sub_template.required_variables()
def contains_optionals(self, variables=None):
if variables is None and len(self.optional_variables()) > 0:
return True
return len(self.optional_variables().intersection(variables)) > 0
def append_variables(self, variables):
variables.extend(self.sub_template.ordered_variables())
class Template:
"""
Splits a template into its constituent parts
"""
filled = fill_known(template, variables)
remaining = set(find_variables(filled))
optional = optional_variables(filled)
res = set()
if glob_vars == 'all':
glob_vars = remaining
glob_vars = set(glob_vars).difference(variables.keys())
undefined_vars = remaining.difference(glob_vars).difference(optional)
if len(undefined_vars) > 0:
raise KeyError("Required variables {} were not defined".format(undefined_vars))
for keep in itertools.product(*[(True, False) for _ in optional.intersection(glob_vars)]):
sub_variables = {var: '*' for k, var in zip(keep, optional) if k}
for var in remaining.difference(optional).intersection(glob_vars):
sub_variables[var] = '*'
sub_filled = fill_known(filled, sub_variables)
pattern = resolve_optionals(sub_filled)
assert len(find_variables(pattern)) == 0
for filename in glob.glob(pattern):
try:
extract_variables(filled, filename)
except ValueError:
def __init__(self, parts: Sequence[Part]):
if isinstance(parts, str):
raise ValueError("Input to Template should be a sequence of parts; " +
"did you mean to call `Template.parse` instead?")
self.parts = tuple(parts)
@classmethod
def parse(cls, text: str) -> "Template":
"""
Parses a text template into its constituent parts
:param text: input template as string
:return: same template split into its parts
"""
parts = []
for optional_parts in re.split(r'(\[.*?\])', text):
if len(optional_parts) > 0 and optional_parts[0] == '[' and optional_parts[-1] == ']':
if '[' in optional_parts[1:-1] or ']' in optional_parts[1:-1]:
raise ValueError(f'Can not parse {text}, because unmatching square brackets were found')
parts.append(Optional(Template.parse(optional_parts[1:-1])))
else:
for required_parts in re.split(r'(\{.*?\})', optional_parts):
if len(required_parts) > 0 and required_parts[0] == '{' and required_parts[-1] == '}':
if ':' in required_parts:
var_name, var_type = required_parts[1:-1].split(':')
else:
var_name, var_type = required_parts[1:-1], ''
parts.append(Required(var_name, var_type))
else:
parts.append(Literal(required_parts))
return Template(parts)
def __str__(self):
"""
Returns the template as a string
"""
return ''.join([str(p) for p in self.parts])
def optional_variables(self, ) -> Set[str]:
"""
Set of optional variables
"""
if len(self.parts) == 0:
return set()
optionals = set.union(*[p.optional_variables() for p in self.parts])
return optionals.difference(self.required_variables())
def required_variables(self, ) -> Set[str]:
"""
Set of required variables
"""
if len(self.parts) == 0:
return set()
return set.union(*[p.required_variables() for p in self.parts])
def ordered_variables(self, ) -> Tuple[str]:
"""
Sequence of all variables in order (can contain duplicates)
"""
ordered_vars = []
for p in self.parts:
p.append_variables(ordered_vars)
return ordered_vars
def fill_known(self, variables) -> "Template":
"""
Fill in the known variables
Any optional parts, where all variables have been filled will be automatically replaced
"""
prev = ''
while str(self) != prev:
prev = str(self)
self = self._fill_known_single(variables)
return self
def _fill_known_single(self, variables):
"""
Helper method for :meth:`_fill_known`
"""
res = []
for p in self.parts:
res.extend(p.fill_known(variables))
return Template(res)
def remove_optionals(self, optionals=None) -> "Template":
"""
Removes any optionals containing the provided variables (default: remove all)
"""
return Template([p for p in self.parts if not p.contains_optionals(optionals)])
def resolve(self, variables) -> str:
"""
Resolves the template given a set of variables
:param variables: mapping of variable names to values
:return: cleaned string
"""
clean_template = self.fill_known(variables).remove_optionals()
if len(clean_template.required_variables()) > 0:
raise KeyError("Variables %s not defined" % clean_template.required_variables())
return str(clean_template)
def get_all(self, variables, glob_vars=()) -> Tuple[Dict[str, str]]:
"""
Gets all variables for files on disk matching the templates
:param variables: (incomplete) mapping of variable names to values
:param glob_vars: sequence of undefined variables that can take any possible values when looking for matches on the disk
"""
filled = self.fill_known(variables)
if glob_vars == 'all':
glob_vars = set.union(self.required_variables(), self.optional_variables())
if len(filled.required_variables().difference(glob_vars)) > 0:
raise KeyError("Required variables {} were not defined".format(
filled.required_variables().difference(glob_vars)
))
cleaned = filled.remove_optionals(filled.optional_variables().difference(glob_vars))
return cleaned._get_all_helper(glob_vars)
def _get_all_helper(self, glob_vars):
params = set()
optionals = self.optional_variables()
for to_fill in self.optional_subsets():
pattern = str(to_fill.fill_known({var: '*' for var in glob_vars}))
while '//' in pattern:
pattern = pattern.replace('//', '/')
for filename in sorted(glob.glob(pattern)):
try:
extracted_vars = to_fill.extract_variables(filename)
for name in optionals:
if name not in extracted_vars:
extracted_vars[name] = None
params.add(tuple(sorted(extracted_vars.items(), key=lambda item: item[0])))
except ValueError:
pass
return tuple([dict(p) for p in params])
def optional_subsets(self, ) -> Iterator["Template"]:
"""
Yields template sub-sets with every combination optional variables
"""
optionals = self.optional_variables()
for n_optional in range(len(optionals) + 1):
for exclude_optional in itertools.combinations(optionals, n_optional):
yield self.remove_optionals(exclude_optional)
def extract_variables(self, filename, known_vars=None):
"""
Extracts the variable values from the filename
:param filename: filename
:param known_vars: already known variables
:return: dictionary from variable names to string representations (unused variables set to None)
"""
if known_vars is not None:
template = self.fill_known(known_vars)
else:
template = self
while '//' in filename:
filename = filename.replace('//', '/')
required = template.required_variables()
optional = template.optional_variables()
results = []
for to_fill in template.optional_subsets():
sub_re = str(to_fill.fill_known(
{var: r'(\S+)' for var in required.union(optional)},
))
while '//' in sub_re:
sub_re = sub_re.replace('//', '/')
sub_re = sub_re.replace('.', r'\.')
match = re.match(sub_re, filename)
if match is None:
continue
res.add(filename)
return sorted(res)
extracted_value = {}
ordered_vars = to_fill.ordered_variables()
assert len(ordered_vars) == len(match.groups())
failed = False
for var, value in zip(ordered_vars, match.groups()):
if var in extracted_value:
if value != extracted_value[var]:
failed = True
break
else:
extracted_value[var] = value
if failed or any('/' in value for value in extracted_value.values()):
continue
for name in template.optional_variables():
if name not in extracted_value:
extracted_value[name] = None
if known_vars is not None:
extracted_value.update(known_vars)
results.append(extracted_value)
if len(results) == 0:
raise ValueError("{} did not match {}".format(filename, template))
def fill_known(template, variables):
def score(variables):
"""
The highest score is given to the set of variables that:
1. has used the largest amount of optional variables
2. has the shortest text within the variables (only used if equal at 1
"""
number_used = len([v for v in variables.values() if v is not None])
length_hint = sum([len(v) for v in variables.values() if v is not None])
return number_used * 1000 - length_hint
best = max(results, key=score)
for var in results:
if best != var and score(best) == score(var):
raise KeyError("Multiple equivalent ways found to parse {} using {}".format(filename, template))
return best
def resolve(template, variables):
"""
Fills in the known variables filling the other variables with {<variable_name>}
Resolves the template given a set of variables
:param template: template
:param variables: mapping of variable names to values (ignoring any None)
:param variables: mapping of variable names to values
:return: cleaned string
"""
prev = ''
while prev != template:
prev = template
settings = {}
for name in set(find_variables(template)):
if name in variables and variables[name] is not None:
settings[name] = variables[name]
else:
settings[name] = '{' + name + '}'
template = template.format(**settings)
return template
return Template.parse(template).resolve(variables)
def resolve_optionals(text):
def get_all(template, variables, glob_vars=()):
"""
Resolves the optional sections
Gets all variables matching the templates given the variables
:param text: template after filling in the known variables
:return: cleaned string
:param template: template
:param variables: (incomplete) mapping of variable names to values
:param glob_vars: sequence of undefined variables that can take any possible values when looking for matches on the disk
If `glob_vars` contains any defined variables, it will be ignored.
:return: sequence of variables
"""
def resolve_single_optional(part):
if len(part) == 0:
return part
if part[0] != '[' or part[-1] != ']':
return part
elif len(find_variables(part)) == 0:
return part[1:-1]
else:
return ''
res = [resolve_single_optional(text) for text in re.split(r'(\[.*?\])', text)]
return ''.join(res)
return Template.parse(template).get_all(variables, glob_vars)
def find_variables(template):
......@@ -109,7 +380,7 @@ def find_variables(template):
:param template: full template
:return: sequence of variables
"""
return tuple(var.split(':')[0] for var in re.findall(r"\{(.*?)\}", template))
return Template.parse(template).ordered_variables()
def optional_variables(template):
......@@ -119,17 +390,7 @@ def optional_variables(template):
:param template: full template
:return: set of variables that are only present in optional parts of the string
"""
include = set()
exclude = set()
for text in re.split(r'(\[.*?\])', template):
if len(text) == 0:
continue
variables = find_variables(text)
if text[0] == '[' and text[-1] == ']':
include.update(variables)
else:
exclude.update(variables)
return include.difference(exclude)
return Template.parse(template).optional_variables()
def extract_variables(template, filename, known_vars=None):
......@@ -141,41 +402,4 @@ def extract_variables(template, filename, known_vars=None):
:param known_vars: already known variables
:return: dictionary from variable names to string representations (unused variables set to None)
"""
if known_vars is None:
known_vars = {}
template = fill_known(template, known_vars)
while '//' in filename:
filename = filename.replace('//', '/')
remaining = set(find_variables(template))
optional = optional_variables(template)
for keep in itertools.product(*[(True, False) for _ in optional]):
sub_re = resolve_optionals(fill_known(
template,
dict(
**{var: r'(\S+)' for k, var in zip(keep, optional) if k},
**{var: r'(\S+)' for var in remaining.difference(optional)}
)
))
while '//' in sub_re:
sub_re = sub_re.replace('//', '/')
sub_re = sub_re.replace('.', r'\.')
if re.match(sub_re, filename) is None:
continue
extracted_value = {}
kept_vars = [var for var in find_variables(template)
if var not in optional or keep[list(optional).index(var)]]
for var, value in zip(kept_vars, re.match(sub_re, filename).groups()):
if var in extracted_value:
if value != extracted_value[var]:
raise ValueError('Multiple values found for {}'.format(var))
else:
extracted_value[var] = value
if any('/' in value for value in extracted_value.values()):
continue
for name in find_variables(template):
if name not in extracted_value:
extracted_value[name] = None
extracted_value.update(known_vars)
return extracted_value
raise ValueError("{} did not match {}".format(filename, template))
return Template.parse(template).extract_variables(filename, known_vars)
......@@ -7,9 +7,15 @@
"""This module submits jobs to a computing cluster using FSL's fsl_sub command
line tool. It is assumed that the computing cluster is managed by SGE.
.. note:: All of the functionality in this module is deprecated and will be
removed in a future version of fslpy. Equivalent functionality is
available in the `fsl_sub <https://git.fmrib.ox.ac.uk/fsl/fsl_sub>`_
project, and the :mod:`fsl.utils.run` module and
:mod:`.wrappers.fsl_sub` wrapper function.
Example usage, building a short pipeline::
from fsl.utils.fslsub import submit, wait
from fsl.utils.fslsub import submit
# submits bet to veryshort queue unless <mask_filename> already exists
bet_job = submit('bet <input_filename> -m',
......@@ -26,53 +32,249 @@ Example usage, building a short pipeline::
wait_for=(bet_job, other_job),
queue='cuda.q')
# waits for the cuda job to finish
wait(cuda_job)
.. autosummary::
:nosignatures:
submit
info
output
wait
func_to_cmd
hold
"""
from six import string_types, BytesIO
import subprocess as sp
from io import BytesIO
import os.path as op
import glob
import time
import pickle
import dill
import sys
import tempfile
import logging
import importlib
from dataclasses import dataclass, asdict
from typing import Optional, Collection, Union, Tuple, Dict
import argparse
import warnings
import os
import fsl.utils.deprecated as deprecated
import fsl.utils.run as run
log = logging.getLogger(__name__)
def submit(*command,
minutes=None,
queue=None,
architecture=None,
priority=None,
email=None,
wait_for=None,
job_name=None,
ram=None,
logdir=None,
mail_options=None,
output=None,
flags=False,
multi_threaded=None,
verbose=False):
@dataclass
class SubmitParams:
"""Represents the fsl_sub parameters
The ``SubmitParams`` class is deprecated - you should use
:mod:`fsl.wrappers.fsl_sub` instead, or use the ``fsl_sub`` Python
library, which is installed as part of FSL.
Any command line script can be submitted by the parameters by calling the `SubmitParams` object:
.. code-block:: python
submit = SubmitParams(minutes=1, logdir='log', wait_for=['108023', '108019'])
submit('echo finished')
This will run "echo finished" with a maximum runtime of 1 minute after the jobs with IDs 108023 and 108019 are finished.
It is the equivalent of
.. code-block:: bash
fsl_sub -T 1 -l log -j 108023,108019 "echo finished"
For python scripts that submit themselves to the cluster, it might be useful to give the user some control
over at least some of the submission parameters. This can be done using:
.. code-block:: python
import argparse
parser = argparse.ArgumentParser("my script doing awesome stuff")
parser.add_argument("input_file")
parser.add_argument("output_file")
SubmitParams.add_to_parser(parser, include=('wait_for', 'logdir'))
args = parser.parse_args()
submitter = SubmitParams.from_args(args).update(minutes=10)
from fsl import wrappers
wrappers.bet(input_file, output_file, fslsub=submitter)
This submits a BET job using the -j and -l flags set by the user and a maximum time of 10 minutes.
"""
minutes: Optional[float] = None
queue: Optional[str] = None
architecture: Optional[str] = None
priority: Optional[int] = None
email: Optional[str] = None
wait_for: Union[str, None, Collection[str]] = None
job_name: Optional[str] = None
ram: Optional[int] = None
logdir: Optional[str] = None
mail_options: Optional[str] = None
flags: bool = False
multi_threaded: Optional[Tuple[str, str]] = None
verbose: bool = False
env: dict = None
cmd_line_flags = {
'-T': 'minutes',
'-q': 'queue',
'-a': 'architecture',
'-p': 'priority',
'-M': 'email',
'-N': 'job_name',
'-R': 'ram',
'-l': 'logdir',
'-m': 'mail_options',
}
def __post_init__(self):
"""
If not set explicitly by the user don't alter the environment in which the script will be submitted
"""
if self.env is None:
self.env = dict(os.environ)
def as_flags(self, ):
"""
Creates flags for submission using fsl_sub
All parameters changed from their default value (typically None) will be included in the flags.
:return: tuple with the flags
"""
res = []
for key, value in self.cmd_line_flags.items():
if getattr(self, value) is not None:
res.extend((key, str(getattr(self, value))))
if self.verbose:
res.append('-v')
if self.flags:
res.append('-F')
if self.multi_threaded:
res.extend(("-s", ','.join(self.multi_threaded)))
if self.wait_for is not None and len(_flatten_job_ids(self.wait_for)) > 0:
res.extend(('-j', _flatten_job_ids(self.wait_for)))
return tuple(res)
def __str__(self):
return 'SubmitParams({})'.format(" ".join(self.as_flags()))
@deprecated.deprecated('3.7.0', '4.0.0',
'Use fsl_sub or fsl.wrappers.fsl_sub instead')
def __call__(self, *command, **kwargs):
"""
Submits the command to the cluster.
:param command: string or tuple of strings with the command to submit
:param kwargs: Keyword arguments can override any parameters set in self
:return: job ID
"""
from fsl.utils.run import prepareArgs, runfsl
runner = self.update(**kwargs)
command = prepareArgs(command)
fsl_sub_cmd = ' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(command))
log.debug(fsl_sub_cmd)
jobid = runfsl(fsl_sub_cmd, env=runner.env).strip()
log.debug('Job submitted as {}'.format(jobid))
return jobid
def update(self, **kwargs):
"""
Creates a new SubmitParams withe updated parameters
"""
values = asdict(self)
values.update(kwargs)
return SubmitParams(**values)
@classmethod
def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands',
include=('wait_for', 'logdir', 'email', 'mail_options')):
"""
Adds submission parameters to the parser
:param parser: parser that should understand submission commands
:param as_group: add as a new group
:param include: sequence of argument flags/names that should be added to the parser
(set to None to include everything)
:return: the group the arguments got added to
"""
from fsl.utils.run import runfsl, FSLNotPresent
try:
fsl_sub_run, _ = runfsl('fsl_sub', exitcode=True)
except (FileNotFoundError, FSLNotPresent):
warnings.warn('fsl_sub was not found')
return
doc_lines = fsl_sub_run.splitlines()
nspaces = 1
for line in doc_lines:
if len(line.strip()) > 0:
while line.startswith(' ' * nspaces):
nspaces += 1
nspaces -= 1
if as_group:
group = parser.add_argument_group(as_group)
else:
group = parser
def get_explanation(flag):
explanation = None
for line in doc_lines:
if explanation is not None and len(line.strip()) > 0 and line.strip()[0] != '-':
explanation.append(line[nspaces:].strip())
elif explanation is not None:
break
elif line.strip().startswith(flag):
explanation = [line[nspaces:].strip()]
if (explanation is None) or (len(explanation) == 0):
return 'documentation not found'
return ' '.join(explanation)
for flag, value in cls.cmd_line_flags.items():
if include is not None and value not in include and flag not in include:
continue
as_type = {'minutes': float, 'priority': int, 'ram': int, 'verbose': None}
action = 'store_true' if value == 'verbose' else 'store'
group.add_argument(flag, dest='_sub_' + value, help=get_explanation(flag), action=action,
metavar='<' + value + '>', type=as_type.get(value, str))
group.add_argument('-F', dest='_sub_flags', help=get_explanation('-F'), action='store_true')
group.add_argument('-v', dest='_sub_verbose', help=get_explanation('-v'), action='store_true')
group.add_argument('-s', dest='_sub_multi_threaded', help=get_explanation('-s'),
metavar='<pename>,<threads>')
group.add_argument('-j', dest='_sub_wait_for', help=get_explanation('-j'),
metavar='<jid>')
return group
@classmethod
def from_args(cls, args):
"""
Create a SubmitParams from the command line arguments
"""
as_dict = {value: getattr(args, '_sub_' + value, None) for value in cls.cmd_line_flags.values()}
if args._sub_wait_for is not None:
as_dict['wait_for'] = args._sub_wait_for.split(',')
if args._sub_multi_threaded is not None:
pename, threads = args._sub_multi_threaded.split(',')
as_dict['multi_threaded'] = pename, threads
return cls(verbose=args._sub_verbose, flags=args._sub_flags, **as_dict)
@deprecated.deprecated('3.7.0', '4.0.0',
'Use fsl_sub or fsl.wrappers.fsl_sub instead')
def submit(*command, **kwargs):
"""
Submits a given command to the cluster
The ``submit`` function is deprecated - you should use
:mod:`fsl.wrappers.fsl_sub` instead, or use the ``fsl_sub`` Python
library, which is available in FSL 6.0.5 and newer.
You can pass the command and arguments as a single string, or as a regular or unpacked sequence.
:arg command: string or regular/unpacked sequence of strings with the job command
......@@ -100,69 +302,78 @@ def submit(*command,
- <threads>: number of threads to run
:arg verbose: If True, use verbose mode
:arg env: Dict containing environment variables
:return: string of submitted job id
"""
from fsl.utils.run import runfsl, prepareArgs
base_cmd = ['fsl_sub']
for flag, variable_name in [
('-T', 'minutes'),
('-q', 'queue'),
('-a', 'architecture'),
('-p', 'priority'),
('-M', 'email'),
('-N', 'job_name'),
('-R', 'ram'),
('-l', 'logdir'),
('-m', 'mail_options'),
('-z', 'output')]:
variable = locals()[variable_name]
if variable:
base_cmd.extend([flag, str(variable)])
if flags:
base_cmd.append('-F')
if verbose:
base_cmd.append('-v')
if wait_for:
base_cmd.extend(['-j', _flatten_job_ids(wait_for)])
if multi_threaded:
base_cmd.append('-s')
base_cmd.extend(multi_threaded)
base_cmd.extend(prepareArgs(command))
return runfsl(*base_cmd).strip()
return SubmitParams(**kwargs)(*command)
def info(job_id):
@deprecated.deprecated('3.7.0', '4.0.0', 'Use fsl_sub.report instead')
def info(job_ids) -> Dict[str, Optional[Dict[str, str]]]:
"""Gets information on a given job id
Uses `qstat -j <job_id>`
The ``info`` function is deprecated - you should use the
``fsl_sub.report`` function from the ``fsl_sub`` Python library, which
is available in FSL 6.0.5 and newer.
:arg job_id: string with job id
:return: dictionary with information on the submitted job (empty
if job does not exist)
Uses `qstat -j <job_ids>`
:arg job_ids: string with job id or (nested) sequence with jobs
:return: dictionary of jobid -> another dictionary with job information
(or None if job does not exist)
"""
if not hasattr(info, '_ncall'):
info._ncall = 0
info._ncall += 1
if info._ncall == 3:
warnings.warn("Please do not call `fslsub.info` repeatably, because it slows down the cluster. You can avoid this message by simply passing all the job IDs you are interested in to a single `fslsub.info` call.")
from fsl.utils.run import run
job_ids_string = _flatten_job_ids(job_ids)
try:
result = sp.call(['qstat', '-j', job_id]).decode('utf-8')
result = run(['qstat', '-j', job_ids_string], exitcode=True)[0]
except FileNotFoundError:
log.debug("qstat not found; assuming not on cluster")
return {}
if 'Following jobs do not exist:' in result:
return {}
res = {}
for line in result.splitlines()[1:]:
key, value = line.split(':', nsplit=1)
res[key.strip()] = value.strip()
return _parse_qstat(job_ids_string, result)
def _parse_qstat(job_ids_string, qstat_stdout):
"""
Parses the qstat output into a dictionary of dictionaries
:param job_ids_string: input job ids
:param qstat_stdout: qstat output
:return: dictionary of jobid -> another dictionary with job information
(or None if job does not exist)
"""
res = {job_id: None for job_id in job_ids_string.split(',')}
current_job_id = None
for line in qstat_stdout.splitlines()[1:]:
line = line.strip()
if len(line) == 0:
continue
if line == '=' * len(line):
current_job_id = None
elif ':' in line:
current_key, value = [part.strip() for part in line.split(':', 1)]
if current_key == 'job_number':
current_job_id = value
if current_job_id not in job_ids_string:
raise ValueError(f"Unexpected job ID in qstat output:\n{line}")
res[current_job_id] = {}
else:
if current_job_id is None:
raise ValueError(f"Found job information before job ID in qstat output:\n{line}")
res[current_job_id][current_key] = value
else:
res[current_job_id][current_key] += '\n' + line
return res
@deprecated.deprecated('3.13.0', '4.0.0',
'Use fsl.utils.run.job_output instead')
def output(job_id, logdir='.', command=None, name=None):
"""Returns the output of the given job.
......@@ -173,46 +384,7 @@ def output(job_id, logdir='.', command=None, name=None):
:arg name: Job name if it was specified. Not currently used.
:returns: A tuple containing the standard output and standard error.
"""
stdout = list(glob.glob(op.join(logdir, '*.o{}'.format(job_id))))
stderr = list(glob.glob(op.join(logdir, '*.e{}'.format(job_id))))
if len(stdout) != 1 or len(stderr) != 1:
raise ValueError('No/too many error/output files for job {}: stdout: '
'{}, stderr: {}'.format(job_id, stdout, stderr))
stdout = stdout[0]
stderr = stderr[0]
if op.exists(stdout):
with open(stdout, 'rt') as f:
stdout = f.read()
else:
stdout = None
if op.exists(stderr):
with open(stderr, 'rt') as f:
stderr = f.read()
else:
stderr = None
return stdout, stderr
def wait(job_ids):
"""Wait for one or more jobs to finish
:arg job_ids: string or tuple of strings with jobs that should finish
before continuing
"""
start_time = time.time()
for job_id in _flatten_job_ids(job_ids):
log.debug('Waiting for job {}'.format(job_id))
while len(info(job_id)) > 0:
wait_time = min(max(1, (time.time() - start_time) / 3.), 20)
time.sleep(wait_time)
log.debug('Job {} finished, continuing to next'.format(job_id))
log.debug('All jobs have finished')
return run.job_output(job_id, logdir, command, name)
def _flatten_job_ids(job_ids):
......@@ -237,36 +409,29 @@ def _flatten_job_ids(job_ids):
return ','.join(sorted(unpack(job_ids)))
_external_job = """#!{}
# This is a temporary file designed to run the python function {},
# so that it can be submitted to the cluster
import pickle
from six import BytesIO
from importlib import import_module
pickle_bytes = BytesIO({})
name_type, name, func_name, args, kwargs = pickle.load(pickle_bytes)
if name_type == 'module':
# retrieves a function defined in an external module
func = getattr(import_module(name), func_name)
elif name_type == 'script':
# retrieves a function defined in the __main__ script
local_execute = {{'__name__': '__not_main__', '__file__': name}}
exec(open(name, 'r').read(), local_execute)
func = local_execute[func_name]
else:
raise ValueError('Unknown name_type: %r' % name_type)
res = func(*args, **kwargs)
if res is not None:
with open(__file__ + '_out.pickle') as f:
pickle.dump(f, res)
"""
@deprecated.deprecated('3.13.0', '4.0.0', 'Use fsl.utils.run.hold instead')
def hold(job_ids, hold_filename=None):
"""
Waits until all jobs have finished
Internally works by submitting a new job, which creates a file named `hold_filename`,
which will only run after all jobs in `job_ids` finished.
This function will only return once `hold_filename` has been created
def func_to_cmd(func, args, kwargs, tmp_dir=None, clean=False):
:param job_ids: possibly nested sequence of job ids. The job ids themselves should be strings.
:param hold_filename: filename to use as a hold file.
The containing directory should exist, but the file itself should not.
Defaults to a ./.<random characters>.hold in the current directory.
:return: only returns when all the jobs have finished
"""
run.hold(job_ids, hold_filename)
@deprecated.deprecated('3.13.0', '4.0.0',
'Use fsl.utils.run.func_to_cmd or '
'fsl.utils.run.runfunc instead')
def func_to_cmd(func, args=None, kwargs=None, tmp_dir=None, clean="never", verbose=False):
"""Defines the command needed to run the function from the command line
WARNING: if submitting a function defined in the __main__ script,
......@@ -277,25 +442,13 @@ def func_to_cmd(func, args, kwargs, tmp_dir=None, clean=False):
:arg args: positional arguments
:arg kwargs: keyword arguments
:arg tmp_dir: directory where to store the temporary file
:arg clean: if True removes the submitted script after running it
:arg clean: Whether the script should be removed after running. There are three options:
- "never" (default): Script is kept
- "on_success": only remove if script successfully finished (i.e., no error is raised)
- "always": always remove the script, even if it raises an error
:arg verbose: If set to True, the script will print its own filename before running
:return: string which will run the function
"""
pickle_bytes = BytesIO()
if func.__module__ == '__main__':
pickle.dump(('script', importlib.import_module('__main__').__file__, func.__name__,
args, kwargs), pickle_bytes)
else:
pickle.dump(('module', func.__module__, func.__name__,
args, kwargs), pickle_bytes)
python_cmd = _external_job.format(sys.executable,
func.__name__,
pickle_bytes.getvalue())
_, filename = tempfile.mkstemp(prefix=func.__name__ + '_',
suffix='.py',
dir=tmp_dir)
with open(filename, 'w') as python_file:
python_file.write(python_cmd)
return sys.executable + " " + filename + ('; rm ' + filename if clean else '')
return run.func_to_cmd(func, args, kwargs, tmp_dir, clean, verbose)
......@@ -85,13 +85,39 @@ from collections import abc
try: import queue
except ImportError: import Queue as queue
from fsl.utils.deprecated import deprecated
log = logging.getLogger(__name__)
class IdleTask(object):
@functools.lru_cache()
def _canHaveGui():
"""Return ``True`` if wxPython is installed, and a display is available,
``False`` otherwise.
"""
# Determine if a display is available. We do
# this once at init (instead of on-demand in
# the canHaveGui method) because calling the
# IsDisplayAvailable function will cause the
# application to steal focus under OSX!
try:
import wx
return wx.App.IsDisplayAvailable()
except ImportError:
return False
def _haveGui():
"""Return ``True`` if wxPython is installed, a display is available, and
a ``wx.App`` exists, ``False`` otherwise.
"""
try:
import wx
return _canHaveGui() and (wx.GetApp() is not None)
except ImportError:
return False
class IdleTask:
"""Container object used by the :class:`IdleLoop` class.
Used to encapsulate information about a queued task.
"""
......@@ -113,7 +139,7 @@ class IdleTask(object):
self.kwargs = kwargs
class IdleLoop(object):
class IdleLoop:
"""This class contains logic for running tasks via ``wx.EVT_IDLE`` events.
A single ``IdleLoop`` instance is created when this module is first
......@@ -372,8 +398,6 @@ class IdleLoop(object):
``timeout``, or ``alwaysQueue``.
"""
from fsl.utils.platform import platform as fslplatform
schedtime = time.time()
timeout = kwargs.pop('timeout', 0)
after = kwargs.pop('after', 0)
......@@ -382,18 +406,15 @@ class IdleLoop(object):
skipIfQueued = kwargs.pop('skipIfQueued', False)
alwaysQueue = kwargs.pop('alwaysQueue', False)
canHaveGui = fslplatform.canHaveGui
haveGui = fslplatform.haveGui
# If there is no possibility of a
# gui being available in the future
# (determined by canHaveGui), then
# (determined by _canHaveGui), then
# alwaysQueue is ignored.
alwaysQueue = alwaysQueue and canHaveGui
alwaysQueue = alwaysQueue and _canHaveGui()
# We don't have wx - run the task
# directly/synchronously.
if self.__neverQueue or not (haveGui or alwaysQueue):
if self.__neverQueue or not (_haveGui() or alwaysQueue):
time.sleep(after)
log.debug('Running idle task directly')
task(*args, **kwargs)
......@@ -597,36 +618,6 @@ def idleWhen(*args, **kwargs):
idleLoop.idleWhen(*args, **kwargs)
@deprecated('2.7.0', '3.0.0', 'Use idleLoop.inIdle instead')
def inIdle(taskName):
"""Deprecated - use ``idleLoop.inIdle`` instead. """
return idleLoop.inIdle(taskName)
@deprecated('2.7.0', '3.0.0', 'Use idleLoop.cancelIdle instead')
def cancelIdle(taskName):
"""Deprecated - use ``idleLoop.cancelIdle`` instead. """
return idleLoop.cancelIdle(taskName)
@deprecated('2.7.0', '3.0.0', 'Use idleLoop.reset instead')
def idleReset():
"""Deprecated - use ``idleLoop.reset`` instead. """
return idleLoop.reset()
@deprecated('2.7.0', '3.0.0', 'Use idleLoop.callRate instead')
def getIdleTimeout():
"""Deprecated - use ``idleLoop.callRate`` instead. """
return idleLoop.callRate
@deprecated('2.7.0', '3.0.0', 'Use idleLoop.callRate instead')
def setIdleTimeout(timeout=None):
"""Deprecated - use ``idleLoop.callRate`` instead. """
idleLoop.callRate = timeout
def block(secs, delta=0.01, until=None):
"""Blocks for the specified number of seconds, yielding to the main ``wx``
loop.
......@@ -643,11 +634,13 @@ def block(secs, delta=0.01, until=None):
determins when calls to ``block`` will return.
"""
havewx = _haveGui()
def defaultUntil():
return False
def tick():
if fslplatform.haveGui:
if havewx:
import wx
wx.YieldIfNeeded()
time.sleep(delta)
......@@ -655,8 +648,6 @@ def block(secs, delta=0.01, until=None):
if until is None:
until = defaultUntil
from fsl.utils.platform import platform as fslplatform
start = time.time()
while (time.time() - start) < secs:
tick()
......@@ -685,12 +676,11 @@ def run(task, onFinish=None, onError=None, name=None):
the return value will be ``None``.
"""
from fsl.utils.platform import platform as fslplatform
if name is None:
name = getattr(task, '__name__', '<unknown>')
haveWX = fslplatform.haveGui
haveWX = _haveGui()
# Calls the onFinish or onError handler
def callback(cb, *args, **kwargs):
......@@ -759,14 +749,12 @@ def wait(threads, task, *args, **kwargs):
a keyword argument called ``wait_direct``.
"""
from fsl.utils.platform import platform as fslplatform
direct = kwargs.pop('wait_direct', False)
if not isinstance(threads, abc.Sequence):
threads = [threads]
haveWX = fslplatform.haveGui
haveWX = _haveGui()
def joinAll():
log.debug('Wait thread joining on all targets')
......@@ -787,14 +775,15 @@ def wait(threads, task, *args, **kwargs):
return None
class Task(object):
class Task:
"""Container object which encapsulates a task that is run by a
:class:`TaskThread`.
"""
def __init__(self, name, func, onFinish, args, kwargs):
def __init__(self, name, func, onFinish, onError, args, kwargs):
self.name = name
self.func = func
self.onFinish = onFinish
self.onError = onError
self.args = args
self.kwargs = kwargs
self.enabled = True
......@@ -806,7 +795,6 @@ class TaskThreadVeto(Exception):
handler (if one has been specified). See the :meth:`TaskThread.enqueue`
method for more details.
"""
pass
class TaskThread(threading.Thread):
......@@ -840,9 +828,16 @@ class TaskThread(threading.Thread):
:arg onFinish: An optional function to be called (via :func:`idle`)
when the task funtion has finished. Must be provided as
a keyword argument. If the ``func`` raises a
:class`TaskThreadVeto` error, this function will not
be called.
a keyword argument, and must itself accept no arguments.
If the ``func`` raises a :class`TaskThreadVeto` error,
this function will not be called.
:arg onError: An optional function to be called (via :func:`idle`)
if the task funtion raises an ``Exception``. Must be
provided as a keyword argument, and must itself accept
the raised ``Exception`` object as a single argument.
If the ``func`` raises a :class`TaskThreadVeto` error,
this function will not be called.
All other arguments are passed through to the task function when it is
executed.
......@@ -853,16 +848,18 @@ class TaskThread(threading.Thread):
results.
.. warning:: Make sure that your task function is not expecting keyword
arguments called ``taskName`` or ``onFinish``!
arguments called ``taskName``, ``onFinish``, or
``onError``!
"""
name = kwargs.pop('taskName', None)
onFinish = kwargs.pop('onFinish', None)
onError = kwargs.pop('onError', None)
log.debug('Enqueueing task: {} [{}]'.format(
name, getattr(func, '__name__', '<unknown>')))
t = Task(name, func, onFinish, args, kwargs)
t = Task(name, func, onFinish, onError, args, kwargs)
self.__enqueued[name] = t
self.__q.put(t)
......@@ -983,6 +980,9 @@ class TaskThread(threading.Thread):
type(e).__name__,
str(e)),
exc_info=True)
if task.onError is not None:
idle(task.onError, e)
finally:
self.__q.task_done()
......@@ -1024,7 +1024,7 @@ def mutex(*args, **kwargs):
return MutexFactory(*args, **kwargs)
class MutexFactory(object):
class MutexFactory:
"""The ``MutexFactory`` is a placeholder for methods which have been
decorated with the :func:`mutex` decorator. When the method of a class
is decorated with ``@mutex``, a ``MutexFactory`` is created.
......
......@@ -10,8 +10,7 @@ manipulating and working with :class:`.Image` objects.
The following modules are available:
.. autosummary::
:nosignature
.image.resample
.image.roi
fsl.utils.image.resample
fsl.utils.image.roi
"""
......@@ -18,7 +18,6 @@ import numpy as np
import scipy.ndimage as ndimage
import fsl.transform.affine as affine
import fsl.utils.deprecated as deprecated
def resampleToPixdims(image, newPixdims, **kwargs):
......@@ -47,9 +46,9 @@ def resampleToReference(image, reference, matrix=None, **kwargs):
along the spatial (first three) dimensions.
:arg image: :class:`.Image` to resample
:arg matrix: Optional world-to-world affine alignment matrix
:arg reference: :class:`.Nifti` defining the space to resample ``image``
into
:arg matrix: Optional world-to-world affine alignment matrix
"""
oldShape = list(image.shape)
......@@ -191,7 +190,7 @@ def resample(image,
if origin not in ('centre', 'corner'):
raise ValueError('Invalid value for origin: {}'.format(origin))
data = np.array(image[sliceobj], dtype=dtype, copy=False)
data = np.asarray(image[sliceobj], dtype=dtype)
if len(data.shape) != len(newShape):
raise ValueError('Data dimensions do not match new shape: '
......@@ -204,12 +203,13 @@ def resample(image,
if matrix is None:
matrix = affine.rescale(data.shape, newShape, origin)
# identity matrix? the image
# doesn't need to be resampled
if np.all(np.isclose(matrix, np.eye(len(newShape) + 1))):
# same shape and identity matrix? the
# image doesn't need to be resampled
if np.all(np.isclose(data.shape, newShape)) and \
np.all(np.isclose(matrix, np.eye(len(newShape) + 1))):
return data, image.voxToWorldMat
newShape = np.array(np.round(newShape), dtype=np.int)
newShape = np.array(np.round(newShape), dtype=int)
# Apply smoothing if requested,
# and if not using nn interp
......@@ -281,13 +281,3 @@ def applySmoothing(data, matrix, newShape):
sigma[ratio >= 1.1] *= 0.425
return ndimage.gaussian_filter(data, sigma)
@deprecated.deprecated('2.9.0', '3.0.0',
'Use fsl.transform.affine.rescale instead')
def calculateMatrix(oldShape, newShape, origin):
"""Deprecated - use :func:`.affine.rescale` instead. """
xform = affine.rescale(oldShape, newShape, origin)
if np.all(np.isclose(xform, np.eye(len(oldShape) + 1))):
return None
return xform[:-1, :]
......@@ -18,6 +18,7 @@ import os
import os.path as op
import shutil
import numpy as np
import nibabel as nib
import fsl.utils.path as fslpath
......@@ -44,12 +45,13 @@ def imcp(src,
already exist. Defaults to ``False``.
:arg useDefaultExt: Defaults to ``False``. If ``True``, the destination
file type will be set according to the default
extension, specified by
:func:`~fsl.data.image.defaultExt`. If the source
file does not have the same type as the default
file type will be set according to the default file
type, specified by
:func:`~fsl.data.image.defaultOutputType`. If the
source file does not have the same type as the default
extension, it will be converted. If ``False``, the
source file type is not changed.
source file type is used, and the destination file type
(if specified) is ignored.
:arg move: If ``True``, the files are moved, instead of being
copied. See :func:`immv`.
......@@ -57,7 +59,7 @@ def imcp(src,
# special case - non-existent directory
if dest.endswith('/') and not op.isdir(dest):
raise fslpath.PathError('Directory does not exist: {}'.format(dest))
raise fslpath.PathError(f'Directory does not exist: {dest}')
if op.isdir(dest):
dest = op.join(dest, op.basename(src))
......@@ -87,17 +89,22 @@ def imcp(src,
if not op.exists(src):
raise fslpath.PathError('imcp error - source path '
'does not exist: {}'.format(src))
# Figure out the destination file
# extension/type. If useDefaultExt
# is True, we use the default
# extension. Otherwise, if no
# destination file extension is
# provided, we use the source
# extension.
if useDefaultExt: destExt = fslimage.defaultExt()
elif destExt == '': destExt = srcExt
f'does not exist: {src}')
# Infer the image type of the source image. We
# can't just look at the extension, as e.g. an
# .img file can be any of ANALYZE/NIFTI1/NIFTI2
srcType = fslimage.fileType(src)
# Figure out the destination file extension/type.
# If useDefaultExt is True, we use the default
# extension. Otherwise we use the source type
if useDefaultExt:
destType = fslimage.defaultOutputType()
destExt = fslimage.defaultExt()
else:
destType = srcType
destExt = srcExt
# Resolve any file group differences
# e.g. we don't care if the src is
......@@ -116,10 +123,10 @@ def imcp(src,
# Give up if we don't have permission.
if not os.access(op.dirname(dest), os.W_OK | os.X_OK):
raise fslpath.PathError('imcp error - cannot write to {}'.format(dest))
raise fslpath.PathError(f'imcp error - cannot write to {dest}')
if move and not os.access(op.dirname(src), os.W_OK | os.X_OK):
raise fslpath.PathError('imcp error - cannot move from {}'.format(src))
raise fslpath.PathError(f'imcp error - cannot move from {src}')
# If the source file type does not
# match the destination file type,
......@@ -129,14 +136,48 @@ def imcp(src,
# io and cpu, but programmatically
# very easy - nibabel does all the
# hard work.
if srcExt != destExt:
if srcType != destType:
if not overwrite and op.exists(dest):
raise fslpath.PathError('imcp error - destination already '
'exists ({})'.format(dest))
raise fslpath.PathError('imcp error - destination '
f'already exists ({dest})')
img = nib.load(src)
# Force conversion to specific file format if
# necessary. The file format (pair, gzipped
# or not) is taken care of automatically by
# nibabel
if 'ANALYZE' in destType.name: cls = nib.AnalyzeImage
elif 'NIFTI2' in destType.name: cls = nib.Nifti2Image
elif 'NIFTI' in destType.name: cls = nib.Nifti1Image
# The default behaviour of nibabel when saving
# is to rescale the image data to the full range
# of the data type, and then set the scl_slope/
# inter header fields accordingly. This is highly
# disruptive in many circumstances. Fortunately:
# - The nibabel ArrayProxy class provides a
# get_unscaled method, which allows us to
# bypass the rescaling at load time.
# - Explicitly setting the slope and intercept
# on the header allows us to bypass rescaling
# at save time.
#
# https://github.com/nipy/nibabel/issues/1001
# https://neurostars.org/t/preserve-datatype-and-precision-with-nibabel/27641/2
slope = img.dataobj.slope
inter = img.dataobj.inter
data = np.asanyarray(img.dataobj.get_unscaled(),
dtype=img.get_data_dtype())
img = cls(data, None, header=img.header)
img.header.set_slope_inter(slope, inter)
nib.save(img, dest)
# Make sure the image reference is cleared, and
# hopefully GC'd, as otherwise we sometimes get
# errors on Windows (mostly in unit tests) w.r.t.
# attempts to delete files which are still open
img = None
if move:
......@@ -193,7 +234,7 @@ def imcp(src,
# paths already exist
if not overwrite and any([op.exists(d) for d in copyDests]):
raise fslpath.PathError('imcp error - a destination path already '
'exists ({})'.format(', '.join(copyDests)))
f'exists ({",".join(copyDests)})')
# Do the copy/move
for src, dest in zip(copySrcs, copyDests):
......
......@@ -21,7 +21,6 @@ a function:
import logging
import hashlib
import functools
import six
log = logging.getLogger(__name__)
......@@ -171,7 +170,7 @@ def memoizeMD5(func):
# compatible) bytes , and take
# the hash of those bytes.
for arg in args:
if not isinstance(arg, six.string_types):
if not isinstance(arg, str):
arg = str(arg)
arg = arg.encode('utf-8')
hashobj.update(arg)
......@@ -243,8 +242,8 @@ def skipUnchanged(func):
isarray = oldIsArray or newIsArray
if isarray:
a = np.array(oldVal, copy=False)
b = np.array(value, copy=False)
a = np.asarray(oldVal)
b = np.asarray(value)
nochange = (a.shape == b.shape) and np.allclose(a, b)
else:
......
......@@ -7,12 +7,9 @@
"""This module provides the :class:`Meta` class. """
import collections
class Meta(object):
"""The ``Meta`` class is intended to be used as a mixin for other classes. It
is simply a wrapper for a dictionary of key-value pairs.
class Meta:
"""The ``Meta`` class is intended to be used as a mixin for other classes.
It is simply a wrapper for a dictionary of key-value pairs.
It has a handful of methods allowing you to add and access additional
metadata associated with an object.
......@@ -20,6 +17,7 @@ class Meta(object):
.. autosummary::
:nosignatures:
meta
metaKeys
metaValues
metaItems
......@@ -32,11 +30,17 @@ class Meta(object):
"""Initialises a ``Meta`` instance. """
new = super(Meta, cls).__new__(cls)
new.__meta = collections.OrderedDict()
new.__meta = {}
return new
@property
def meta(self):
"""Return a reference to the metadata dictionary. """
return self.__meta
def metaKeys(self):
"""Returns the keys contained in the metadata dictionary
(``dict.keys``).
......
......@@ -14,9 +14,6 @@ import inspect
import contextlib
import collections
import six
import fsl.utils.idle as idle
import fsl.utils.weakfuncref as weakfuncref
......@@ -36,7 +33,7 @@ class Registered(Exception):
pass
class _Listener(object):
class _Listener:
"""This class is used internally by the :class:`.Notifier` class to
store references to callback functions.
"""
......@@ -61,22 +58,48 @@ class _Listener(object):
return self.__callback.function()
@property
def expectsArguments(self):
"""Returns ``True`` if the listener function needs to be passed
arguments, ``False`` otherwise. Listener functions can
be defined to accept either zero arguments, or a set of
positional arguments - see :meth:`Notifier.register` for details.
"""
func = self.callback
# the function may have been GC'd
if func is None:
return False
spec = inspect.signature(func)
posargs = 0
varargs = False
for param in spec.parameters.values():
if param.kind in (inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD):
posargs += 1
elif param.kind == inspect.Parameter.VAR_POSITIONAL:
varargs = True
return varargs or ((not varargs) and (posargs == 3))
def __str__(self):
cb = self.callback
if cb is not None: cbName = getattr(cb, '__name__', '<callable>')
else: cbName = '<deleted>'
if cb is not None: name = getattr(cb, '__name__', '<callable>')
else: name = '<deleted>'
return 'Listener {} [topic: {}] [function: {}]'.format(
self.name, self.topic, cbName)
return f'Listener {self.name} [topic: {self.topic}] [function: {name}]'
def __repr__(self):
return self.__str__()
class Notifier(object):
class Notifier:
"""The ``Notifier`` class is a mixin which provides simple notification
capability. Listeners can be registered/deregistered to listen via the
:meth:`register` and :meth:`deregister` methods, and notified via the
......@@ -119,8 +142,8 @@ class Notifier(object):
:arg name: A unique name for the listener.
:arg callback: The function to call - must accept two positional
arguments:
:arg callback: The function to call - must accept either zero
arguments, or three positional arguments:
- this ``Notifier`` instance.
......@@ -147,12 +170,12 @@ class Notifier(object):
listener = _Listener(name, callback, topic, runOnIdle)
if name in self.__listeners[topic]:
raise Registered('Listener {} is already registered'.format(name))
raise Registered(f'Listener {name} is already registered')
self.__listeners[topic][name] = listener
self.__enabled[ topic] = self.__enabled.get(topic, True)
log.debug('{}: Registered {}'.format(type(self).__name__, listener))
log.debug('%s: Registered %s', type(self).__name__, listener)
def deregister(self, name, topic=None):
......@@ -186,8 +209,8 @@ class Notifier(object):
self.__listeners.pop(topic)
self.__enabled .pop(topic)
log.debug('{}: De-registered listener {}'.format(
type(self).__name__, listener))
log.debug('%s: De-registered listener %s',
type(self).__name__, listener)
def enable(self, name, topic=None, enable=True):
......@@ -297,7 +320,7 @@ class Notifier(object):
:arg topic: Topic or topics that the listener is registered on.
"""
if topic is None or isinstance(topic, six.string_types):
if topic is None or isinstance(topic, str):
topic = [topic]
topics = topic
......@@ -347,12 +370,12 @@ class Notifier(object):
srcMod = '...{}'.format(frame[1][-20:])
srcLine = frame[2]
log.debug('{}: Notifying {} listeners (topic: {}) [{}:{}]'.format(
log.debug('%s: Notifying %s listeners (topic: %s) [%s:%s]',
type(self).__name__,
len(listeners),
topic,
srcMod,
srcLine))
srcLine)
for listener in listeners:
......@@ -363,15 +386,19 @@ class Notifier(object):
# callback function may have been
# gc'd - remove it if this is the case.
if callback is None:
log.debug('Listener {} has been gc\'d - '
'removing from list'.format(name))
log.debug('Listener %s has been gc\'d - '
'removing from list', name)
self.__listeners[listener.topic].pop(name)
continue
elif not listener.enabled:
if not listener.enabled:
continue
elif listener.runOnIdle: idle.idle(callback, self, topic, value)
else: callback( self, topic, value)
if listener.expectsArguments: args = (self, topic, value)
else: args = ()
if listener.runOnIdle: idle.idle(callback, *args)
else: callback( *args)
def __getListeners(self, topic):
......
......@@ -23,6 +23,8 @@ paths.
removeDuplicates
uniquePrefix
commonBase
wslpath
winpath
"""
......@@ -30,13 +32,19 @@ import os.path as op
import os
import glob
import operator
import pathlib
import re
from typing import Sequence, Tuple, Union
PathLike = Union[str, pathlib.Path]
class PathError(Exception):
"""``Exception`` class raised by the functions defined in this module
when something goes wrong.
"""
pass
def deepest(path, suffixes):
......@@ -47,12 +55,12 @@ def deepest(path, suffixes):
path = path.strip()
if path == op.sep or path == '':
if path in (op.sep, ''):
return None
path = path.rstrip(op.sep)
if any([path.endswith(s) for s in suffixes]):
if any(path.endswith(s) for s in suffixes):
return path
return deepest(op.dirname(path), suffixes)
......@@ -76,7 +84,7 @@ def shallowest(path, suffixes):
if parent is not None:
return parent
if any([path.endswith(s) for s in suffixes]):
if any(path.endswith(s) for s in suffixes):
return path
return None
......@@ -96,19 +104,23 @@ def allFiles(root):
return files
def hasExt(path, allowedExts):
def hasExt(path : PathLike,
allowedExts : Sequence[str]) -> bool:
"""Convenience function which returns ``True`` if the given ``path``
ends with any of the given ``allowedExts``, ``False`` otherwise.
"""
return any([path.endswith(e) for e in allowedExts])
def addExt(prefix,
allowedExts=None,
mustExist=True,
defaultExt=None,
fileGroups=None,
unambiguous=True):
path = str(path)
return any(path.endswith(e) for e in allowedExts)
def addExt(
prefix : PathLike,
allowedExts : Sequence[str] = None,
mustExist : bool = True,
defaultExt : str = None,
fileGroups : Sequence[Sequence[str]] = None,
unambiguous : bool = True
) -> Union[Sequence[str], str]:
"""Adds a file extension to the given file ``prefix``.
If ``mustExist`` is False, and the file does not already have a
......@@ -143,6 +155,8 @@ def addExt(prefix,
containing *all* matching files is returned.
"""
prefix = str(prefix)
if allowedExts is None: allowedExts = []
if fileGroups is None: fileGroups = {}
......@@ -184,7 +198,8 @@ def addExt(prefix,
# If ambiguity is ok, return
# all matching paths
elif not unambiguous:
if not unambiguous:
return allPaths
# Ambiguity is not ok! More than
......@@ -218,37 +233,73 @@ def addExt(prefix,
return allPaths[0]
def removeExt(filename, allowedExts=None):
def removeExt(
filename : PathLike,
allowedExts : Sequence[str] = None,
firstDot : bool = False
) -> str:
"""Returns the base name of the given file name. See :func:`splitExt`. """
return splitExt(filename, allowedExts, firstDot)[0]
return splitExt(filename, allowedExts)[0]
def getExt(filename, allowedExts=None):
def getExt(
filename : PathLike,
allowedExts : Sequence[str] = None,
firstDot : bool = False
) -> str:
"""Returns the extension of the given file name. See :func:`splitExt`. """
return splitExt(filename, allowedExts)[1]
return splitExt(filename, allowedExts, firstDot)[1]
def splitExt(filename, allowedExts=None):
def splitExt(
filename : PathLike,
allowedExts : Sequence[str] = None,
firstDot : bool = False
) -> Tuple[str, str]:
"""Returns the base name and the extension from the given file name.
If ``allowedExts`` is ``None``, this function is equivalent to using::
If ``allowedExts`` is ``None`` and ``firstDot`` is ``False``, this
function is equivalent to using::
os.path.splitext(filename)
If ``allowedExts`` is provided, but the file does not end with an allowed
extension, a tuple containing ``(filename, '')`` is returned.
If ``allowedExts`` is ``None`` and ``firstDot`` is ``True``, the file
name is split on the first period that is found, rather than the last
period. For example::
splitExt('image.nii.gz') # -> ('image.nii', '.gz')
splitExt('image.nii.gz', firstDot=True) # -> ('image', '.nii.gz')
If ``allowedExts`` is provided, ``firstDot`` is ignored. In this case, if
the file does not end with an allowed extension, a tuple containing
``(filename, '')`` is returned.
:arg filename: The file name to split.
:arg allowedExts: Allowed/recognised file extensions.
:arg firstDot: Split the file name on the first period, rather than the
last period. Ignored if ``allowedExts`` is specified.
"""
# If allowedExts is not specified,
# we just use op.splitext
filename = str(filename)
# If allowedExts is not specified
# we split on a period character
if allowedExts is None:
return op.splitext(filename)
# split on last period - equivalent
# to op.splitext
if not firstDot:
return op.splitext(filename)
# split on first period
else:
idx = filename.find('.')
if idx == -1:
return filename, ''
else:
return filename[:idx], filename[idx:]
# Otherwise, try and find a suffix match
extMatches = [filename.endswith(ext) for ext in allowedExts]
......@@ -436,7 +487,7 @@ def removeDuplicates(paths, allowedExts=None, fileGroups=None):
groupFiles = getFileGroup(path, allowedExts, fileGroups)
if not any([p in unique for p in groupFiles]):
if not any(p in unique for p in groupFiles):
unique.append(groupFiles[0])
return unique
......@@ -463,14 +514,13 @@ def uniquePrefix(path):
break
# Should never happen if path is valid
elif len(hits) == 0 or idx >= len(filename) - 1:
if len(hits) == 0 or idx >= len(filename) - 1:
raise PathError('No unique prefix for {}'.format(filename))
# Not unique - continue looping
else:
idx += 1
prefix = prefix + filename[idx]
hits = [h for h in hits if h.startswith(prefix)]
idx += 1
prefix = prefix + filename[idx]
hits = [h for h in hits if h.startswith(prefix)]
return prefix
......@@ -496,7 +546,66 @@ def commonBase(paths):
last = base
if all([p.startswith(base) for p in paths]):
if all(p.startswith(base) for p in paths):
return base
raise PathError('No common base')
def wslpath(path):
"""Convert Windows path (or a command line argument containing a Windows
path) to the equivalent WSL path (e.g. ``c:\\Users`` -> ``/mnt/c/Users``).
Also supports paths in the form ``\\wsl$\\(distro)\\users\\...``
:param winpath: Command line argument which may (or may not) contain a
Windows path. It is assumed to be either of the form
<windows path> or --<arg>=<windows path>. Note that we
don't need to handle --arg <windows path> or -a <windows
path> since in these cases the argument and the path will
be parsed as separate entities.
:return: If ``winpath`` matches a Windows path, the converted
argument (including the --<arg>= portion). Otherwise
returns ``winpath`` unchanged.
"""
match = re.match(r"^(--[\w-]+=)?\\\\wsl\$[\\\/][^\\^\/]+(.*)$", path)
if match:
arg, path = match.group(1, 2)
if arg is None:
arg = ""
return arg + path.replace("\\", "/")
match = re.match(r"^(--[\w-]+=)?([a-zA-z]):(.+)$", path)
if match:
arg, drive, path = match.group(1, 2, 3)
if arg is None:
arg = ""
return arg + "/mnt/" + drive.lower() + path.replace("\\", "/")
return path
def winpath(path):
"""Convert a WSL-local filepath (for example ``/usr/local/fsl/``) into a
path that can be used from Windows.
If ``self.fslwsl`` is ``False``, simply returns ``wslpath`` unmodified
Otherwise, uses ``FSLDIR`` to deduce the WSL distro in use for FSL.
This requires WSL2 which supports the ``\\wsl$\\`` network path.
wslpath is assumed to be an absolute path.
"""
from fsl.utils.platform import platform # pylint: disable=import-outside-toplevel # noqa: E501
if not platform.fslwsl:
return path
else:
match = re.match(r"^\\\\wsl\$\\([^\\]+).*$", platform.fsldir)
if match:
distro = match.group(1)
else:
distro = None
if not distro:
raise RuntimeError('Could not identify WSL installation from '
'FSLDIR (%s)' % platform.fsldir)
return "\\\\wsl$\\" + distro + path.replace("/", "\\")
......@@ -18,7 +18,8 @@ import os.path as op
import sys
import importlib
import fsl.utils.notifier as notifier
import fsl.utils.notifier as notifier
import fsl.utils.deprecated as deprecated
# An annoying consequence of using
# a system-module name for our own
......@@ -87,6 +88,7 @@ class Platform(notifier.Notifier):
frozen
fsldir
fsldevdir
fslVersion
haveGui
canHaveGui
inSSHSession
......@@ -110,26 +112,15 @@ class Platform(notifier.Notifier):
self.WX_MAC_CARBON = WX_MAC_CARBON
self.WX_GTK = WX_GTK
self.__inSSHSession = False
self.__inVNCSession = False
# initialise fsldir - see fsldir.setter
self.fsldir = self.fsldir
# These are all initialised on first access
self.__glVersion = None
self.__glRenderer = None
self.__glIsSoftware = None
self.__fslVersion = None
# initialise fsldir - see fsldir.setter
self.fsldir = self.fsldir
# Determine if a display is available. We do
# this once at init (instead of on-demand in
# the canHaveGui method) because calling the
# IsDisplayAvailable function will cause the
# application to steal focus under OSX!
try:
import wx
self.__canHaveGui = wx.App.IsDisplayAvailable()
except ImportError:
self.__canHaveGui = False
self.__canHaveGui = None
# If one of the SSH_/VNC environment
# variables is set, then we're probably
......@@ -150,6 +141,10 @@ class Platform(notifier.Notifier):
@property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def frozen(self):
"""``True`` if we are running in a compiled/frozen application,
``False`` otherwise.
......@@ -158,6 +153,10 @@ class Platform(notifier.Notifier):
@property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def haveGui(self):
"""``True`` if we are running with a GUI, ``False`` otherwise.
......@@ -168,7 +167,7 @@ class Platform(notifier.Notifier):
the event loop is called periodically, and so is not always running.
"""
try:
import wx
import wx # pylint: disable=import-outside-toplevel
app = wx.GetApp()
# TODO Previously this conditional
......@@ -201,12 +200,31 @@ class Platform(notifier.Notifier):
@property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def canHaveGui(self):
"""``True`` if it is possible to create a GUI, ``False`` otherwise. """
# Determine if a display is available. Note that
# calling the IsDisplayAvailable function will
# cause the application to steal focus under OSX!
if self.__canHaveGui is None:
try:
import wx # pylint: disable=import-outside-toplevel
self.__canHaveGui = wx.App.IsDisplayAvailable()
except ImportError:
self.__canHaveGui = False
return self.__canHaveGui
@property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def inSSHSession(self):
"""``True`` if this application is running over an SSH session,
``False`` otherwise.
......@@ -215,6 +233,10 @@ class Platform(notifier.Notifier):
@property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def inVNCSession(self):
"""``True`` if this application is running over a VNC (or similar)
session, ``False`` otherwise. Currently, the following remote desktop
......@@ -228,6 +250,10 @@ class Platform(notifier.Notifier):
@property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def wxPlatform(self):
"""One of :data:`WX_UNKNOWN`, :data:`WX_MAC_COCOA`,
:data:`WX_MAC_CARBON`, or :data:`WX_GTK`, indicating the wx platform.
......@@ -236,14 +262,14 @@ class Platform(notifier.Notifier):
if not self.canHaveGui:
return WX_UNKNOWN
import wx
import wx # pylint: disable=import-outside-toplevel
pi = [t.lower() for t in wx.PlatformInfo]
if any(['cocoa' in p for p in pi]): plat = WX_MAC_COCOA
elif any(['carbon' in p for p in pi]): plat = WX_MAC_CARBON
elif any(['gtk' in p for p in pi]): plat = WX_GTK
else: plat = WX_UNKNOWN
if any('cocoa' in p for p in pi): plat = WX_MAC_COCOA
elif any('carbon' in p for p in pi): plat = WX_MAC_CARBON
elif any('gtk' in p for p in pi): plat = WX_GTK
else: plat = WX_UNKNOWN
if plat is WX_UNKNOWN:
log.warning('Could not determine wx platform from '
......@@ -253,6 +279,10 @@ class Platform(notifier.Notifier):
@property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def wxFlavour(self):
"""One of :data:`WX_UNKNOWN`, :data:`WX_PYTHON` or :data:`WX_PHOENIX`,
indicating the wx flavour.
......@@ -261,7 +291,7 @@ class Platform(notifier.Notifier):
if not self.canHaveGui:
return WX_UNKNOWN
import wx
import wx # pylint: disable=import-outside-toplevel
pi = [t.lower() for t in wx.PlatformInfo]
isPhoenix = False
......@@ -292,6 +322,23 @@ class Platform(notifier.Notifier):
return os.environ.get('FSLDEVDIR', None)
@property
def wsl(self):
"""Boolean flag indicating whether we are running under Windows
Subsystem for Linux.
"""
plat = builtin_platform.platform().lower()
return 'microsoft' in plat
@property
def fslwsl(self):
"""Boolean flag indicating whether FSL is installed in Windows
Subsystem for Linux
"""
return self.fsldir is not None and self.fsldir.startswith("\\\\wsl$")
@fsldir.setter
def fsldir(self, value):
"""Changes the value of the :attr:`fsldir` property, and notifies any
......@@ -308,18 +355,13 @@ class Platform(notifier.Notifier):
if value is None:
os.environ.pop('FSLDIR', None)
else:
os.environ['FSLDIR'] = value
# Set the FSL version field if we can
versionFile = op.join(value, 'etc', 'fslversion')
if op.exists(versionFile):
with open(versionFile, 'rt') as f:
# split string at colon for new hash style versions
# first object in list is the non-hashed version string (e.g. 6.0.2)
# if no ":hash:" then standard FSL version string is still returned
self.__fslVersion = f.read().strip().split(":")[0]
# clear fslversion - it will
# be re-read on next access
self.__fslVersion = None
self.notify(value=value)
......@@ -349,10 +391,31 @@ class Platform(notifier.Notifier):
"""Returns the FSL version as a string, e.g. ``'5.0.9'``. Returns
``None`` if a FSL installation could not be found.
"""
if self.__fslVersion is not None:
return self.__fslVersion
if self.fsldir is None:
return None
# Set the FSL version field if we can
versionFile = op.join(self.fsldir, 'etc', 'fslversion')
if op.exists(versionFile):
with open(versionFile, 'rt') as f:
# split string at colon for new hash style versions
# first object in list is the non-hashed version string
# (e.g. 6.0.2) if no ":hash:" then standard FSL version
# string is still returned
self.__fslVersion = f.read().strip().split(":")[0]
return self.__fslVersion
@property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def glVersion(self):
"""Returns the available OpenGL version, or ``None`` if it has not
been set.
......@@ -361,12 +424,20 @@ class Platform(notifier.Notifier):
@glVersion.setter
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def glVersion(self, value):
"""Set the available OpenGL version. """
self.__glVersion = value
@property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def glRenderer(self):
"""Returns the available OpenGL renderer, or ``None`` if it has not
been set.
......@@ -375,6 +446,10 @@ class Platform(notifier.Notifier):
@glRenderer.setter
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def glRenderer(self, value):
"""Set the available OpenGL renderer. """
self.__glRenderer = value
......@@ -392,6 +467,10 @@ class Platform(notifier.Notifier):
@property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def glIsSoftwareRenderer(self):
"""Returns ``True`` if the OpenGL renderer is software based,
``False`` otherwise, or ``None`` if the renderer has not yet been set.
......
......@@ -3,6 +3,7 @@
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
#
"""This module provides some functions for running shell commands.
......@@ -14,24 +15,34 @@
run
runfsl
wait
runfunc
func_to_cmd
dryrun
hold
job_output
"""
import sys
import logging
import threading
import contextlib
import collections
import subprocess as sp
import os.path as op
import io
import sys
import glob
import time
import shlex
import logging
import tempfile
import threading
import contextlib
import collections.abc as abc
import subprocess as sp
import os.path as op
import os
import textwrap as tw
import six
import dill
from fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub as fslsub
import fsl.utils.tempdir as tempdir
import fsl.utils.path as fslpath
log = logging.getLogger(__name__)
......@@ -42,6 +53,12 @@ DRY_RUN = False
execute them.
"""
DRY_RUN_COMMANDS = None
"""Contains the commands that got logged during a dry run.
Commands will be logged if :data:`DRY_RUN` is true, which can be set using :func:`dryrun`.
"""
FSL_PREFIX = None
"""Global override for the FSL executable location used by :func:`runfsl`. """
......@@ -51,20 +68,25 @@ class FSLNotPresent(Exception):
"""Error raised by the :func:`runfsl` function when ``$FSLDIR`` cannot
be found.
"""
pass
@contextlib.contextmanager
def dryrun(*args):
def dryrun(*_):
"""Context manager which causes all calls to :func:`run` to be logged but
not executed. See the :data:`DRY_RUN` flag.
The returned standard output will be equal to ``' '.join(args)``.
After this function returns, each command that was executed while the
dryrun is active, along with any submission parameters, will be accessible
within a list which is stored as :data:`DRY_RUN_COMMANDS`.
"""
global DRY_RUN
global DRY_RUN, DRY_RUN_COMMANDS # pylint: disable=global-statement
oldval = DRY_RUN
DRY_RUN = True
DRY_RUN_COMMANDS = []
oldval = DRY_RUN
DRY_RUN = True
try:
yield
......@@ -80,8 +102,8 @@ def prepareArgs(args):
if len(args) == 1:
# Argument was a command string
if isinstance(args[0], six.string_types):
args = args[0].split()
if isinstance(args[0], str):
args = shlex.split(args[0])
# Argument was an unpacked sequence
else:
......@@ -90,7 +112,6 @@ def prepareArgs(args):
return list(args)
real_stdout = sys.stdout
def _forwardStream(in_, *outs):
"""Creates and starts a daemon thread which forwards the given input stream
to one or more output streams. Used by the :func:`run` function to redirect
......@@ -145,68 +166,93 @@ def run(*args, **kwargs):
:arg submit: Must be passed as a keyword argument. Defaults to ``None``.
If ``True``, the command is submitted as a cluster job via
the :func:`.fslsub.submit` function. May also be a
the :mod:`fsl.wrappers.fsl_sub` function. May also be a
dictionary containing arguments to that function.
:arg log: Must be passed as a keyword argument. An optional ``dict``
which may be used to redirect the command's standard output
and error. The following keys are recognised:
:arg cmdonly: Defaults to ``False``. If ``True``, the command is not
executed, but rather is returned directly, as a list of
arguments.
:arg log: Must be passed as a keyword argument. Defaults to
``{'tee' : True}``. An optional ``dict`` which may be used
to redirect the command's standard output and error. Ignored
if ``submit`` is specified. The following keys are
recognised:
- tee: If ``True`` (the default), the command's
standard output/error streams are forwarded to
the output streams of this process, in addition
to being captured and returned.
- stdout: Optional callable or file-like object to which
the command's standard output stream can be
forwarded.
- tee: If ``True``, the command's standard output/error
streams are forwarded to this processes streams.
- stderr: Optional callable or file-like object to which
the command's standard error stream can be
forwarded.
- stdout: Optional file-like object to which the command's
standard output stream can be forwarded.
- cmd: Optional callable or file-like object to which
the command itself is logged.
- stderr: Optional file-like object to which the command's
standard error stream can be forwarded.
:arg silent: Suppress standard output/error. Equivalent to passing
``log={'tee' : False}``. Ignored if `log` is also passed.
- cmd: Optional file-like object to which the command
itself is logged.
All other keyword arguments are passed through to the ``subprocess.Popen``
object (via :func:`_realrun`), unless ``submit=True``, in which case they
are passed through to the :func:`.fsl_sub` function.
:returns: If ``submit`` is provided, the return value of
:func:`.fslsub` is returned. Otherwise returns a single
value or a tuple, based on the based on the ``stdout``,
``stderr``, and ``exitcode`` arguments.
:returns: If ``submit`` is provided, the ID of the submitted job is
returned as a string. Otherwise returns a single value or a
tuple, based on the based on the ``stdout``, ``stderr``, and
``exitcode`` arguments.
"""
returnStdout = kwargs.get('stdout', True)
returnStderr = kwargs.get('stderr', False)
returnExitcode = kwargs.get('exitcode', False)
submit = kwargs.get('submit', {})
log = kwargs.get('log', {})
tee = log .get('tee', False)
logStdout = log .get('stdout', None)
logStderr = log .get('stderr', None)
logCmd = log .get('cmd', None)
returnStdout = kwargs.pop('stdout', True)
returnStderr = kwargs.pop('stderr', False)
returnExitcode = kwargs.pop('exitcode', False)
submit = kwargs.pop('submit', {})
cmdonly = kwargs.pop('cmdonly', False)
logg = kwargs.pop('log', None)
silent = kwargs.pop('silent', False)
args = prepareArgs(args)
if logg is None:
logg = {'tee' : not silent}
tee = logg.get('tee', True)
logStdout = logg.get('stdout', None)
logStderr = logg.get('stderr', None)
logCmd = logg.get('cmd', None)
if not bool(submit):
submit = None
if submit is not None:
returnStdout = False
returnStderr = False
returnExitcode = False
if submit is True:
submit = dict()
if submit is not None and not isinstance(submit, collections.Mapping):
if submit is not None and not isinstance(submit, abc.Mapping):
raise ValueError('submit must be a mapping containing '
'options for fsl.utils.fslsub.submit')
if cmdonly:
return args
if DRY_RUN:
return _dryrun(
submit, returnStdout, returnStderr, returnExitcode, *args)
# submit - delegate to fslsub
# submit - delegate to fsl_sub. This will induce a nested
# call back to this run function, which is a bit confusing,
# but harmless, as we've popped the "submit" arg above.
if submit is not None:
return fslsub.submit(' '.join(args), **submit)
from fsl.wrappers import fsl_sub # pylint: disable=import-outside-toplevel # noqa: E501
return fsl_sub(*args, log=logg, **submit, **kwargs)[0].strip()
# Run directly - delegate to _realrun
stdout, stderr, exitcode = _realrun(
tee, logStdout, logStderr, logCmd, *args)
tee, logStdout, logStderr, logCmd, *args, **kwargs)
if not returnExitcode and (exitcode != 0):
raise RuntimeError('{} returned non-zero exit code: {}'.format(
......@@ -221,18 +267,23 @@ def run(*args, **kwargs):
else: return tuple(results)
def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
"""Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
active.
"""
# Save command/submit parameters -
# see the dryrun ctx manager
if DRY_RUN_COMMANDS is not None:
DRY_RUN_COMMANDS.append((args, submit))
if submit:
return ('0',)
results = []
stderr = ''
stdout = ' '.join(args)
join = getattr(shlex, 'join', ' '.join)
stdout = join(args)
if returnStdout: results.append(stdout)
if returnStderr: results.append(stderr)
......@@ -242,7 +293,7 @@ def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
else: return tuple(results)
def _realrun(tee, logStdout, logStderr, logCmd, *args):
def _realrun(tee, logStdout, logStderr, logCmd, *args, **kwargs):
"""Used by :func:`run`. Runs the given command and manages its standard
output and error streams.
......@@ -250,23 +301,31 @@ def _realrun(tee, logStdout, logStderr, logCmd, *args):
streams are forwarded to this process' standard output/
error.
:arg logStdout: Optional file-like object to which the command's standard
output stream can be forwarded.
:arg logStdout: Optional callable or file-like object to which the
command's standard output stream can be forwarded.
:arg logStderr: Optional file-like object to which the command's standard
error stream can be forwarded.
:arg logStderr: Optional callable or file-like object to which the
command's standard error stream can be forwarded.
:arg logCmd: Optional file-like object to which the command itself is
logged.
:arg logCmd: Optional callable or file-like to which the command
itself is logged.
:arg args: Command to run
:arg kwargs: Passed through to the ``subprocess.Popen`` object.
:returns: A tuple containing:
- the command's standard output as a string.
- the command's standard error as a string.
- the command's exit code.
"""
proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)
if fslplatform.fslwsl:
# On Windows this prevents opening of a popup window
startupinfo = sp.STARTUPINFO()
startupinfo.dwFlags |= sp.STARTF_USESHOWWINDOW
kwargs["startupinfo"] = startupinfo
proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, **kwargs)
with tempdir.tempdir(changeto=False) as td:
# We always direct the command's stdout/
......@@ -287,14 +346,22 @@ def _realrun(tee, logStdout, logStderr, logCmd, *args):
outstreams.append(sys.stdout)
errstreams.append(sys.stderr)
# And we also duplicate to caller-
# provided streams if they're given.
if logStdout is not None: outstreams.append(logStdout)
if logStderr is not None: errstreams.append(logStderr)
# log the command if requested
if logCmd is not None:
cmd = ' '.join(args) + '\n'
# And we also duplicate to caller-provided
# streams if they are file-likes (if they're
# callables, we call them after the process
# has completed)
if logStdout is not None and not callable(logStdout):
outstreams.append(logStdout)
if logStderr is not None and not callable(logStderr):
errstreams.append(logStderr)
# log the command if requested.
# logCmd can be a callable, or
# can be a file-like.
cmd = ' '.join(args) + '\n'
if callable(logCmd):
logCmd(cmd)
elif logCmd is not None:
if 'b' in getattr(logCmd, 'mode', 'w'):
logCmd.write(cmd.encode('utf-8'))
else:
......@@ -318,6 +385,10 @@ def _realrun(tee, logStdout, logStderr, logCmd, *args):
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
# Send stdout/error to logStdout/err callables
if logStdout is not None and callable(logStdout): logStdout(stdout)
if logStderr is not None and callable(logStderr): logStderr(stderr)
return stdout, stderr, exitcode
......@@ -348,13 +419,290 @@ def runfsl(*args, **kwargs):
args = prepareArgs(args)
for prefix in prefixes:
cmdpath = op.join(prefix, args[0])
if op.isfile(cmdpath):
if fslplatform.fslwsl:
wslargs = wslcmd(cmdpath, *args)
if wslargs is not None:
args = wslargs
break
elif op.isfile(cmdpath):
args[0] = cmdpath
break
# error if the command cannot
# be found in a FSL directory
else:
raise FileNotFoundError('FSL tool {} not found (checked {})'.format(
args[0], ', '.join(prefixes)))
return run(*args, **kwargs)
def wait(job_ids):
"""Proxy for :func:`.fslsub.wait`. """
return fslsub.wait(job_ids)
def runfunc(func,
args=None,
kwargs=None,
tmp_dir=None,
clean="never",
verbose=False,
**run_kwargs):
"""Run the given python function as a shell command. See
:func:`func_to_cmd` for details on the arguments.
The remaining ``run_kwargs`` arguments are passed through to the
:func:`run` function.
"""
cmd = func_to_cmd(func, args, kwargs, tmp_dir, clean, verbose)
return run(cmd, **run_kwargs)
def func_to_cmd(func,
args=None,
kwargs=None,
tmp_dir=None,
clean="never",
verbose=False):
"""Save the given python function to an executable file. Return a string
containing a command that can be used to run the function.
..warning:: If submitting a function defined in the ``__main__`` script,
the script will be run again to retrieve this function. Make
sure there is a ``if __name__ == '__main__'`` guard to prevent
the full script from being re-run.
:arg func: function to be run
:arg args: positional arguments
:arg kwargs: keyword arguments
:arg tmp_dir: directory where to store the temporary file (default: the
system temporary directory)
:arg clean: Whether the script should be removed after running. There are
three options:
- ``"never"``: (default) Script is kept
- ``"on_success"``: only remove if script successfully
finished (i.e., no error is raised)
- ``"always"``: always remove the script, even if it
raises an error
:arg verbose: If set to True, the script will print its own filename
before running
"""
script_template = tw.dedent("""
#!{executable}
# This is a temporary file designed to run the python function {funcname},
# so that it can be submitted to the cluster
import os
import dill
from io import BytesIO
from importlib import import_module
if {verbose}:
print('running {filename}')
dill_bytes = BytesIO({dill_bytes})
func, args, kwargs = dill.load(dill_bytes)
clean = {clean}
try:
res = func(*args, **kwargs)
except Exception as e:
if clean == 'on_success':
clean = 'never'
raise e
finally:
if clean in ('on_success', 'always'):
os.remove({filename})
""").strip()
if clean not in ('never', 'always', 'on_success'):
raise ValueError("Clean should be one of 'never', 'always', "
f"or 'on_success', not {clean}")
if args is None: args = ()
if kwargs is None: kwargs = {}
dill_bytes = io.BytesIO()
dill.dump((func, args, kwargs), dill_bytes, recurse=True)
handle, filename = tempfile.mkstemp(prefix=func.__name__ + '_',
suffix='.py',
dir=tmp_dir)
os.close(handle)
python_cmd = script_template.format(
executable=sys.executable,
funcname=func.__name__,
filename=f'"{filename}"',
verbose=verbose,
clean=f'"{clean}"',
dill_bytes=dill_bytes.getvalue())
with open(filename, 'w') as f:
f.write(python_cmd)
os.chmod(filename, 0o755)
return f'{filename}'
def wslcmd(cmdpath, *args):
"""Convert a command + arguments into an equivalent set of arguments that
will run the command under Windows Subsystem for Linux
:param cmdpath: Fully qualified path to the command. This is essentially
a WSL path not a Windows one since FSLDIR is specified
as a WSL path, however it may have backslashes as path
separators due to previous use of ``os.path.join``
:param args: Sequence of command arguments (the first of which is the
unqualified command name)
:return: If ``cmdpath`` exists and is executable in WSL, return a
sequence of command arguments which when executed will run the
command in WSL. Windows paths in the argument list will be
converted to WSL paths. If ``cmdpath`` was not executable in
WSL, returns None
"""
# Check if command exists in WSL (remembering
# that the command path may include FSLDIR
# which is a Windows path)
cmdpath = fslpath.wslpath(cmdpath)
_stdout, _stderr, retcode = _realrun(
False, None, None, None, "wsl", "test", "-x", cmdpath)
if retcode == 0:
# Form a new argument list and convert
# any Windows paths in it into WSL paths
wslargs = [fslpath.wslpath(arg) for arg in args]
wslargs[0] = cmdpath
local_fsldir = fslpath.wslpath(fslplatform.fsldir)
if fslplatform.fsldevdir:
local_fsldevdir = fslpath.wslpath(fslplatform.fsldevdir)
else:
local_fsldevdir = None
# Prepend important environment variables -
# note that it seems we cannot use WSLENV
# for this due to its insistance on path
# mapping. FIXME FSLDEVDIR?
local_path = "$PATH"
if local_fsldevdir:
local_path += ":%s/bin" % local_fsldevdir
local_path += ":%s/bin" % local_fsldir
prepargs = [
"wsl",
"PATH=%s" % local_path,
"FSLDIR=%s" % local_fsldir,
"FSLOUTPUTTYPE=%s" % os.environ.get("FSLOUTPUTTYPE", "NIFTI_GZ")
]
if local_fsldevdir:
prepargs.append("FSLDEVDIR=%s" % local_fsldevdir)
return prepargs + wslargs
else:
# Command was not found in WSL with this path
return None
def hold(job_ids, hold_filename=None, timeout=10):
"""Waits until all specified cluster jobs have finished.
:arg job_ids: Possibly nested sequence of job ids. The job ids
themselves should be strings.
:arg hold_filename: Filename to use as a hold file. The containing
directory should exist, but the file itself should
not. Defaults to a ./.<random characters>.hold in
the current directory.
:arg timeout: Number of seconds to sleep between status checks.
"""
# Returns a potentially nested sequence of
# job ids as a single comma-separated string
def _flatten_job_ids(job_ids):
def unpack(job_ids):
if isinstance(job_ids, str):
return {job_ids}
elif isinstance(job_ids, int):
return {str(job_ids)}
else:
res = set()
for job_id in job_ids:
res.update(unpack(job_id))
return res
return ','.join(sorted(unpack(job_ids)))
if hold_filename is not None:
if op.exists(hold_filename):
raise IOError(f"Hold file ({hold_filename}) already exists")
elif not op.isdir(op.split(op.abspath(hold_filename))[0]):
raise IOError(f"Hold file ({hold_filename}) can not be created "
"in non-existent directory")
# Generate a random file name to use as
# the hold file. Reduce likelihood of
# naming collision by storing file in
# cwd.
if hold_filename is None:
handle, hold_filename = tempfile.mkstemp(prefix='.',
suffix='.hold',
dir='.')
os.remove(hold_filename)
os.close(handle)
submit = {
'jobhold' : _flatten_job_ids(job_ids),
'jobtime' : 1,
'name' : '.hold',
}
run(f'touch {hold_filename}', submit=submit, silent=True)
while not op.exists(hold_filename):
time.sleep(timeout)
# remove the hold file and the
# fsl_sub job stdout/err files
os.remove(hold_filename)
for outfile in glob.glob('.hold.[o,e]*'):
os.remove(outfile)
def job_output(job_id, logdir='.', command=None, name=None):
"""Returns the output of the given cluster-submitted job.
On SGE cluster systems, the standard output and error streams of a
submitted job are saved to files named ``<job_id>.o`` and ``<job_id>.e``.
This function simply reads those files and returns their content.
:arg job_id: String containing job ID.
:arg logdir: Directory containing the log - defaults to
the current directory.
:arg command: Command that was run. Not currently used.
:arg name: Job name if it was specified. Not currently used.
:returns: A tuple containing the standard output and standard error.
"""
stdout = list(glob.glob(op.join(logdir, f'*.o{job_id}')))
stderr = list(glob.glob(op.join(logdir, f'*.e{job_id}')))
if len(stdout) != 1 or len(stderr) != 1:
raise ValueError('No/too many error/output files for job '
f'{job_id}: stdout: {stdout}, stderr: {stderr}')
stdout = stdout[0]
stderr = stderr[0]
if op.exists(stdout):
with open(stdout, 'rt') as f:
stdout = f.read()
else:
stdout = None
if op.exists(stderr):
with open(stderr, 'rt') as f:
stderr = f.read()
else:
stderr = None
return stdout, stderr
......@@ -421,7 +421,7 @@ class Settings(object):
try:
with open(configFile, 'wb') as f:
pickle.dump(config, f, protocol=2)
except (IOError, pickle.PicklingError, EOFError):
except (IOError, pickle.PicklingError, EOFError, FileNotFoundError):
log.warning('Unable to save {} configuration file '
'{}'.format(self.__configID, configFile),
exc_info=True)
......@@ -21,7 +21,7 @@ import contextlib
@contextlib.contextmanager
def tempdir(root=None, changeto=True, override=None):
def tempdir(root=None, changeto=True, override=None, prefix=None):
"""Returns a context manager which creates and returns a temporary
directory, and then deletes it on exit.
......@@ -36,14 +36,21 @@ def tempdir(root=None, changeto=True, override=None):
:arg override: Don't create a temporary directory, but use this one
instead. This allows ``tempdir`` to be used as a context
manager when a temporary directory already exists.
:arg prefix: Create the temporary directory with a name starting with
this prefix.
"""
if root is not None:
root = os.path.abspath(root)
if override is None:
testdir = tempfile.mkdtemp(dir=root)
prevdir = os.getcwd()
testdir = tempfile.mkdtemp(dir=root, prefix=prefix)
else:
testdir = override
prevdir = os.getcwd()
try:
if changeto:
os.chdir(testdir)
......@@ -51,6 +58,6 @@ def tempdir(root=None, changeto=True, override=None):
finally:
if override is None:
if changeto:
os.chdir(prevdir)
shutil.rmtree(testdir)
if changeto:
os.chdir(prevdir)