Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • paulmc/fslpy
  • ndcn0236/fslpy
  • seanf/fslpy
3 results
Show changes
Showing
with 1831 additions and 676 deletions
...@@ -16,10 +16,9 @@ class Expired(Exception): ...@@ -16,10 +16,9 @@ class Expired(Exception):
"""``Exception`` raised by the :meth:`Cache.get` metho when an attempt is """``Exception`` raised by the :meth:`Cache.get` metho when an attempt is
made to access a cache item that has expired. made to access a cache item that has expired.
""" """
pass
class CacheItem(object): class CacheItem:
"""Internal container class used to store :class:`Cache` items. """ """Internal container class used to store :class:`Cache` items. """
def __init__(self, key, value, expiry=0): def __init__(self, key, value, expiry=0):
...@@ -29,7 +28,7 @@ class CacheItem(object): ...@@ -29,7 +28,7 @@ class CacheItem(object):
self.storetime = time.time() self.storetime = time.time()
class Cache(object): class Cache:
"""The ``Cache`` is a simple in-memory cache built on a """The ``Cache`` is a simple in-memory cache built on a
``collections.OrderedDict``. The ``Cache`` class has the following ``collections.OrderedDict``. The ``Cache`` class has the following
features: features:
...@@ -150,6 +149,23 @@ class Cache(object): ...@@ -150,6 +149,23 @@ class Cache(object):
return key in self.__cache return key in self.__cache
def keys(self):
"""Return all keys in the cache. """
return self.__cache.keys()
def values(self):
"""Return all values in the cache. """
for item in self.__cache.values():
yield item.value
def items(self):
"""Return all (key, value) pairs in the cache. """
for key, item in self.__cache.items():
yield key, item.value
def __parseDefault(self, *args, **kwargs): def __parseDefault(self, *args, **kwargs):
"""Used by the :meth:`get` method. Parses the ``default`` argument, """Used by the :meth:`get` method. Parses the ``default`` argument,
which may be specified as either a positional or keyword argumnet. which may be specified as either a positional or keyword argumnet.
......
...@@ -13,8 +13,7 @@ that some condition is met. ...@@ -13,8 +13,7 @@ that some condition is met.
ensureIsImage ensureIsImage
""" """
import pathlib
import six
import nibabel as nib import nibabel as nib
...@@ -24,7 +23,7 @@ import fsl.data.image as fslimage ...@@ -24,7 +23,7 @@ import fsl.data.image as fslimage
def ensureIsImage(img): def ensureIsImage(img):
"""Ensures that the given ``img`` is an in-memory ``nibabel`` object. """Ensures that the given ``img`` is an in-memory ``nibabel`` object.
""" """
if isinstance(img, six.string_types): if isinstance(img, (str, pathlib.Path)):
img = fslimage.addExt(img) img = fslimage.addExt(img)
img = nib.load(img) img = nib.load(img)
return img return img
""" """Easy format to define input/output files in a python pipeline.
Easy format to define input/output files in a python pipeline.
.. warning::
File-tree is now an independent Python library, and will eventually be
removed from ``fslpy`` - visit the `file-tree` :ref:`API documentation
<https://open.win.ox.ac.uk/pages/fsl/file-tree/>`_ for more details.
The goal is to separate the definition of the input/output filenames from the actual code The goal is to separate the definition of the input/output filenames from the actual code
by defining a directory tree (i.e., FileTree) in a separate file from the code. by defining a directory tree (i.e., FileTree) in a separate file from the code.
...@@ -177,7 +181,7 @@ which amongst others refers to ...@@ -177,7 +181,7 @@ which amongst others refers to
Example pipeline Example pipeline
---------------- ----------------
A very simple pipeline to run BET on every subject can start with a simply FileTree like A very simple pipeline to run BET on every subject can start with a FileTree like
:: ::
{subject} {subject}
...@@ -200,6 +204,12 @@ Assuming that the input T1w's already exist, we can then simply run BET for ever ...@@ -200,6 +204,12 @@ Assuming that the input T1w's already exist, we can then simply run BET for ever
# make_dir=True ensures that the output directory containing the "bet_output" actually exists # make_dir=True ensures that the output directory containing the "bet_output" actually exists
bet(input=T1w_tree.get('T1w'), output=T1w_tree.get('bet_output', make_dir=True), mask=True) bet(input=T1w_tree.get('T1w'), output=T1w_tree.get('bet_output', make_dir=True), mask=True)
Useful tips
-----------
Changing directory structure
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If later on in our input files change, because for some subjects we added a second session, we could keep our script If later on in our input files change, because for some subjects we added a second session, we could keep our script
and simply update the FileTree: and simply update the FileTree:
:: ::
...@@ -207,8 +217,8 @@ and simply update the FileTree: ...@@ -207,8 +217,8 @@ and simply update the FileTree:
{subject} {subject}
[ses-{session}] [ses-{session}]
T1w.nii.gz T1w.nii.gz
T1w_brain.nii.gz (bed_output) T1w_brain.nii.gz (bet_output)
T1w_brain_mask.nii.gz (bed_mask) T1w_brain_mask.nii.gz (bet_mask)
Note the square brackets around the session sub-directory. This indicates that this sub-directory is optional and Note the square brackets around the session sub-directory. This indicates that this sub-directory is optional and
will only be present if the "session" variable is defined (see `Optional variables`_). will only be present if the "session" variable is defined (see `Optional variables`_).
...@@ -230,17 +240,24 @@ altering this behaviour is again as simple as altering the FileTree to something ...@@ -230,17 +240,24 @@ altering this behaviour is again as simple as altering the FileTree to something
:: ::
raw_data raw_data
{subject} {subject} (input_subject_dir)
[ses-{session}] [ses-{session}] (input_session_dir)
T1w.nii.gz T1w.nii.gz
processed_data processed_data
{subject} {subject} (output_subject_dir)
[ses-{session}] [ses-{session}] (output_session_dir)
bet bet
{subject}[_{session}]_T1w_brain.nii.gz (bet_output) {subject}[_{session}]_T1w_brain.nii.gz (bet_output)
{subject}[_{session}]_T1w_brain_mask.nii.gz (bet_mask) {subject}[_{session}]_T1w_brain_mask.nii.gz (bet_mask)
Note that we also encoded the subject and session ID in the output filename. Note that we also encoded the subject and session ID in the output filename.
We also have to explicitly assign short names to the subject and session directories,
even though we don't explicitly reference these in the script.
The reason for this is that each directory and filename template must have a unique short name and
in this case the default short names (respectively, "{subject}" and "[ses-{session}]") would not have been unique.
Output "basenames"
^^^^^^^^^^^^^^^^^^
Some tools like FSL's FAST produce many output files. Rather than entering all Some tools like FSL's FAST produce many output files. Rather than entering all
of these files in our FileTree by hand you can include them all at once by including `Sub-trees`_: of these files in our FileTree by hand you can include them all at once by including `Sub-trees`_:
...@@ -248,12 +265,12 @@ of these files in our FileTree by hand you can include them all at once by inclu ...@@ -248,12 +265,12 @@ of these files in our FileTree by hand you can include them all at once by inclu
:: ::
raw_data raw_data
{subject} {subject} (input_subject_dir)
[ses-{session}] [ses-{session}] (input_session_dir)
T1w.nii.gz T1w.nii.gz
processed_data processed_data
{subject} {subject} (output_subject_dir)
[ses-{session}] [ses-{session}] (output_session_dir)
bet bet
{subject}[_{session}]_T1w_brain.nii.gz (bet_output) {subject}[_{session}]_T1w_brain.nii.gz (bet_output)
{subject}[_{session}]_T1w_brain_mask.nii.gz (bet_mask) {subject}[_{session}]_T1w_brain_mask.nii.gz (bet_mask)
...@@ -272,6 +289,35 @@ Within the script we can generate the fast output by running ...@@ -272,6 +289,35 @@ Within the script we can generate the fast output by running
The output files will be available as `T1w_tree.get('segment/<variable name>')`, where `<variable name>` is one The output files will be available as `T1w_tree.get('segment/<variable name>')`, where `<variable name>` is one
of the short variable names defined in the of the short variable names defined in the
`FAST FileTree <https://git.fmrib.ox.ac.uk/fsl/fslpy/blob/master/fsl/utils/filetree/trees/fast.tree>`_. `FAST FileTree <https://git.fmrib.ox.ac.uk/fsl/fslpy/blob/master/fsl/utils/filetree/trees/fast.tree>`_.
Running a pipeline on a subset of participants/sessions/runs
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Suppose you want to run your pipeline on a subset of your data while testing.
You may want to do this if your data has a a hierarchy of variables (e.g. participant, session, run) as in the example below.
::
sub-001
ses-01
sub-001_ses-01_run-1.feat
sub-001_ses-01_run-2.feat
ses-02
sub-{participant}_ses-{session}_run-{run}.feat (feat_dir)
...
sub-002
sub-003
...
You can update the FileTree with one or more variables before calling `get_all_trees` as follows:
.. code-block:: python
for participant in ("001", "002"):
for t in tree.update(participant=participant, run="1").get_all_trees("feat_dir", glob_vars="all"):
my_pipeline(t)
This code will iterate over all sessions that have a run="1" for participants "001" and "002".
""" """
__author__ = 'Michiel Cottaar <Michiel.Cottaar@ndcn.ox.ac.uk>' __author__ = 'Michiel Cottaar <Michiel.Cottaar@ndcn.ox.ac.uk>'
...@@ -279,3 +325,13 @@ __author__ = 'Michiel Cottaar <Michiel.Cottaar@ndcn.ox.ac.uk>' ...@@ -279,3 +325,13 @@ __author__ = 'Michiel Cottaar <Michiel.Cottaar@ndcn.ox.ac.uk>'
from .filetree import FileTree, register_tree, MissingVariable from .filetree import FileTree, register_tree, MissingVariable
from .parse import tree_directories, list_all_trees from .parse import tree_directories, list_all_trees
from .query import FileTreeQuery from .query import FileTreeQuery
import fsl.utils.deprecated as deprecated
deprecated.warn('fsl.utils.filetree',
stacklevel=2,
vin='3.6.0',
rin='4.0.0',
msg='The filetree package is now released as a separate '
'Python library ("file-tree" on PyPi), and will be '
'removed in a future version of fslpy.')
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Tuple, Optional, Dict, Any, Set from typing import Tuple, Optional, Dict, Any, Set
from copy import deepcopy
from . import parse from . import parse
import pickle import pickle
import json
import os.path as op import os.path as op
from . import utils from . import utils
from fsl.utils.deprecated import deprecated
class MissingVariable(KeyError): class MissingVariable(KeyError):
...@@ -28,11 +27,11 @@ class FileTree(object): ...@@ -28,11 +27,11 @@ class FileTree(object):
- ``name``: descriptive name of the tree - ``name``: descriptive name of the tree
""" """
def __init__(self, def __init__(self,
templates: Dict[str, str], templates: Dict[str, str],
variables: Dict[str, Any], variables: Dict[str, Any],
sub_trees: Dict[str, "FileTree"] = None, sub_trees: Dict[str, "FileTree"] = None,
parent: Optional["FileTree"] = None, parent: Optional["FileTree"] = None,
name: str = None): name: str = None):
""" """
Creates a new filename tree. Creates a new filename tree.
""" """
...@@ -51,7 +50,6 @@ class FileTree(object): ...@@ -51,7 +50,6 @@ class FileTree(object):
""" """
return self._parent return self._parent
@property @property
def name(self, ): def name(self, ):
""" """
...@@ -59,7 +57,6 @@ class FileTree(object): ...@@ -59,7 +57,6 @@ class FileTree(object):
""" """
return self._name return self._name
@property @property
def all_variables(self, ): def all_variables(self, ):
""" """
...@@ -181,10 +178,10 @@ class FileTree(object): ...@@ -181,10 +178,10 @@ class FileTree(object):
:param glob_vars: sequence of undefined variables that can take any possible values when looking for matches on the disk. :param glob_vars: sequence of undefined variables that can take any possible values when looking for matches on the disk.
Any defined variables in `glob_vars` will be ignored. Any defined variables in `glob_vars` will be ignored.
If glob_vars is set to 'all', all undefined variables will be used to look up matches. If glob_vars is set to 'all', all undefined variables will be used to look up matches.
:return: sorted sequence of paths :return: sequence of paths
""" """
text, variables = self.get_template(short_name) return tuple([self.update(**vars).get(short_name)
return tuple(str(Path(fn)) for fn in utils.get_all(text, variables, glob_vars=glob_vars)) for vars in self.get_all_vars(short_name, glob_vars=glob_vars)])
def get_all_vars(self, short_name: str, glob_vars=()) -> Tuple[Dict[str, str]]: def get_all_vars(self, short_name: str, glob_vars=()) -> Tuple[Dict[str, str]]:
""" """
...@@ -196,7 +193,8 @@ class FileTree(object): ...@@ -196,7 +193,8 @@ class FileTree(object):
If glob_vars is set to 'all', all undefined variables will be used to look up matches. If glob_vars is set to 'all', all undefined variables will be used to look up matches.
:return: sequence of dictionaries with the variables settings used to generate each filename :return: sequence of dictionaries with the variables settings used to generate each filename
""" """
return tuple(self.extract_variables(short_name, fn) for fn in self.get_all(short_name, glob_vars=glob_vars)) text, variables = self.get_template(short_name)
return utils.get_all(text, variables, glob_vars=glob_vars)
def get_all_trees(self, short_name: str, glob_vars=(), set_parent=True) -> Tuple["FileTree"]: def get_all_trees(self, short_name: str, glob_vars=(), set_parent=True) -> Tuple["FileTree"]:
""" """
...@@ -225,7 +223,7 @@ class FileTree(object): ...@@ -225,7 +223,7 @@ class FileTree(object):
Setting a variable to None will cause the variable to be unset Setting a variable to None will cause the variable to be unset
:return: New FileTree with same templates for directory names and filenames, but updated variables :return: New FileTree with same templates for directory names and filenames, but updated variables
""" """
new_tree = deepcopy(self) new_tree = self.copy()
set_tree = new_tree set_tree = new_tree
while set_parent and set_tree.parent is not None: while set_parent and set_tree.parent is not None:
set_tree = set_tree.parent set_tree = set_tree.parent
...@@ -256,12 +254,28 @@ class FileTree(object): ...@@ -256,12 +254,28 @@ class FileTree(object):
with open(filename, 'wb') as f: with open(filename, 'wb') as f:
pickle.dump(self, f) pickle.dump(self, f)
def save_json(self, filename):
"""
Saves the Filetree to a JSON file
:param filename: filename to store the file tree in
"""
def default(obj):
if isinstance(obj, FileTree):
res = dict(obj.__dict__)
del res['_parent']
return res
return obj
with open(filename, 'w') as f:
json.dump(self, f, default=default, indent=2)
@classmethod @classmethod
def load_pickle(cls, filename): def load_pickle(cls, filename):
""" """
Loads the Filetree from a pickle file Loads the Filetree from a pickle file
:param filename: filename produced from Filetree.save :param filename: filename produced from Filetree.save_pickle
:return: stored Filetree :return: stored Filetree
""" """
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
...@@ -270,6 +284,29 @@ class FileTree(object): ...@@ -270,6 +284,29 @@ class FileTree(object):
raise IOError("Pickle file did not contain %s object" % cls) raise IOError("Pickle file did not contain %s object" % cls)
return res return res
@classmethod
def load_json(cls, filename):
"""
Loads the FileTree from a JSON file
:param filename: filename produced by FileTree.save_json
:return: stored FileTree
"""
def from_dict(input_dict):
res_tree = FileTree(
templates=input_dict['templates'],
variables=input_dict['variables'],
sub_trees={name: from_dict(value) for name, value in input_dict['sub_trees'].items()},
name=input_dict['_name'],
)
for sub_tree in res_tree.sub_trees.values():
sub_tree._parent = res_tree
return res_tree
with open(filename, 'r') as f:
as_dict = json.load(f)
return from_dict(as_dict)
def defines(self, short_names, error=False): def defines(self, short_names, error=False):
""" """
Checks whether templates are defined for all the `short_names` Checks whether templates are defined for all the `short_names`
...@@ -327,8 +364,68 @@ class FileTree(object): ...@@ -327,8 +364,68 @@ class FileTree(object):
return False return False
return True return True
def partial_fill(self, ) -> "FileTree":
"""
Fills in known variables into the templates
:return: The resulting tree will have empty `variables` dictionaries and updated templates
"""
new_tree = self.copy()
to_update = new_tree
while to_update.parent is not None:
to_update = to_update.parent
to_update._update_partial_fill()
return new_tree
def _update_partial_fill(self, ):
"""
Helper function for `partial_fill` that updates the templates in place
"""
new_templates = {}
for short_name in self.templates:
template, variables = self.get_template(short_name)
new_templates[short_name] = str(utils.Template.parse(template).fill_known(variables))
self.templates = new_templates
for tree in self.sub_trees.values():
tree._update_partial_fill()
self.variables = {}
def copy(self, ):
"""
Copies the FileTree
Copies the templates, variables, sub_trees, and parent
:return: a copy of the FileTree
"""
return self._copy()
def _copy(self, new_parent=None, new_sub_tree=None):
"""
Helper function for copying a FileTree
"""
if new_sub_tree is None:
new_sub_tree = (None, None)
new_copy = type(self)(
templates=self.templates.copy(),
variables=self.variables.copy(),
name=self.name,
parent=new_parent
)
new_copy.sub_trees = {name: new_sub_tree[1] if new_sub_tree[0] == name else tree._copy(new_parent=new_copy)
for name, tree in self.sub_trees.items()}
if self.parent is not None and new_parent is None:
for my_key, ref_tree in self.parent.sub_trees.items():
if self is ref_tree:
break
else:
raise ValueError(f"Sub-tree {self} not found in parent tree")
new_copy._parent = self.parent._copy(new_sub_tree=(my_key, new_copy))
return new_copy
@classmethod @classmethod
def read(cls, tree_name: str, directory='.', **variables) -> "FileTree": def read(cls, tree_name: str, directory='.', partial_fill=False, **variables) -> "FileTree":
""" """
Reads a FileTree from a specific file Reads a FileTree from a specific file
...@@ -338,6 +435,7 @@ class FileTree(object): ...@@ -338,6 +435,7 @@ class FileTree(object):
:param tree_name: file containing the filename tree. :param tree_name: file containing the filename tree.
Can provide the filename of a tree file or the name for a tree in the ``filetree.tree_directories``. Can provide the filename of a tree file or the name for a tree in the ``filetree.tree_directories``.
:param directory: parent directory of the full tree (defaults to current directory) :param directory: parent directory of the full tree (defaults to current directory)
:param partial_fill: By default any known `variables` are filled into the `template` immediately
:param variables: variable settings :param variables: variable settings
:return: dictionary from specifier to filename :return: dictionary from specifier to filename
""" """
...@@ -421,6 +519,8 @@ class FileTree(object): ...@@ -421,6 +519,8 @@ class FileTree(object):
res = get_registered(tree_name, cls)(templates, variables=file_variables, sub_trees=sub_trees, name=tree_name) res = get_registered(tree_name, cls)(templates, variables=file_variables, sub_trees=sub_trees, name=tree_name)
for tree in sub_trees.values(): for tree in sub_trees.values():
tree._parent = res tree._parent = res
if partial_fill:
res = res.partial_fill()
return res return res
......
...@@ -31,8 +31,7 @@ from typing import Dict, List, Tuple ...@@ -31,8 +31,7 @@ from typing import Dict, List, Tuple
import numpy as np import numpy as np
from fsl.utils.deprecated import deprecated from . import FileTree
from . import FileTree
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -89,6 +88,8 @@ class FileTreeQuery(object): ...@@ -89,6 +88,8 @@ class FileTreeQuery(object):
:arg tree: The :class:`.FileTree` object :arg tree: The :class:`.FileTree` object
""" """
# Hard-code into the templates any pre-defined variables
tree = tree.partial_fill()
# Find all files present in the directory # Find all files present in the directory
# (as Match objects), and find all variables, # (as Match objects), and find all variables,
...@@ -130,7 +131,7 @@ class FileTreeQuery(object): ...@@ -130,7 +131,7 @@ class FileTreeQuery(object):
# An ND array for this short # An ND array for this short
# name. Each element is a # name. Each element is a
# Match object, or nan. # Match object, or nan.
matcharray = np.zeros(tvarlens, dtype=np.object) matcharray = np.zeros(tvarlens, dtype=object)
matcharray[:] = np.nan matcharray[:] = np.nan
# indices into the match array # indices into the match array
...@@ -205,15 +206,6 @@ class FileTreeQuery(object): ...@@ -205,15 +206,6 @@ class FileTreeQuery(object):
return list(self.__templatevars.keys()) return list(self.__templatevars.keys())
@property
@deprecated('2.6.0', '3.0.0', 'Use templates instead')
def short_names(self) -> List[str]:
"""Returns a list containing all templates of the ``FileTree`` that
are present in the directory.
"""
return self.templates
def query(self, template, asarray=False, **variables): def query(self, template, asarray=False, **variables):
"""Search for files of the given ``template``, which match """Search for files of the given ``template``, which match
the specified ``variables``. All hits are returned for variables the specified ``variables``. All hits are returned for variables
...@@ -290,12 +282,6 @@ class Match(object): ...@@ -290,12 +282,6 @@ class Match(object):
return self.__filename return self.__filename
@property
@deprecated('2.6.0', '3.0.0', 'Use template instead')
def short_name(self):
return self.template
@property @property
def template(self): def template(self):
return self.__template return self.__template
...@@ -369,13 +355,13 @@ def scan(tree : FileTree) -> List[Match]: ...@@ -369,13 +355,13 @@ def scan(tree : FileTree) -> List[Match]:
matches = [] matches = []
for template in tree.templates: for template in tree.templates:
for filename in tree.get_all(template, glob_vars='all'): for variables in tree.get_all_vars(template, glob_vars='all'):
filename = tree.update(**variables).get(template)
if not op.isfile(filename): if not op.isfile(filename):
continue continue
variables = dict(tree.extract_variables(template, filename))
matches.append(Match(filename, template, tree, variables)) matches.append(Match(filename, template, tree, variables))
for tree_name, sub_tree in tree.sub_trees.items(): for tree_name, sub_tree in tree.sub_trees.items():
......
...@@ -2,19 +2,22 @@ ext=.nii.gz ...@@ -2,19 +2,22 @@ ext=.nii.gz
dataset_description.json dataset_description.json
participants.tsv participants.tsv
README README (readme)
CHANGES CHANGES (changes)
LICENSE (license)
genetic_info.json
sub-{participant} sub-{participant}
[ses-{session}] [ses-{session}]
sub-{participant}_sessions.tsv (sessions_tsv) sub-{participant}_sessions.tsv (sessions_tsv)
anat (anat_dir) anat (anat_dir)
sub-{participant}[_ses-{session}][_acq-{acq}][_rec-{rec}][_run-{run_index}]_{modality}{ext} (anat_image) sub-{participant}[_ses-{session}][_acq-{acq}][_ce-{ce}][_rec-{rec}][_run-{run_index}]_{modality}{ext} (anat_image)
sub-{participant}[_ses-{session}][_acq-{acq}][_ce-{ce}][_rec-{rec}][_run-{run_index}][_mod-{modality}]_defacemask{ext} (anat_deface)
func (func_dir) func (func_dir)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_rec-{rec}][_run-{run_index}]_bold{ext} (task_image) sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_ce-{ce}][_dir-{dir}][_rec-{rec}][_run-{run_index}][_echo-{echo}]_bold{ext} (task_image)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_rec-{rec}][_run-{run_index}]_sbref{ext} (task_sbref) sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_ce-{ce}][_dir-{dir}][_rec-{rec}][_run-{run_index}][_echo-{echo}]_sbref{ext} (task_sbref)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_rec-{rec}][_run-{run_index}]_events.tsv (task_events) sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_ce-{ce}][_dir-{dir}][_rec-{rec}][_run-{run_index}][_echo-{echo}]_events.tsv (task_events)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_rec-{rec}][_run-{run_index}][_recording-{recording}]_physio{ext} (task_physio) sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_ce-{ce}][_dir-{dir}][_rec-{rec}][_run-{run_index}][_echo-{echo}][_recording-{recording}]_physio.tsv.gz (task_physio)
sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_rec-{rec}][_run-{run_index}][_recording-{recording}]_stim{ext} (task_stim) sub-{participant}[_ses-{session}]_task-{task}[_acq-{acq}][_ce-{ce}][_dir-{dir}][_rec-{rec}][_run-{run_index}][_echo-{echo}][_recording-{recording}]_stim.tsv.gz (task_stim)
dwi (dwi_dir) dwi (dwi_dir)
sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_dwi{ext} (dwi_image) sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_dwi{ext} (dwi_image)
sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_dwi.bval (bval) sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_dwi.bval (bval)
...@@ -28,3 +31,15 @@ sub-{participant} ...@@ -28,3 +31,15 @@ sub-{participant}
sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_phase2{ext} (fmap_phase2) sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_phase2{ext} (fmap_phase2)
sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_fieldmap{ext} (fmap) sub-{participant}[_ses-{session}][_acq-{acq}][_run-{run_index}]_fieldmap{ext} (fmap)
sub-{participant}[_ses-{session}][_acq-{acq}]_dir-{dir}[_run-{run_index}]_epi{ext} (fmap_epi) sub-{participant}[_ses-{session}][_acq-{acq}]_dir-{dir}[_run-{run_index}]_epi{ext} (fmap_epi)
meg (meg_dir)
sub-{participant}[_ses-{session}]_task-{task}[_run-{run}][_proc-{proc}]_meg.{meg_ext} (meg)
eeg (eeg_dir)
sub-{participant}[_ses-{session}]_task-{task}[_run-{run}][_proc-{proc}]_eeg.{eeg_ext} (eeg)
ieeg (ieeg_dir)
sub-{participant}[_ses-{session}]_task-{task}[_run-{run}][_proc-{proc}]_ieeg.{ieeg_ext} (ieeg)
beh (behavioral_dir)
sub-{participant}[_ses-{session}]_task-{task}_events.tsv (behavioural_events)
sub-{participant}[_ses-{session}]_task-{task}_beh.tsv (behavioural)
sub-{participant}[_ses-{session}]_task-{task}_physio.tsv.gz (behavioural_physio)
sub-{participant}[_ses-{session}]_task-{task}_stim.tsv.gz (behavioral_stim)
...@@ -12,3 +12,10 @@ basename = dti ...@@ -12,3 +12,10 @@ basename = dti
{basename}_L3.nii.gz (L3) {basename}_L3.nii.gz (L3)
{basename}_kurt.nii.gz (kurt) {basename}_kurt.nii.gz (kurt)
{basename}_kurt1.nii.gz (kurt1)
{basename}_kurt2.nii.gz (kurt2)
{basename}_kurt3.nii.gz (kurt3)
{basename}_sse.nii.gz (sse)
{basename}_cnicope.nii.gz (cnicope)
{basename}_tensor.nii.gz (tensor)
struct = T1
basename = fsl_anat
{basename} (basename)
{basename}.anat (fsl_anat_dir)
lesionmaskinv.nii.gz
lesionmask.nii.gz
log.txt
MNI152_{struct}_2mm_brain_mask_dil1.nii.gz
MNI_to_{struct}_nonlin_field.nii.gz
{struct}2std_skullcon.mat
{struct}_biascorr_bet_skull.nii.gz (biascorr_bet_skull)
{struct}_biascorr_brain_mask.nii.gz (biascorr_brain_mask)
{struct}_biascorr_brain.nii.gz (biascorr_brain)
{struct}_biascorr.nii.gz (biascorr)
{struct}_fast_bias_idxmask.nii.gz
{struct}_fast_bias_init.nii.gz
{struct}_fast_bias.nii.gz
{struct}_fast_bias_vol2.nii.gz
{struct}_fast_bias_vol32.nii.gz
{struct}_fast_restore.nii.gz
{struct}_fast_seg.nii.gz (fast_seg)
{struct}_fast_pve_0.nii.gz
{struct}_fast_pve_1.nii.gz
{struct}_fast_pve_2.nii.gz
{struct}_fast_pveseg.nii.gz (fast_pveseg)
{struct}_fast_totbias.nii.gz
{struct}_fullfov.nii.gz
{struct}_initfast2_brain_mask2.nii.gz
{struct}_initfast2_brain_mask.nii.gz
{struct}_initfast2_brain.nii.gz
{struct}_initfast2_maskedrestore.nii.gz
{struct}_initfast2_restore.nii.gz
{struct}.nii.gz
{struct}_nonroi2roi.mat
{struct}_orig2roi.mat
{struct}_orig2std.mat
{struct}_orig.nii.gz
{struct}_roi2nonroi.mat
{struct}_roi2orig.mat
{struct}_roi.log
{struct}_std2orig.mat
{struct}_to_MNI_lin.mat (MNI_lin_mat)
{struct}_to_MNI_lin.nii.gz (MNI_lin_nii)
{struct}_to_MNI_nonlin_coeff.nii.gz
{struct}_to_MNI_nonlin_field.nii.gz
{struct}_to_MNI_nonlin_jac.nii.gz
{struct}_to_MNI_nonlin.nii.gz (MNI_nonlin_nii)
{struct}_to_MNI_nonlin.txt (MNI_nonlin_txt)
import re import re
import itertools import itertools
import glob import glob
from . import filetree from typing import List, Sequence, Set, Tuple, Dict, Iterator
def resolve(template, variables): class Part:
""" """
Resolves the template given a set of variables Individual part of a template
:param template: template 3 subclasses are defined:
:param variables: mapping of variable names to values
:return: cleaned string - :class:`Literal`: piece of text
- :class:`Required`: required variable to fill in (between curly brackets)
- :class:`Optional`: part of text containing optional variables (between square brackets)
""" """
filled = fill_known(template, variables) def fill_known(self, variables) -> Sequence["Part"]:
filename = resolve_optionals(filled) """
remaining = find_variables(filename) Fills in the given variables
if len(remaining) > 0: """
raise filetree.MissingVariable('Variables %s not defined' % set(remaining)) return [self]
return filename
def optional_variables(self, ) -> Set["Part"]:
"""
Returns all variables in optional parts
"""
return set()
def get_all(template, variables, glob_vars=()): def required_variables(self, ) -> Set["Part"]:
""" """
Gets all variables matching the templates given the variables Returns all required variables
"""
return set()
:param template: template def contains_optionals(self, variables: Set["Part"]=None):
:param variables: (incomplete) mapping of variable names to values """
:param glob_vars: sequence of undefined variables that can take any possible values when looking for matches on the disk Returns True if this part contains the optional variables
If `glob_vars` contains any defined variables, it will be ignored. """
:return: sequence of filenames return False
def append_variables(self, variables: List[str]):
"""
Appends the variables in this part to the provided list in order
"""
pass
class Literal(Part):
def __init__(self, text: str):
"""
Literal part is defined purely by the text it contains
:param text: part of the template
"""
self.text = text
def __str__(self):
"""
Returns this part of the template as a string
"""
return self.text
class Required(Part):
def __init__(self, var_name, var_formatting=None):
"""
Required part of template (between curly brackets)
Required variable part of template is defined by variable name and its format
:param var_name: name of variable
:param var_formatting: how to format the variable
"""
self.var_name = var_name
self.var_formatting = var_formatting
def __str__(self):
"""
Returns this part of the template as a string
"""
if self.var_formatting is None:
return '{' + self.var_name + '}'
else:
return '{' + self.var_name + ':' + self.var_formatting + '}'
def fill_known(self, variables):
if self.var_name in variables:
return Template.parse(str(self).format(**variables)).parts
return [self]
def required_variables(self, ):
return {self.var_name}
def append_variables(self, variables):
variables.append(self.var_name)
class Optional(Part):
    """Optional part of a template (the text between square brackets)."""

    def __init__(self, sub_template: "Template"):
        """
        Optional part of template (between square brackets)

        Optional part can contain literal and required parts

        :param sub_template: part of the template within square brackets
        """
        self.sub_template = sub_template

    def __str__(self):
        """
        Returns this part of the template as a string
        """
        return '[' + str(self.sub_template) + ']'

    def fill_known(self, variables):
        """
        Fills in the known variables; once no required variables remain in
        the sub-template the square brackets are dropped and the text
        re-parsed
        """
        new_opt = self.sub_template.fill_known(variables)
        if len(new_opt.required_variables()) == 0:
            return Template.parse(str(new_opt)).parts
        return [Optional(new_opt)]

    def optional_variables(self, ):
        """
        All variables within an optional block are considered optional
        """
        return self.sub_template.required_variables()

    def contains_optionals(self, variables=None):
        """
        Returns True if this part contains any of the given optional
        variables (or any optional variable at all if ``variables`` is None)
        """
        # BUGFIX: the previous implementation fell through to
        # `intersection(None)` (a TypeError) when ``variables`` was None and
        # this optional block contained no variables at all.
        if variables is None:
            return len(self.optional_variables()) > 0
        return len(self.optional_variables().intersection(variables)) > 0

    def append_variables(self, variables):
        variables.extend(self.sub_template.ordered_variables())
class Template:
    """
    Splits a template into its constituent parts

    A template consists of:

    - literal text,
    - required variables between curly brackets (``{var}`` or ``{var:format}``),
    - optional sub-templates between square brackets (``[...]``), which are
      only included when every variable inside them is defined.
    """

    def __init__(self, parts: Sequence[Part]):
        """
        Creates a template from a sequence of parts

        Use :meth:`parse` to create a ``Template`` from its string form.
        """
        if isinstance(parts, str):
            raise ValueError("Input to Template should be a sequence of parts; " +
                             "did you mean to call `Template.parse` instead?")
        self.parts = tuple(parts)

    @classmethod
    def parse(cls, text: str) -> "Template":
        """
        Parses a text template into its constituent parts

        :param text: input template as string
        :return: same template split into its parts
        """
        parts = []
        # First split off the optional blocks (square brackets), then split
        # each remaining piece into required-variable and literal parts.
        for optional_parts in re.split(r'(\[.*?\])', text):
            if len(optional_parts) > 0 and optional_parts[0] == '[' and optional_parts[-1] == ']':
                if '[' in optional_parts[1:-1] or ']' in optional_parts[1:-1]:
                    raise ValueError(f'Can not parse {text}, because unmatching square brackets were found')
                parts.append(Optional(Template.parse(optional_parts[1:-1])))
            else:
                for required_parts in re.split(r'(\{.*?\})', optional_parts):
                    if len(required_parts) > 0 and required_parts[0] == '{' and required_parts[-1] == '}':
                        if ':' in required_parts:
                            var_name, var_type = required_parts[1:-1].split(':')
                        else:
                            var_name, var_type = required_parts[1:-1], ''
                        parts.append(Required(var_name, var_type))
                    else:
                        parts.append(Literal(required_parts))
        return Template(parts)

    def __str__(self):
        """
        Returns the template as a string
        """
        return ''.join([str(p) for p in self.parts])

    def optional_variables(self, ) -> Set[str]:
        """
        Set of variables that only appear in optional parts of the template
        """
        if len(self.parts) == 0:
            return set()
        optionals = set.union(*[p.optional_variables() for p in self.parts])
        # A variable that also appears outside square brackets is required.
        return optionals.difference(self.required_variables())

    def required_variables(self, ) -> Set[str]:
        """
        Set of required variables
        """
        if len(self.parts) == 0:
            return set()
        return set.union(*[p.required_variables() for p in self.parts])

    def ordered_variables(self, ) -> Tuple[str]:
        """
        Sequence of all variables in order (can contain duplicates)
        """
        ordered_vars = []
        for p in self.parts:
            p.append_variables(ordered_vars)
        return ordered_vars

    def fill_known(self, variables) -> "Template":
        """
        Fill in the known variables

        Any optional parts, where all variables have been filled will be
        automatically replaced
        """
        # Iterate to a fixed point: substituted values may themselves
        # contain template syntax that needs further filling.
        prev = ''
        while str(self) != prev:
            prev = str(self)
            self = self._fill_known_single(variables)
        return self

    def _fill_known_single(self, variables):
        """
        Helper method for :meth:`fill_known`; performs one substitution pass
        """
        res = []
        for p in self.parts:
            res.extend(p.fill_known(variables))
        return Template(res)

    def remove_optionals(self, optionals=None) -> "Template":
        """
        Removes any optionals containing the provided variables
        (default: remove all)
        """
        return Template([p for p in self.parts if not p.contains_optionals(optionals)])

    def resolve(self, variables) -> str:
        """
        Resolves the template given a set of variables

        :param variables: mapping of variable names to values
        :return: cleaned string
        :raises KeyError: if any required variable remains undefined
        """
        clean_template = self.fill_known(variables).remove_optionals()
        if len(clean_template.required_variables()) > 0:
            raise KeyError("Variables %s not defined" % clean_template.required_variables())
        return str(clean_template)

    def get_all(self, variables, glob_vars=()) -> Tuple[Dict[str, str]]:
        """
        Gets all variables for files on disk matching the templates

        :param variables: (incomplete) mapping of variable names to values
        :param glob_vars: sequence of undefined variables that can take any
            possible values when looking for matches on the disk
        """
        filled = self.fill_known(variables)
        if glob_vars == 'all':
            glob_vars = set.union(self.required_variables(), self.optional_variables())
        if len(filled.required_variables().difference(glob_vars)) > 0:
            raise KeyError("Required variables {} were not defined".format(
                filled.required_variables().difference(glob_vars)
            ))
        cleaned = filled.remove_optionals(filled.optional_variables().difference(glob_vars))
        return cleaned._get_all_helper(glob_vars)

    def _get_all_helper(self, glob_vars):
        # Try every combination of optional blocks against the disk, and
        # collect the de-duplicated sets of extracted variable values.
        params = set()
        optionals = self.optional_variables()
        for to_fill in self.optional_subsets():
            pattern = str(to_fill.fill_known({var: '*' for var in glob_vars}))
            while '//' in pattern:
                pattern = pattern.replace('//', '/')
            for filename in sorted(glob.glob(pattern)):
                try:
                    extracted_vars = to_fill.extract_variables(filename)
                    for name in optionals:
                        if name not in extracted_vars:
                            extracted_vars[name] = None
                    # Store as sorted tuples so duplicates collapse in the set.
                    params.add(tuple(sorted(extracted_vars.items(), key=lambda item: item[0])))
                except ValueError:
                    pass
        return tuple([dict(p) for p in params])

    def optional_subsets(self, ) -> Iterator["Template"]:
        """
        Yields template sub-sets with every combination optional variables
        """
        optionals = self.optional_variables()
        for n_optional in range(len(optionals) + 1):
            for exclude_optional in itertools.combinations(optionals, n_optional):
                yield self.remove_optionals(exclude_optional)

    def extract_variables(self, filename, known_vars=None):
        """
        Extracts the variable values from the filename

        :param filename: filename
        :param known_vars: already known variables
        :return: dictionary from variable names to string representations
            (unused variables set to None)
        """
        if known_vars is not None:
            template = self.fill_known(known_vars)
        else:
            template = self
        while '//' in filename:
            filename = filename.replace('//', '/')

        required = template.required_variables()
        optional = template.optional_variables()
        results = []
        for to_fill in template.optional_subsets():
            # Build a regular expression matching this subset of the template.
            sub_re = str(to_fill.fill_known(
                {var: r'(\S+)' for var in required.union(optional)},
            ))
            while '//' in sub_re:
                sub_re = sub_re.replace('//', '/')
            sub_re = sub_re.replace('.', r'\.')
            match = re.match(sub_re, filename)
            if match is None:
                continue

            extracted_value = {}
            ordered_vars = to_fill.ordered_variables()
            assert len(ordered_vars) == len(match.groups())
            failed = False
            for var, value in zip(ordered_vars, match.groups()):
                # A variable used multiple times must match the same value
                # at every occurrence.
                if var in extracted_value:
                    if value != extracted_value[var]:
                        failed = True
                        break
                else:
                    extracted_value[var] = value
            if failed or any('/' in value for value in extracted_value.values()):
                continue
            for name in template.optional_variables():
                if name not in extracted_value:
                    extracted_value[name] = None
            if known_vars is not None:
                extracted_value.update(known_vars)
            results.append(extracted_value)
        if len(results) == 0:
            raise ValueError("{} did not match {}".format(filename, template))

        def score(variables):
            """
            The highest score is given to the set of variables that:

            1. has used the largest amount of optional variables
            2. has the shortest text within the variables (only used if equal at 1)
            """
            number_used = len([v for v in variables.values() if v is not None])
            length_hint = sum([len(v) for v in variables.values() if v is not None])
            return number_used * 1000 - length_hint

        best = max(results, key=score)
        for var in results:
            if best != var and score(best) == score(var):
                raise KeyError("Multiple equivalent ways found to parse {} using {}".format(filename, template))
        return best
def resolve(template, variables):
    """
    Resolves the template given a set of variables

    :param template: template
    :param variables: mapping of variable names to values
    :return: cleaned string
    """
    # Thin module-level wrapper around the Template class.
    return Template.parse(template).resolve(variables)
def get_all(template, variables, glob_vars=()):
    """
    Gets all variables matching the templates given the variables

    :param template: template
    :param variables: (incomplete) mapping of variable names to values
    :param glob_vars: sequence of undefined variables that can take any
        possible values when looking for matches on the disk.
        If `glob_vars` contains any defined variables, it will be ignored.
    :return: sequence of variables
    """
    # Thin module-level wrapper around the Template class.
    return Template.parse(template).get_all(variables, glob_vars)
def find_variables(template):
    """
    Finds all variables in the template

    :param template: full template
    :return: sequence of variables
    """
    # Thin module-level wrapper around the Template class.
    return Template.parse(template).ordered_variables()
def optional_variables(template):
    """
    Finds the variables that only appear in optional parts of the template

    :param template: full template
    :return: set of variables that are only present in optional parts of the string
    """
    # Thin module-level wrapper around the Template class.
    return Template.parse(template).optional_variables()
def extract_variables(template, filename, known_vars=None):
    """
    Extracts the variable values from the filename

    :param template: template matching the filename
    :param filename: filename
    :param known_vars: already known variables
    :return: dictionary from variable names to string representations
        (unused variables set to None)
    """
    # Thin module-level wrapper around the Template class.
    return Template.parse(template).extract_variables(filename, known_vars)
...@@ -7,9 +7,15 @@ ...@@ -7,9 +7,15 @@
"""This module submits jobs to a computing cluster using FSL's fsl_sub command """This module submits jobs to a computing cluster using FSL's fsl_sub command
line tool. It is assumed that the computing cluster is managed by SGE. line tool. It is assumed that the computing cluster is managed by SGE.
.. note:: All of the functionality in this module is deprecated and will be
removed in a future version of fslpy. Equivalent functionality is
available in the `fsl_sub <https://git.fmrib.ox.ac.uk/fsl/fsl_sub>`_
project, and the :mod:`fsl.utils.run` module and
:mod:`.wrappers.fsl_sub` wrapper function.
Example usage, building a short pipeline:: Example usage, building a short pipeline::
from fsl.utils.fslsub import submit, wait from fsl.utils.fslsub import submit
# submits bet to veryshort queue unless <mask_filename> already exists # submits bet to veryshort queue unless <mask_filename> already exists
bet_job = submit('bet <input_filename> -m', bet_job = submit('bet <input_filename> -m',
...@@ -26,53 +32,249 @@ Example usage, building a short pipeline:: ...@@ -26,53 +32,249 @@ Example usage, building a short pipeline::
wait_for=(bet_job, other_job), wait_for=(bet_job, other_job),
queue='cuda.q') queue='cuda.q')
# waits for the cuda job to finish
wait(cuda_job)
.. autosummary:: .. autosummary::
:nosignatures: :nosignatures:
submit submit
info info
output output
wait
func_to_cmd func_to_cmd
hold
""" """
from six import string_types, BytesIO from io import BytesIO
import subprocess as sp
import os.path as op import os.path as op
import glob import glob
import time import time
import pickle import dill
import sys import sys
import tempfile import tempfile
import logging import logging
import importlib import importlib
from dataclasses import dataclass, asdict
from typing import Optional, Collection, Union, Tuple, Dict
import argparse
import warnings
import os
import fsl.utils.deprecated as deprecated
import fsl.utils.run as run
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def submit(*command, @dataclass
minutes=None, class SubmitParams:
queue=None, """Represents the fsl_sub parameters
architecture=None,
priority=None, The ``SubmitParams`` class is deprecated - you should use
email=None, :mod:`fsl.wrappers.fsl_sub` instead, or use the ``fsl_sub`` Python
wait_for=None, library, which is installed as part of FSL.
job_name=None,
ram=None, Any command line script can be submitted by the parameters by calling the `SubmitParams` object:
logdir=None,
mail_options=None, .. code-block:: python
output=None,
flags=False, submit = SubmitParams(minutes=1, logdir='log', wait_for=['108023', '108019'])
multi_threaded=None, submit('echo finished')
verbose=False):
This will run "echo finished" with a maximum runtime of 1 minute after the jobs with IDs 108023 and 108019 are finished.
It is the equivalent of
.. code-block:: bash
fsl_sub -T 1 -l log -j 108023,108019 "echo finished"
For python scripts that submit themselves to the cluster, it might be useful to give the user some control
over at least some of the submission parameters. This can be done using:
.. code-block:: python
import argparse
parser = argparse.ArgumentParser("my script doing awesome stuff")
parser.add_argument("input_file")
parser.add_argument("output_file")
SubmitParams.add_to_parser(parser, include=('wait_for', 'logdir'))
args = parser.parse_args()
submitter = SubmitParams.from_args(args).update(minutes=10)
from fsl import wrappers
wrappers.bet(input_file, output_file, fslsub=submitter)
This submits a BET job using the -j and -l flags set by the user and a maximum time of 10 minutes.
"""
minutes: Optional[float] = None
queue: Optional[str] = None
architecture: Optional[str] = None
priority: Optional[int] = None
email: Optional[str] = None
wait_for: Union[str, None, Collection[str]] = None
job_name: Optional[str] = None
ram: Optional[int] = None
logdir: Optional[str] = None
mail_options: Optional[str] = None
flags: bool = False
multi_threaded: Optional[Tuple[str, str]] = None
verbose: bool = False
env: dict = None
cmd_line_flags = {
'-T': 'minutes',
'-q': 'queue',
'-a': 'architecture',
'-p': 'priority',
'-M': 'email',
'-N': 'job_name',
'-R': 'ram',
'-l': 'logdir',
'-m': 'mail_options',
}
def __post_init__(self):
    """
    If not set explicitly by the user don't alter the environment in which the script will be submitted
    """
    # Snapshot os.environ at creation time, so later changes to the
    # process environment do not leak into this submission.
    if self.env is None:
        self.env = dict(os.environ)
def as_flags(self, ):
    """
    Builds the fsl_sub command-line flags for this submission.

    Every parameter that differs from its default (typically None) is
    translated into the corresponding fsl_sub flag.

    :return: tuple with the flags
    """
    flags = []
    for flag, attr in self.cmd_line_flags.items():
        value = getattr(self, attr)
        if value is not None:
            flags += [flag, str(value)]
    if self.verbose:
        flags.append('-v')
    if self.flags:
        flags.append('-F')
    if self.multi_threaded:
        flags += ["-s", ','.join(self.multi_threaded)]
    if self.wait_for is not None and len(_flatten_job_ids(self.wait_for)) > 0:
        flags += ('-j', _flatten_job_ids(self.wait_for))
    return tuple(flags)
def __str__(self):
    """Renders this object as the fsl_sub flags it would generate."""
    return 'SubmitParams({})'.format(" ".join(self.as_flags()))
@deprecated.deprecated('3.7.0', '4.0.0',
                       'Use fsl_sub or fsl.wrappers.fsl_sub instead')
def __call__(self, *command, **kwargs):
    """
    Submits the command to the cluster.

    :param command: string or tuple of strings with the command to submit
    :param kwargs: Keyword arguments can override any parameters set in self
    :return: job ID
    """
    # Imported lazily - presumably to avoid a circular import with
    # fsl.utils.run (TODO confirm).
    from fsl.utils.run import prepareArgs, runfsl
    runner = self.update(**kwargs)
    command = prepareArgs(command)
    fsl_sub_cmd = ' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(command))
    log.debug(fsl_sub_cmd)
    # fsl_sub prints the new job ID on standard output
    jobid = runfsl(fsl_sub_cmd, env=runner.env).strip()
    log.debug('Job submitted as {}'.format(jobid))
    return jobid
def update(self, **kwargs):
    """
    Returns a new SubmitParams in which the given parameters replace the
    current ones; all other parameters are copied unchanged.
    """
    return SubmitParams(**{**asdict(self), **kwargs})
@classmethod
def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands',
                  include=('wait_for', 'logdir', 'email', 'mail_options')):
    """
    Adds submission parameters to the parser

    :param parser: parser that should understand submission commands
    :param as_group: add as a new group
    :param include: sequence of argument flags/names that should be added to the parser
        (set to None to include everything)
    :return: the group the arguments got added to
    """
    # Imported lazily - presumably to avoid a circular import with
    # fsl.utils.run (TODO confirm).
    from fsl.utils.run import runfsl, FSLNotPresent
    try:
        fsl_sub_run, _ = runfsl('fsl_sub', exitcode=True)
    except (FileNotFoundError, FSLNotPresent):
        warnings.warn('fsl_sub was not found')
        return
    # Argument help text is scraped from the fsl_sub usage message.
    doc_lines = fsl_sub_run.splitlines()
    # Find the widest common indentation of the usage text, so it can be
    # stripped from the extracted explanations below.
    nspaces = 1
    for line in doc_lines:
        if len(line.strip()) > 0:
            while line.startswith(' ' * nspaces):
                nspaces += 1
    nspaces -= 1
    if as_group:
        group = parser.add_argument_group(as_group)
    else:
        group = parser

    def get_explanation(flag):
        # Collect the usage lines describing `flag`: the line starting with
        # the flag itself plus any continuation lines up to the next flag.
        explanation = None
        for line in doc_lines:
            if explanation is not None and len(line.strip()) > 0 and line.strip()[0] != '-':
                explanation.append(line[nspaces:].strip())
            elif explanation is not None:
                break
            elif line.strip().startswith(flag):
                explanation = [line[nspaces:].strip()]
        if (explanation is None) or (len(explanation) == 0):
            return 'documentation not found'
        return ' '.join(explanation)

    # Arguments are stored with a '_sub_' prefix so they do not clash with
    # the caller's own argument names (see from_args).
    for flag, value in cls.cmd_line_flags.items():
        if include is not None and value not in include and flag not in include:
            continue
        as_type = {'minutes': float, 'priority': int, 'ram': int, 'verbose': None}
        action = 'store_true' if value == 'verbose' else 'store'
        group.add_argument(flag, dest='_sub_' + value, help=get_explanation(flag), action=action,
                           metavar='<' + value + '>', type=as_type.get(value, str))
    group.add_argument('-F', dest='_sub_flags', help=get_explanation('-F'), action='store_true')
    group.add_argument('-v', dest='_sub_verbose', help=get_explanation('-v'), action='store_true')
    group.add_argument('-s', dest='_sub_multi_threaded', help=get_explanation('-s'),
                       metavar='<pename>,<threads>')
    group.add_argument('-j', dest='_sub_wait_for', help=get_explanation('-j'),
                       metavar='<jid>')
    return group
@classmethod
def from_args(cls, args):
    """
    Create a SubmitParams from the command line arguments
    """
    # Collect the '_sub_'-prefixed destinations registered by add_to_parser.
    as_dict = {value: getattr(args, '_sub_' + value, None) for value in cls.cmd_line_flags.values()}
    if args._sub_wait_for is not None:
        # -j takes a comma-separated list of job IDs
        as_dict['wait_for'] = args._sub_wait_for.split(',')
    if args._sub_multi_threaded is not None:
        # -s takes "<pename>,<threads>"
        pename, threads = args._sub_multi_threaded.split(',')
        as_dict['multi_threaded'] = pename, threads
    return cls(verbose=args._sub_verbose, flags=args._sub_flags, **as_dict)
@deprecated.deprecated('3.7.0', '4.0.0',
                       'Use fsl_sub or fsl.wrappers.fsl_sub instead')
def submit(*command, **kwargs):
    """
    Submits a given command to the cluster

    The ``submit`` function is deprecated - you should use
    :mod:`fsl.wrappers.fsl_sub` instead, or use the ``fsl_sub`` Python
    library, which is available in FSL 6.0.5 and newer.

    You can pass the command and arguments as a single string, or as a regular or unpacked sequence.

    :arg command: string or regular/unpacked sequence of strings with the job command
    :arg kwargs: submission parameters understood by :class:`SubmitParams`
                 (e.g. ``minutes``, ``queue``, ``wait_for``, ``multi_threaded``)
    :arg verbose: If True, use verbose mode
    :arg env: Dict containing environment variables
    :return: string of submitted job id
    """
    # All of the submission logic lives in SubmitParams; this function is
    # kept as a backwards-compatible entry point.
    return SubmitParams(**kwargs)(*command)
@deprecated.deprecated('3.7.0', '4.0.0', 'Use fsl_sub.report instead')
def info(job_ids) -> Dict[str, Optional[Dict[str, str]]]:
    """Gets information on a given job id

    The ``info`` function is deprecated - you should use the
    ``fsl_sub.report`` function from the ``fsl_sub`` Python library, which
    is available in FSL 6.0.5 and newer.

    Uses `qstat -j <job_ids>`

    :arg job_ids: string with job id or (nested) sequence with jobs
    :return: dictionary of jobid -> another dictionary with job information
             (or None if job does not exist)
    """
    # Count the calls on the function object itself, so that users who poll
    # the cluster one job at a time can be warned (once) to batch their
    # queries into a single call.
    if not hasattr(info, '_ncall'):
        info._ncall = 0
    info._ncall += 1
    if info._ncall == 3:
        warnings.warn("Please do not call `fslsub.info` repeatably, because it slows down the cluster. You can avoid this message by simply passing all the job IDs you are interested in to a single `fslsub.info` call.")
    from fsl.utils.run import run
    job_ids_string = _flatten_job_ids(job_ids)
    try:
        result = run(['qstat', '-j', job_ids_string], exitcode=True)[0]
    except FileNotFoundError:
        log.debug("qstat not found; assuming not on cluster")
        return {}
    return _parse_qstat(job_ids_string, result)
for line in result.splitlines()[1:]: def _parse_qstat(job_ids_string, qstat_stdout):
key, value = line.split(':', nsplit=1) """
res[key.strip()] = value.strip() Parses the qstat output into a dictionary of dictionaries
:param job_ids_string: input job ids
:param qstat_stdout: qstat output
:return: dictionary of jobid -> another dictionary with job information
(or None if job does not exist)
"""
res = {job_id: None for job_id in job_ids_string.split(',')}
current_job_id = None
for line in qstat_stdout.splitlines()[1:]:
line = line.strip()
if len(line) == 0:
continue
if line == '=' * len(line):
current_job_id = None
elif ':' in line:
current_key, value = [part.strip() for part in line.split(':', 1)]
if current_key == 'job_number':
current_job_id = value
if current_job_id not in job_ids_string:
raise ValueError(f"Unexpected job ID in qstat output:\n{line}")
res[current_job_id] = {}
else:
if current_job_id is None:
raise ValueError(f"Found job information before job ID in qstat output:\n{line}")
res[current_job_id][current_key] = value
else:
res[current_job_id][current_key] += '\n' + line
return res return res
@deprecated.deprecated('3.13.0', '4.0.0',
                       'Use fsl.utils.run.job_output instead')
def output(job_id, logdir='.', command=None, name=None):
    """Returns the output of the given job.

    :arg job_id:  String containing job ID.
    :arg logdir:  Directory containing the log - defaults to the current directory.
    :arg command: Command that was run. Not currently used.
    :arg name:    Job name if it was specified. Not currently used.
    :returns:     A tuple containing the standard output and standard error.
    """
    # Delegates to fsl.utils.run, which now hosts this functionality.
    return run.job_output(job_id, logdir, command, name)
def _flatten_job_ids(job_ids): def _flatten_job_ids(job_ids):
...@@ -237,36 +409,29 @@ def _flatten_job_ids(job_ids): ...@@ -237,36 +409,29 @@ def _flatten_job_ids(job_ids):
return ','.join(sorted(unpack(job_ids))) return ','.join(sorted(unpack(job_ids)))
@deprecated.deprecated('3.13.0', '4.0.0', 'Use fsl.utils.run.hold instead')
def hold(job_ids, hold_filename=None):
    """
    Waits until all jobs have finished

    Internally works by submitting a new job, which creates a file named `hold_filename`,
    which will only run after all jobs in `job_ids` finished.

    This function will only return once `hold_filename` has been created

    :param job_ids: possibly nested sequence of job ids. The job ids themselves should be strings.
    :param hold_filename: filename to use as a hold file.
        The containing directory should exist, but the file itself should not.
        Defaults to a ./.<random characters>.hold in the current directory.
    :return: only returns when all the jobs have finished
    """
    # Delegates to fsl.utils.run, which now hosts this functionality.
    run.hold(job_ids, hold_filename)
@deprecated.deprecated('3.13.0', '4.0.0',
                       'Use fsl.utils.run.func_to_cmd or '
                       'fsl.utils.run.runfunc instead')
def func_to_cmd(func, args=None, kwargs=None, tmp_dir=None, clean="never", verbose=False):
    """Defines the command needed to run the function from the command line

    WARNING: if submitting a function defined in the __main__ script,
    the script will be rerun to retrieve this function (make sure there is
    an ``if __name__ == '__main__'`` guard to prevent unwanted side effects).

    :arg func: function to be run
    :arg args: positional arguments
    :arg kwargs: keyword arguments
    :arg tmp_dir: directory where to store the temporary file
    :arg clean: Whether the script should be removed after running. There are three options:

        - "never" (default): Script is kept
        - "on_success": only remove if script successfully finished (i.e., no error is raised)
        - "always": always remove the script, even if it raises an error

    :arg verbose: If set to True, the script will print its own filename before running
    :return: string which will run the function
    """
    # Delegates to fsl.utils.run, which now hosts this functionality.
    return run.func_to_cmd(func, args, kwargs, tmp_dir, clean, verbose)
...@@ -85,13 +85,39 @@ from collections import abc ...@@ -85,13 +85,39 @@ from collections import abc
try: import queue try: import queue
except ImportError: import Queue as queue except ImportError: import Queue as queue
from fsl.utils.deprecated import deprecated
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class IdleTask(object): @functools.lru_cache()
def _canHaveGui():
"""Return ``True`` if wxPython is installed, and a display is available,
``False`` otherwise.
"""
# Determine if a display is available. We do
# this once at init (instead of on-demand in
# the canHaveGui method) because calling the
# IsDisplayAvailable function will cause the
# application to steal focus under OSX!
try:
import wx
return wx.App.IsDisplayAvailable()
except ImportError:
return False
def _haveGui():
"""Return ``True`` if wxPython is installed, a display is available, and
a ``wx.App`` exists, ``False`` otherwise.
"""
try:
import wx
return _canHaveGui() and (wx.GetApp() is not None)
except ImportError:
return False
class IdleTask:
"""Container object used by the :class:`IdleLoop` class. """Container object used by the :class:`IdleLoop` class.
Used to encapsulate information about a queued task. Used to encapsulate information about a queued task.
""" """
...@@ -113,7 +139,7 @@ class IdleTask(object): ...@@ -113,7 +139,7 @@ class IdleTask(object):
self.kwargs = kwargs self.kwargs = kwargs
class IdleLoop(object): class IdleLoop:
"""This class contains logic for running tasks via ``wx.EVT_IDLE`` events. """This class contains logic for running tasks via ``wx.EVT_IDLE`` events.
A single ``IdleLoop`` instance is created when this module is first A single ``IdleLoop`` instance is created when this module is first
...@@ -372,8 +398,6 @@ class IdleLoop(object): ...@@ -372,8 +398,6 @@ class IdleLoop(object):
``timeout``, or ``alwaysQueue``. ``timeout``, or ``alwaysQueue``.
""" """
from fsl.utils.platform import platform as fslplatform
schedtime = time.time() schedtime = time.time()
timeout = kwargs.pop('timeout', 0) timeout = kwargs.pop('timeout', 0)
after = kwargs.pop('after', 0) after = kwargs.pop('after', 0)
...@@ -382,18 +406,15 @@ class IdleLoop(object): ...@@ -382,18 +406,15 @@ class IdleLoop(object):
skipIfQueued = kwargs.pop('skipIfQueued', False) skipIfQueued = kwargs.pop('skipIfQueued', False)
alwaysQueue = kwargs.pop('alwaysQueue', False) alwaysQueue = kwargs.pop('alwaysQueue', False)
canHaveGui = fslplatform.canHaveGui
haveGui = fslplatform.haveGui
# If there is no possibility of a # If there is no possibility of a
# gui being available in the future # gui being available in the future
# (determined by canHaveGui), then # (determined by _canHaveGui), then
# alwaysQueue is ignored. # alwaysQueue is ignored.
alwaysQueue = alwaysQueue and canHaveGui alwaysQueue = alwaysQueue and _canHaveGui()
# We don't have wx - run the task # We don't have wx - run the task
# directly/synchronously. # directly/synchronously.
if self.__neverQueue or not (haveGui or alwaysQueue): if self.__neverQueue or not (_haveGui() or alwaysQueue):
time.sleep(after) time.sleep(after)
log.debug('Running idle task directly') log.debug('Running idle task directly')
task(*args, **kwargs) task(*args, **kwargs)
...@@ -597,36 +618,6 @@ def idleWhen(*args, **kwargs): ...@@ -597,36 +618,6 @@ def idleWhen(*args, **kwargs):
idleLoop.idleWhen(*args, **kwargs) idleLoop.idleWhen(*args, **kwargs)
@deprecated('2.7.0', '3.0.0', 'Use idleLoop.inIdle instead')
def inIdle(taskName):
"""Deprecated - use ``idleLoop.inIdle`` instead. """
return idleLoop.inIdle(taskName)
@deprecated('2.7.0', '3.0.0', 'Use idleLoop.cancelIdle instead')
def cancelIdle(taskName):
"""Deprecated - use ``idleLoop.cancelIdle`` instead. """
return idleLoop.cancelIdle(taskName)
@deprecated('2.7.0', '3.0.0', 'Use idleLoop.reset instead')
def idleReset():
"""Deprecated - use ``idleLoop.reset`` instead. """
return idleLoop.reset()
@deprecated('2.7.0', '3.0.0', 'Use idleLoop.callRate instead')
def getIdleTimeout():
"""Deprecated - use ``idleLoop.callRate`` instead. """
return idleLoop.callRate
@deprecated('2.7.0', '3.0.0', 'Use idleLoop.callRate instead')
def setIdleTimeout(timeout=None):
"""Deprecated - use ``idleLoop.callRate`` instead. """
idleLoop.callRate = timeout
def block(secs, delta=0.01, until=None): def block(secs, delta=0.01, until=None):
"""Blocks for the specified number of seconds, yielding to the main ``wx`` """Blocks for the specified number of seconds, yielding to the main ``wx``
loop. loop.
...@@ -643,11 +634,13 @@ def block(secs, delta=0.01, until=None): ...@@ -643,11 +634,13 @@ def block(secs, delta=0.01, until=None):
determins when calls to ``block`` will return. determins when calls to ``block`` will return.
""" """
havewx = _haveGui()
def defaultUntil(): def defaultUntil():
return False return False
def tick(): def tick():
if fslplatform.haveGui: if havewx:
import wx import wx
wx.YieldIfNeeded() wx.YieldIfNeeded()
time.sleep(delta) time.sleep(delta)
...@@ -655,8 +648,6 @@ def block(secs, delta=0.01, until=None): ...@@ -655,8 +648,6 @@ def block(secs, delta=0.01, until=None):
if until is None: if until is None:
until = defaultUntil until = defaultUntil
from fsl.utils.platform import platform as fslplatform
start = time.time() start = time.time()
while (time.time() - start) < secs: while (time.time() - start) < secs:
tick() tick()
...@@ -685,12 +676,11 @@ def run(task, onFinish=None, onError=None, name=None): ...@@ -685,12 +676,11 @@ def run(task, onFinish=None, onError=None, name=None):
the return value will be ``None``. the return value will be ``None``.
""" """
from fsl.utils.platform import platform as fslplatform
if name is None: if name is None:
name = getattr(task, '__name__', '<unknown>') name = getattr(task, '__name__', '<unknown>')
haveWX = fslplatform.haveGui haveWX = _haveGui()
# Calls the onFinish or onError handler # Calls the onFinish or onError handler
def callback(cb, *args, **kwargs): def callback(cb, *args, **kwargs):
...@@ -759,14 +749,12 @@ def wait(threads, task, *args, **kwargs): ...@@ -759,14 +749,12 @@ def wait(threads, task, *args, **kwargs):
a keyword argument called ``wait_direct``. a keyword argument called ``wait_direct``.
""" """
from fsl.utils.platform import platform as fslplatform
direct = kwargs.pop('wait_direct', False) direct = kwargs.pop('wait_direct', False)
if not isinstance(threads, abc.Sequence): if not isinstance(threads, abc.Sequence):
threads = [threads] threads = [threads]
haveWX = fslplatform.haveGui haveWX = _haveGui()
def joinAll(): def joinAll():
log.debug('Wait thread joining on all targets') log.debug('Wait thread joining on all targets')
...@@ -787,14 +775,15 @@ def wait(threads, task, *args, **kwargs): ...@@ -787,14 +775,15 @@ def wait(threads, task, *args, **kwargs):
return None return None
class Task(object): class Task:
"""Container object which encapsulates a task that is run by a """Container object which encapsulates a task that is run by a
:class:`TaskThread`. :class:`TaskThread`.
""" """
def __init__(self, name, func, onFinish, args, kwargs): def __init__(self, name, func, onFinish, onError, args, kwargs):
self.name = name self.name = name
self.func = func self.func = func
self.onFinish = onFinish self.onFinish = onFinish
self.onError = onError
self.args = args self.args = args
self.kwargs = kwargs self.kwargs = kwargs
self.enabled = True self.enabled = True
...@@ -806,7 +795,6 @@ class TaskThreadVeto(Exception): ...@@ -806,7 +795,6 @@ class TaskThreadVeto(Exception):
handler (if one has been specified). See the :meth:`TaskThread.enqueue` handler (if one has been specified). See the :meth:`TaskThread.enqueue`
method for more details. method for more details.
""" """
pass
class TaskThread(threading.Thread): class TaskThread(threading.Thread):
...@@ -840,9 +828,16 @@ class TaskThread(threading.Thread): ...@@ -840,9 +828,16 @@ class TaskThread(threading.Thread):
:arg onFinish: An optional function to be called (via :func:`idle`) :arg onFinish: An optional function to be called (via :func:`idle`)
when the task funtion has finished. Must be provided as when the task funtion has finished. Must be provided as
a keyword argument. If the ``func`` raises a a keyword argument, and must itself accept no arguments.
:class`TaskThreadVeto` error, this function will not If the ``func`` raises a :class`TaskThreadVeto` error,
be called. this function will not be called.
:arg onError: An optional function to be called (via :func:`idle`)
if the task funtion raises an ``Exception``. Must be
provided as a keyword argument, and must itself accept
the raised ``Exception`` object as a single argument.
If the ``func`` raises a :class`TaskThreadVeto` error,
this function will not be called.
All other arguments are passed through to the task function when it is All other arguments are passed through to the task function when it is
executed. executed.
...@@ -853,16 +848,18 @@ class TaskThread(threading.Thread): ...@@ -853,16 +848,18 @@ class TaskThread(threading.Thread):
results. results.
.. warning:: Make sure that your task function is not expecting keyword .. warning:: Make sure that your task function is not expecting keyword
arguments called ``taskName`` or ``onFinish``! arguments called ``taskName``, ``onFinish``, or
``onError``!
""" """
name = kwargs.pop('taskName', None) name = kwargs.pop('taskName', None)
onFinish = kwargs.pop('onFinish', None) onFinish = kwargs.pop('onFinish', None)
onError = kwargs.pop('onError', None)
log.debug('Enqueueing task: {} [{}]'.format( log.debug('Enqueueing task: {} [{}]'.format(
name, getattr(func, '__name__', '<unknown>'))) name, getattr(func, '__name__', '<unknown>')))
t = Task(name, func, onFinish, args, kwargs) t = Task(name, func, onFinish, onError, args, kwargs)
self.__enqueued[name] = t self.__enqueued[name] = t
self.__q.put(t) self.__q.put(t)
...@@ -983,6 +980,9 @@ class TaskThread(threading.Thread): ...@@ -983,6 +980,9 @@ class TaskThread(threading.Thread):
type(e).__name__, type(e).__name__,
str(e)), str(e)),
exc_info=True) exc_info=True)
if task.onError is not None:
idle(task.onError, e)
finally: finally:
self.__q.task_done() self.__q.task_done()
...@@ -1024,7 +1024,7 @@ def mutex(*args, **kwargs): ...@@ -1024,7 +1024,7 @@ def mutex(*args, **kwargs):
return MutexFactory(*args, **kwargs) return MutexFactory(*args, **kwargs)
class MutexFactory(object): class MutexFactory:
"""The ``MutexFactory`` is a placeholder for methods which have been """The ``MutexFactory`` is a placeholder for methods which have been
decorated with the :func:`mutex` decorator. When the method of a class decorated with the :func:`mutex` decorator. When the method of a class
is decorated with ``@mutex``, a ``MutexFactory`` is created. is decorated with ``@mutex``, a ``MutexFactory`` is created.
......
...@@ -10,8 +10,7 @@ manipulating and working with :class:`.Image` objects. ...@@ -10,8 +10,7 @@ manipulating and working with :class:`.Image` objects.
The following modules are available: The following modules are available:
.. autosummary:: .. autosummary::
:nosignature
.image.resample fsl.utils.image.resample
.image.roi fsl.utils.image.roi
""" """
...@@ -10,13 +10,10 @@ to resample an :class:`.Image` object to a different resolution. ...@@ -10,13 +10,10 @@ to resample an :class:`.Image` object to a different resolution.
The :func:`resampleToPixdims` and :func:`resampleToReference` functions The :func:`resampleToPixdims` and :func:`resampleToReference` functions
are convenience wrappers around :func:`resample`. are convenience wrappers around :func:`resample`.
The :func:`applySmoothing` and :func:`calculateMatrix` functions are The :func:`applySmoothing` function is a sub-function of :func:`resample`.
sub-functions of :func:`resample`.
""" """
import collections.abc as abc
import numpy as np import numpy as np
import scipy.ndimage as ndimage import scipy.ndimage as ndimage
...@@ -49,9 +46,9 @@ def resampleToReference(image, reference, matrix=None, **kwargs): ...@@ -49,9 +46,9 @@ def resampleToReference(image, reference, matrix=None, **kwargs):
along the spatial (first three) dimensions. along the spatial (first three) dimensions.
:arg image: :class:`.Image` to resample :arg image: :class:`.Image` to resample
:arg matrix: Optional world-to-world affine alignment matrix
:arg reference: :class:`.Nifti` defining the space to resample ``image`` :arg reference: :class:`.Nifti` defining the space to resample ``image``
into into
:arg matrix: Optional world-to-world affine alignment matrix
""" """
oldShape = list(image.shape) oldShape = list(image.shape)
...@@ -193,7 +190,7 @@ def resample(image, ...@@ -193,7 +190,7 @@ def resample(image,
if origin not in ('centre', 'corner'): if origin not in ('centre', 'corner'):
raise ValueError('Invalid value for origin: {}'.format(origin)) raise ValueError('Invalid value for origin: {}'.format(origin))
data = np.array(image[sliceobj], dtype=dtype, copy=False) data = np.asarray(image[sliceobj], dtype=dtype)
if len(data.shape) != len(newShape): if len(data.shape) != len(newShape):
raise ValueError('Data dimensions do not match new shape: ' raise ValueError('Data dimensions do not match new shape: '
...@@ -204,15 +201,15 @@ def resample(image, ...@@ -204,15 +201,15 @@ def resample(image,
# old/new shape ratio and the origin # old/new shape ratio and the origin
# setting. # setting.
if matrix is None: if matrix is None:
matrix = calculateMatrix(data.shape, newShape, origin) matrix = affine.rescale(data.shape, newShape, origin)
# calculateMatrix will return None # same shape and identity matrix? the
# if it decides that the image # image doesn't need to be resampled
# doesn't need to be resampled if np.all(np.isclose(data.shape, newShape)) and \
if matrix is None: np.all(np.isclose(matrix, np.eye(len(newShape) + 1))):
return data, image.voxToWorldMat return data, image.voxToWorldMat
newShape = np.array(np.round(newShape), dtype=np.int) newShape = np.array(np.round(newShape), dtype=int)
# Apply smoothing if requested, # Apply smoothing if requested,
# and if not using nn interp # and if not using nn interp
...@@ -230,9 +227,9 @@ def resample(image, ...@@ -230,9 +227,9 @@ def resample(image,
# Construct an affine transform which # Construct an affine transform which
# puts the resampled image into the # puts the resampled image into the
# same world coordinate system as this # same world coordinate system as this
# image. The calculateMatrix function # image. We may be working with >3D data,
# might not return a 4x4 matrix, so we # so here we discard the non-spatial
# make sure it is valid. # parts of the resampling matrix
if matrix.shape != (4, 4): if matrix.shape != (4, 4):
rotmat = matrix[:3, :3] rotmat = matrix[:3, :3]
offsets = matrix[:3, -1] offsets = matrix[:3, -1]
...@@ -284,55 +281,3 @@ def applySmoothing(data, matrix, newShape): ...@@ -284,55 +281,3 @@ def applySmoothing(data, matrix, newShape):
sigma[ratio >= 1.1] *= 0.425 sigma[ratio >= 1.1] *= 0.425
return ndimage.gaussian_filter(data, sigma) return ndimage.gaussian_filter(data, sigma)
def calculateMatrix(oldShape, newShape, origin):
"""Calculates an affine matrix to use for resampling.
Called by :func:`resample`. The matrix will contain scaling factors
determined from the ``oldShape / newShape`` ratio, and an offset
determined from the ``origin``.
:arg oldShape: Shape of input data
:arg newShape: Shape to resample data to
:arg origin: Voxel grid alignment - either ``'centre'`` or ``'corner'``
:returns: An affine matrix that can be passed to
``scipy.ndimage.affine_transform``.
"""
oldShape = np.array(oldShape, dtype=np.float)
newShape = np.array(newShape, dtype=np.float)
if np.all(np.isclose(oldShape, newShape)):
return None
# Otherwise we calculate a
# scaling matrix from the
# old/new shape ratio, and
# specify an offset
# according to the origin
else:
ratio = oldShape / newShape
scale = np.diag(ratio)
# Calculate an offset from the
# origin - the default behaviour
# (centre) causes the corner voxel
# of the output to have the same
# centre as the corner voxel of
# the input. If the origin is
# 'corner', we apply an offset
# which effectively causes the
# voxel grids of the input and
# output to be aligned.
if origin == 'centre': offset = 0
elif origin == 'corner': offset = list((ratio - 1) / 2)
if not isinstance(offset, abc.Sequence):
offset = [offset] * len(newShape)
# ndimage.affine_transform will accept
# a matrix of shape (ndim, ndim + 1)
matrix = np.hstack((scale, np.atleast_2d(offset).T))
return matrix
...@@ -18,6 +18,7 @@ import os ...@@ -18,6 +18,7 @@ import os
import os.path as op import os.path as op
import shutil import shutil
import numpy as np
import nibabel as nib import nibabel as nib
import fsl.utils.path as fslpath import fsl.utils.path as fslpath
...@@ -44,12 +45,13 @@ def imcp(src, ...@@ -44,12 +45,13 @@ def imcp(src,
already exist. Defaults to ``False``. already exist. Defaults to ``False``.
:arg useDefaultExt: Defaults to ``False``. If ``True``, the destination :arg useDefaultExt: Defaults to ``False``. If ``True``, the destination
file type will be set according to the default file type will be set according to the default file
extension, specified by type, specified by
:func:`~fsl.data.image.defaultExt`. If the source :func:`~fsl.data.image.defaultOutputType`. If the
file does not have the same type as the default source file does not have the same type as the default
extension, it will be converted. If ``False``, the extension, it will be converted. If ``False``, the
source file type is not changed. source file type is used, and the destination file type
(if specified) is ignored.
:arg move: If ``True``, the files are moved, instead of being :arg move: If ``True``, the files are moved, instead of being
copied. See :func:`immv`. copied. See :func:`immv`.
...@@ -57,7 +59,7 @@ def imcp(src, ...@@ -57,7 +59,7 @@ def imcp(src,
# special case - non-existent directory # special case - non-existent directory
if dest.endswith('/') and not op.isdir(dest): if dest.endswith('/') and not op.isdir(dest):
raise fslpath.PathError('Directory does not exist: {}'.format(dest)) raise fslpath.PathError(f'Directory does not exist: {dest}')
if op.isdir(dest): if op.isdir(dest):
dest = op.join(dest, op.basename(src)) dest = op.join(dest, op.basename(src))
...@@ -87,17 +89,22 @@ def imcp(src, ...@@ -87,17 +89,22 @@ def imcp(src,
if not op.exists(src): if not op.exists(src):
raise fslpath.PathError('imcp error - source path ' raise fslpath.PathError('imcp error - source path '
'does not exist: {}'.format(src)) f'does not exist: {src}')
# Figure out the destination file # Infer the image type of the source image. We
# extension/type. If useDefaultExt # can't just look at the extension, as e.g. an
# is True, we use the default # .img file can be any of ANALYZE/NIFTI1/NIFTI2
# extension. Otherwise, if no srcType = fslimage.fileType(src)
# destination file extension is
# provided, we use the source # Figure out the destination file extension/type.
# extension. # If useDefaultExt is True, we use the default
if useDefaultExt: destExt = fslimage.defaultExt() # extension. Otherwise we use the source type
elif destExt == '': destExt = srcExt if useDefaultExt:
destType = fslimage.defaultOutputType()
destExt = fslimage.defaultExt()
else:
destType = srcType
destExt = srcExt
# Resolve any file group differences # Resolve any file group differences
# e.g. we don't care if the src is # e.g. we don't care if the src is
...@@ -116,10 +123,10 @@ def imcp(src, ...@@ -116,10 +123,10 @@ def imcp(src,
# Give up if we don't have permission. # Give up if we don't have permission.
if not os.access(op.dirname(dest), os.W_OK | os.X_OK): if not os.access(op.dirname(dest), os.W_OK | os.X_OK):
raise fslpath.PathError('imcp error - cannot write to {}'.format(dest)) raise fslpath.PathError(f'imcp error - cannot write to {dest}')
if move and not os.access(op.dirname(src), os.W_OK | os.X_OK): if move and not os.access(op.dirname(src), os.W_OK | os.X_OK):
raise fslpath.PathError('imcp error - cannot move from {}'.format(src)) raise fslpath.PathError(f'imcp error - cannot move from {src}')
# If the source file type does not # If the source file type does not
# match the destination file type, # match the destination file type,
...@@ -129,14 +136,48 @@ def imcp(src, ...@@ -129,14 +136,48 @@ def imcp(src,
# io and cpu, but programmatically # io and cpu, but programmatically
# very easy - nibabel does all the # very easy - nibabel does all the
# hard work. # hard work.
if srcExt != destExt: if srcType != destType:
if not overwrite and op.exists(dest): if not overwrite and op.exists(dest):
raise fslpath.PathError('imcp error - destination already ' raise fslpath.PathError('imcp error - destination '
'exists ({})'.format(dest)) f'already exists ({dest})')
img = nib.load(src) img = nib.load(src)
# Force conversion to specific file format if
# necessary. The file format (pair, gzipped
# or not) is taken care of automatically by
# nibabel
if 'ANALYZE' in destType.name: cls = nib.AnalyzeImage
elif 'NIFTI2' in destType.name: cls = nib.Nifti2Image
elif 'NIFTI' in destType.name: cls = nib.Nifti1Image
# The default behaviour of nibabel when saving
# is to rescale the image data to the full range
# of the data type, and then set the scl_slope/
# inter header fields accordingly. This is highly
# disruptive in many circumstances. Fortunately:
# - The nibabel ArrayProxy class provides a
# get_unscaled method, which allows us to
# bypass the rescaling at load time.
# - Explicitly setting the slope and intercept
# on the header allows us to bypass rescaling
# at save time.
#
# https://github.com/nipy/nibabel/issues/1001
# https://neurostars.org/t/preserve-datatype-and-precision-with-nibabel/27641/2
slope = img.dataobj.slope
inter = img.dataobj.inter
data = np.asanyarray(img.dataobj.get_unscaled(),
dtype=img.get_data_dtype())
img = cls(data, None, header=img.header)
img.header.set_slope_inter(slope, inter)
nib.save(img, dest) nib.save(img, dest)
# Make sure the image reference is cleared, and
# hopefully GC'd, as otherwise we sometimes get
# errors on Windows (mostly in unit tests) w.r.t.
# attempts to delete files which are still open
img = None img = None
if move: if move:
...@@ -193,7 +234,7 @@ def imcp(src, ...@@ -193,7 +234,7 @@ def imcp(src,
# paths already exist # paths already exist
if not overwrite and any([op.exists(d) for d in copyDests]): if not overwrite and any([op.exists(d) for d in copyDests]):
raise fslpath.PathError('imcp error - a destination path already ' raise fslpath.PathError('imcp error - a destination path already '
'exists ({})'.format(', '.join(copyDests))) f'exists ({",".join(copyDests)})')
# Do the copy/move # Do the copy/move
for src, dest in zip(copySrcs, copyDests): for src, dest in zip(copySrcs, copyDests):
......
...@@ -21,7 +21,6 @@ a function: ...@@ -21,7 +21,6 @@ a function:
import logging import logging
import hashlib import hashlib
import functools import functools
import six
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -171,7 +170,7 @@ def memoizeMD5(func): ...@@ -171,7 +170,7 @@ def memoizeMD5(func):
# compatible) bytes , and take # compatible) bytes , and take
# the hash of those bytes. # the hash of those bytes.
for arg in args: for arg in args:
if not isinstance(arg, six.string_types): if not isinstance(arg, str):
arg = str(arg) arg = str(arg)
arg = arg.encode('utf-8') arg = arg.encode('utf-8')
hashobj.update(arg) hashobj.update(arg)
...@@ -243,8 +242,8 @@ def skipUnchanged(func): ...@@ -243,8 +242,8 @@ def skipUnchanged(func):
isarray = oldIsArray or newIsArray isarray = oldIsArray or newIsArray
if isarray: if isarray:
a = np.array(oldVal, copy=False) a = np.asarray(oldVal)
b = np.array(value, copy=False) b = np.asarray(value)
nochange = (a.shape == b.shape) and np.allclose(a, b) nochange = (a.shape == b.shape) and np.allclose(a, b)
else: else:
......
...@@ -7,12 +7,9 @@ ...@@ -7,12 +7,9 @@
"""This module provides the :class:`Meta` class. """ """This module provides the :class:`Meta` class. """
import collections class Meta:
"""The ``Meta`` class is intended to be used as a mixin for other classes.
It is simply a wrapper for a dictionary of key-value pairs.
class Meta(object):
"""The ``Meta`` class is intended to be used as a mixin for other classes. It
is simply a wrapper for a dictionary of key-value pairs.
It has a handful of methods allowing you to add and access additional It has a handful of methods allowing you to add and access additional
metadata associated with an object. metadata associated with an object.
...@@ -20,6 +17,7 @@ class Meta(object): ...@@ -20,6 +17,7 @@ class Meta(object):
.. autosummary:: .. autosummary::
:nosignatures: :nosignatures:
meta
metaKeys metaKeys
metaValues metaValues
metaItems metaItems
...@@ -32,11 +30,17 @@ class Meta(object): ...@@ -32,11 +30,17 @@ class Meta(object):
"""Initialises a ``Meta`` instance. """ """Initialises a ``Meta`` instance. """
new = super(Meta, cls).__new__(cls) new = super(Meta, cls).__new__(cls)
new.__meta = collections.OrderedDict() new.__meta = {}
return new return new
@property
def meta(self):
"""Return a reference to the metadata dictionary. """
return self.__meta
def metaKeys(self): def metaKeys(self):
"""Returns the keys contained in the metadata dictionary """Returns the keys contained in the metadata dictionary
(``dict.keys``). (``dict.keys``).
......
...@@ -14,9 +14,6 @@ import inspect ...@@ -14,9 +14,6 @@ import inspect
import contextlib import contextlib
import collections import collections
import six
import fsl.utils.idle as idle import fsl.utils.idle as idle
import fsl.utils.weakfuncref as weakfuncref import fsl.utils.weakfuncref as weakfuncref
...@@ -36,7 +33,7 @@ class Registered(Exception): ...@@ -36,7 +33,7 @@ class Registered(Exception):
pass pass
class _Listener(object): class _Listener:
"""This class is used internally by the :class:`.Notifier` class to """This class is used internally by the :class:`.Notifier` class to
store references to callback functions. store references to callback functions.
""" """
...@@ -61,22 +58,48 @@ class _Listener(object): ...@@ -61,22 +58,48 @@ class _Listener(object):
return self.__callback.function() return self.__callback.function()
@property
def expectsArguments(self):
"""Returns ``True`` if the listener function needs to be passed
arguments, ``False`` otherwise. Listener functions can
be defined to accept either zero arguments, or a set of
positional arguments - see :meth:`Notifier.register` for details.
"""
func = self.callback
# the function may have been GC'd
if func is None:
return False
spec = inspect.signature(func)
posargs = 0
varargs = False
for param in spec.parameters.values():
if param.kind in (inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD):
posargs += 1
elif param.kind == inspect.Parameter.VAR_POSITIONAL:
varargs = True
return varargs or ((not varargs) and (posargs == 3))
def __str__(self): def __str__(self):
cb = self.callback cb = self.callback
if cb is not None: cbName = getattr(cb, '__name__', '<callable>') if cb is not None: name = getattr(cb, '__name__', '<callable>')
else: cbName = '<deleted>' else: name = '<deleted>'
return 'Listener {} [topic: {}] [function: {}]'.format( return f'Listener {self.name} [topic: {self.topic}] [function: {name}]'
self.name, self.topic, cbName)
def __repr__(self): def __repr__(self):
return self.__str__() return self.__str__()
class Notifier(object): class Notifier:
"""The ``Notifier`` class is a mixin which provides simple notification """The ``Notifier`` class is a mixin which provides simple notification
capability. Listeners can be registered/deregistered to listen via the capability. Listeners can be registered/deregistered to listen via the
:meth:`register` and :meth:`deregister` methods, and notified via the :meth:`register` and :meth:`deregister` methods, and notified via the
...@@ -119,8 +142,8 @@ class Notifier(object): ...@@ -119,8 +142,8 @@ class Notifier(object):
:arg name: A unique name for the listener. :arg name: A unique name for the listener.
:arg callback: The function to call - must accept two positional :arg callback: The function to call - must accept either zero
arguments: arguments, or three positional arguments:
- this ``Notifier`` instance. - this ``Notifier`` instance.
...@@ -147,12 +170,12 @@ class Notifier(object): ...@@ -147,12 +170,12 @@ class Notifier(object):
listener = _Listener(name, callback, topic, runOnIdle) listener = _Listener(name, callback, topic, runOnIdle)
if name in self.__listeners[topic]: if name in self.__listeners[topic]:
raise Registered('Listener {} is already registered'.format(name)) raise Registered(f'Listener {name} is already registered')
self.__listeners[topic][name] = listener self.__listeners[topic][name] = listener
self.__enabled[ topic] = self.__enabled.get(topic, True) self.__enabled[ topic] = self.__enabled.get(topic, True)
log.debug('{}: Registered {}'.format(type(self).__name__, listener)) log.debug('%s: Registered %s', type(self).__name__, listener)
def deregister(self, name, topic=None): def deregister(self, name, topic=None):
...@@ -186,8 +209,8 @@ class Notifier(object): ...@@ -186,8 +209,8 @@ class Notifier(object):
self.__listeners.pop(topic) self.__listeners.pop(topic)
self.__enabled .pop(topic) self.__enabled .pop(topic)
log.debug('{}: De-registered listener {}'.format( log.debug('%s: De-registered listener %s',
type(self).__name__, listener)) type(self).__name__, listener)
def enable(self, name, topic=None, enable=True): def enable(self, name, topic=None, enable=True):
...@@ -297,7 +320,7 @@ class Notifier(object): ...@@ -297,7 +320,7 @@ class Notifier(object):
:arg topic: Topic or topics that the listener is registered on. :arg topic: Topic or topics that the listener is registered on.
""" """
if topic is None or isinstance(topic, six.string_types): if topic is None or isinstance(topic, str):
topic = [topic] topic = [topic]
topics = topic topics = topic
...@@ -347,12 +370,12 @@ class Notifier(object): ...@@ -347,12 +370,12 @@ class Notifier(object):
srcMod = '...{}'.format(frame[1][-20:]) srcMod = '...{}'.format(frame[1][-20:])
srcLine = frame[2] srcLine = frame[2]
log.debug('{}: Notifying {} listeners (topic: {}) [{}:{}]'.format( log.debug('%s: Notifying %s listeners (topic: %s) [%s:%s]',
type(self).__name__, type(self).__name__,
len(listeners), len(listeners),
topic, topic,
srcMod, srcMod,
srcLine)) srcLine)
for listener in listeners: for listener in listeners:
...@@ -363,15 +386,19 @@ class Notifier(object): ...@@ -363,15 +386,19 @@ class Notifier(object):
# callback function may have been # callback function may have been
# gc'd - remove it if this is the case. # gc'd - remove it if this is the case.
if callback is None: if callback is None:
log.debug('Listener {} has been gc\'d - ' log.debug('Listener %s has been gc\'d - '
'removing from list'.format(name)) 'removing from list', name)
self.__listeners[listener.topic].pop(name) self.__listeners[listener.topic].pop(name)
continue
elif not listener.enabled: if not listener.enabled:
continue continue
elif listener.runOnIdle: idle.idle(callback, self, topic, value) if listener.expectsArguments: args = (self, topic, value)
else: callback( self, topic, value) else: args = ()
if listener.runOnIdle: idle.idle(callback, *args)
else: callback( *args)
def __getListeners(self, topic): def __getListeners(self, topic):
......
...@@ -23,6 +23,8 @@ paths. ...@@ -23,6 +23,8 @@ paths.
removeDuplicates removeDuplicates
uniquePrefix uniquePrefix
commonBase commonBase
wslpath
winpath
""" """
...@@ -30,13 +32,19 @@ import os.path as op ...@@ -30,13 +32,19 @@ import os.path as op
import os import os
import glob import glob
import operator import operator
import pathlib
import re
from typing import Sequence, Tuple, Union
PathLike = Union[str, pathlib.Path]
class PathError(Exception): class PathError(Exception):
"""``Exception`` class raised by the functions defined in this module """``Exception`` class raised by the functions defined in this module
when something goes wrong. when something goes wrong.
""" """
pass
def deepest(path, suffixes): def deepest(path, suffixes):
...@@ -47,12 +55,12 @@ def deepest(path, suffixes): ...@@ -47,12 +55,12 @@ def deepest(path, suffixes):
path = path.strip() path = path.strip()
if path == op.sep or path == '': if path in (op.sep, ''):
return None return None
path = path.rstrip(op.sep) path = path.rstrip(op.sep)
if any([path.endswith(s) for s in suffixes]): if any(path.endswith(s) for s in suffixes):
return path return path
return deepest(op.dirname(path), suffixes) return deepest(op.dirname(path), suffixes)
...@@ -76,7 +84,7 @@ def shallowest(path, suffixes): ...@@ -76,7 +84,7 @@ def shallowest(path, suffixes):
if parent is not None: if parent is not None:
return parent return parent
if any([path.endswith(s) for s in suffixes]): if any(path.endswith(s) for s in suffixes):
return path return path
return None return None
...@@ -96,19 +104,23 @@ def allFiles(root): ...@@ -96,19 +104,23 @@ def allFiles(root):
return files return files
def hasExt(path, allowedExts): def hasExt(path : PathLike,
allowedExts : Sequence[str]) -> bool:
"""Convenience function which returns ``True`` if the given ``path`` """Convenience function which returns ``True`` if the given ``path``
ends with any of the given ``allowedExts``, ``False`` otherwise. ends with any of the given ``allowedExts``, ``False`` otherwise.
""" """
return any([path.endswith(e) for e in allowedExts]) path = str(path)
return any(path.endswith(e) for e in allowedExts)
def addExt(prefix,
allowedExts=None, def addExt(
mustExist=True, prefix : PathLike,
defaultExt=None, allowedExts : Sequence[str] = None,
fileGroups=None, mustExist : bool = True,
unambiguous=True): defaultExt : str = None,
fileGroups : Sequence[Sequence[str]] = None,
unambiguous : bool = True
) -> Union[Sequence[str], str]:
"""Adds a file extension to the given file ``prefix``. """Adds a file extension to the given file ``prefix``.
If ``mustExist`` is False, and the file does not already have a If ``mustExist`` is False, and the file does not already have a
...@@ -143,6 +155,8 @@ def addExt(prefix, ...@@ -143,6 +155,8 @@ def addExt(prefix,
containing *all* matching files is returned. containing *all* matching files is returned.
""" """
prefix = str(prefix)
if allowedExts is None: allowedExts = [] if allowedExts is None: allowedExts = []
if fileGroups is None: fileGroups = {} if fileGroups is None: fileGroups = {}
...@@ -184,7 +198,8 @@ def addExt(prefix, ...@@ -184,7 +198,8 @@ def addExt(prefix,
# If ambiguity is ok, return # If ambiguity is ok, return
# all matching paths # all matching paths
elif not unambiguous: if not unambiguous:
return allPaths return allPaths
# Ambiguity is not ok! More than # Ambiguity is not ok! More than
...@@ -218,37 +233,73 @@ def addExt(prefix, ...@@ -218,37 +233,73 @@ def addExt(prefix,
return allPaths[0] return allPaths[0]
def removeExt(filename, allowedExts=None): def removeExt(
filename : PathLike,
allowedExts : Sequence[str] = None,
firstDot : bool = False
) -> str:
"""Returns the base name of the given file name. See :func:`splitExt`. """ """Returns the base name of the given file name. See :func:`splitExt`. """
return splitExt(filename, allowedExts, firstDot)[0]
return splitExt(filename, allowedExts)[0]
def getExt(
def getExt(filename, allowedExts=None): filename : PathLike,
allowedExts : Sequence[str] = None,
firstDot : bool = False
) -> str:
"""Returns the extension of the given file name. See :func:`splitExt`. """ """Returns the extension of the given file name. See :func:`splitExt`. """
return splitExt(filename, allowedExts, firstDot)[1]
return splitExt(filename, allowedExts)[1]
def splitExt(filename, allowedExts=None): def splitExt(
filename : PathLike,
allowedExts : Sequence[str] = None,
firstDot : bool = False
) -> Tuple[str, str]:
"""Returns the base name and the extension from the given file name. """Returns the base name and the extension from the given file name.
If ``allowedExts`` is ``None``, this function is equivalent to using:: If ``allowedExts`` is ``None`` and ``firstDot`` is ``False``, this
function is equivalent to using::
os.path.splitext(filename) os.path.splitext(filename)
If ``allowedExts`` is provided, but the file does not end with an allowed If ``allowedExts`` is ``None`` and ``firstDot`` is ``True``, the file
extension, a tuple containing ``(filename, '')`` is returned. name is split on the first period that is found, rather than the last
period. For example::
splitExt('image.nii.gz') # -> ('image.nii', '.gz')
splitExt('image.nii.gz', firstDot=True) # -> ('image', '.nii.gz')
If ``allowedExts`` is provided, ``firstDot`` is ignored. In this case, if
the file does not end with an allowed extension, a tuple containing
``(filename, '')`` is returned.
:arg filename: The file name to split. :arg filename: The file name to split.
:arg allowedExts: Allowed/recognised file extensions. :arg allowedExts: Allowed/recognised file extensions.
:arg firstDot: Split the file name on the first period, rather than the
last period. Ignored if ``allowedExts`` is specified.
""" """
# If allowedExts is not specified, filename = str(filename)
# we just use op.splitext
# If allowedExts is not specified
# we split on a period character
if allowedExts is None: if allowedExts is None:
return op.splitext(filename)
# split on last period - equivalent
# to op.splitext
if not firstDot:
return op.splitext(filename)
# split on first period
else:
idx = filename.find('.')
if idx == -1:
return filename, ''
else:
return filename[:idx], filename[idx:]
# Otherwise, try and find a suffix match # Otherwise, try and find a suffix match
extMatches = [filename.endswith(ext) for ext in allowedExts] extMatches = [filename.endswith(ext) for ext in allowedExts]
...@@ -436,7 +487,7 @@ def removeDuplicates(paths, allowedExts=None, fileGroups=None): ...@@ -436,7 +487,7 @@ def removeDuplicates(paths, allowedExts=None, fileGroups=None):
groupFiles = getFileGroup(path, allowedExts, fileGroups) groupFiles = getFileGroup(path, allowedExts, fileGroups)
if not any([p in unique for p in groupFiles]): if not any(p in unique for p in groupFiles):
unique.append(groupFiles[0]) unique.append(groupFiles[0])
return unique return unique
...@@ -463,14 +514,13 @@ def uniquePrefix(path): ...@@ -463,14 +514,13 @@ def uniquePrefix(path):
break break
# Should never happen if path is valid # Should never happen if path is valid
elif len(hits) == 0 or idx >= len(filename) - 1: if len(hits) == 0 or idx >= len(filename) - 1:
raise PathError('No unique prefix for {}'.format(filename)) raise PathError('No unique prefix for {}'.format(filename))
# Not unique - continue looping # Not unique - continue looping
else: idx += 1
idx += 1 prefix = prefix + filename[idx]
prefix = prefix + filename[idx] hits = [h for h in hits if h.startswith(prefix)]
hits = [h for h in hits if h.startswith(prefix)]
return prefix return prefix
...@@ -496,7 +546,66 @@ def commonBase(paths): ...@@ -496,7 +546,66 @@ def commonBase(paths):
last = base last = base
if all([p.startswith(base) for p in paths]): if all(p.startswith(base) for p in paths):
return base return base
raise PathError('No common base') raise PathError('No common base')
def wslpath(path):
"""Convert Windows path (or a command line argument containing a Windows
path) to the equivalent WSL path (e.g. ``c:\\Users`` -> ``/mnt/c/Users``).
Also supports paths in the form ``\\wsl$\\(distro)\\users\\...``
:param winpath: Command line argument which may (or may not) contain a
Windows path. It is assumed to be either of the form
<windows path> or --<arg>=<windows path>. Note that we
don't need to handle --arg <windows path> or -a <windows
path> since in these cases the argument and the path will
be parsed as separate entities.
:return: If ``winpath`` matches a Windows path, the converted
argument (including the --<arg>= portion). Otherwise
returns ``winpath`` unchanged.
"""
match = re.match(r"^(--[\w-]+=)?\\\\wsl\$[\\\/][^\\^\/]+(.*)$", path)
if match:
arg, path = match.group(1, 2)
if arg is None:
arg = ""
return arg + path.replace("\\", "/")
match = re.match(r"^(--[\w-]+=)?([a-zA-z]):(.+)$", path)
if match:
arg, drive, path = match.group(1, 2, 3)
if arg is None:
arg = ""
return arg + "/mnt/" + drive.lower() + path.replace("\\", "/")
return path
def winpath(path):
"""Convert a WSL-local filepath (for example ``/usr/local/fsl/``) into a
path that can be used from Windows.
If ``self.fslwsl`` is ``False``, simply returns ``wslpath`` unmodified
Otherwise, uses ``FSLDIR`` to deduce the WSL distro in use for FSL.
This requires WSL2 which supports the ``\\wsl$\\`` network path.
wslpath is assumed to be an absolute path.
"""
from fsl.utils.platform import platform # pylint: disable=import-outside-toplevel # noqa: E501
if not platform.fslwsl:
return path
else:
match = re.match(r"^\\\\wsl\$\\([^\\]+).*$", platform.fsldir)
if match:
distro = match.group(1)
else:
distro = None
if not distro:
raise RuntimeError('Could not identify WSL installation from '
'FSLDIR (%s)' % platform.fsldir)
return "\\\\wsl$\\" + distro + path.replace("/", "\\")
...@@ -18,7 +18,8 @@ import os.path as op ...@@ -18,7 +18,8 @@ import os.path as op
import sys import sys
import importlib import importlib
import fsl.utils.notifier as notifier import fsl.utils.notifier as notifier
import fsl.utils.deprecated as deprecated
# An annoying consequence of using # An annoying consequence of using
# a system-module name for our own # a system-module name for our own
...@@ -87,6 +88,7 @@ class Platform(notifier.Notifier): ...@@ -87,6 +88,7 @@ class Platform(notifier.Notifier):
frozen frozen
fsldir fsldir
fsldevdir fsldevdir
fslVersion
haveGui haveGui
canHaveGui canHaveGui
inSSHSession inSSHSession
...@@ -110,26 +112,15 @@ class Platform(notifier.Notifier): ...@@ -110,26 +112,15 @@ class Platform(notifier.Notifier):
self.WX_MAC_CARBON = WX_MAC_CARBON self.WX_MAC_CARBON = WX_MAC_CARBON
self.WX_GTK = WX_GTK self.WX_GTK = WX_GTK
self.__inSSHSession = False # initialise fsldir - see fsldir.setter
self.__inVNCSession = False self.fsldir = self.fsldir
# These are all initialised on first access
self.__glVersion = None self.__glVersion = None
self.__glRenderer = None self.__glRenderer = None
self.__glIsSoftware = None self.__glIsSoftware = None
self.__fslVersion = None self.__fslVersion = None
self.__canHaveGui = None
# initialise fsldir - see fsldir.setter
self.fsldir = self.fsldir
# Determine if a display is available. We do
# this once at init (instead of on-demand in
# the canHaveGui method) because calling the
# IsDisplayAvailable function will cause the
# application to steal focus under OSX!
try:
import wx
self.__canHaveGui = wx.App.IsDisplayAvailable()
except ImportError:
self.__canHaveGui = False
# If one of the SSH_/VNC environment # If one of the SSH_/VNC environment
# variables is set, then we're probably # variables is set, then we're probably
...@@ -150,6 +141,10 @@ class Platform(notifier.Notifier): ...@@ -150,6 +141,10 @@ class Platform(notifier.Notifier):
@property @property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def frozen(self): def frozen(self):
"""``True`` if we are running in a compiled/frozen application, """``True`` if we are running in a compiled/frozen application,
``False`` otherwise. ``False`` otherwise.
...@@ -158,6 +153,10 @@ class Platform(notifier.Notifier): ...@@ -158,6 +153,10 @@ class Platform(notifier.Notifier):
@property @property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def haveGui(self): def haveGui(self):
"""``True`` if we are running with a GUI, ``False`` otherwise. """``True`` if we are running with a GUI, ``False`` otherwise.
...@@ -168,7 +167,7 @@ class Platform(notifier.Notifier): ...@@ -168,7 +167,7 @@ class Platform(notifier.Notifier):
the event loop is called periodically, and so is not always running. the event loop is called periodically, and so is not always running.
""" """
try: try:
import wx import wx # pylint: disable=import-outside-toplevel
app = wx.GetApp() app = wx.GetApp()
# TODO Previously this conditional # TODO Previously this conditional
...@@ -201,12 +200,31 @@ class Platform(notifier.Notifier): ...@@ -201,12 +200,31 @@ class Platform(notifier.Notifier):
@property @property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def canHaveGui(self): def canHaveGui(self):
"""``True`` if it is possible to create a GUI, ``False`` otherwise. """ """``True`` if it is possible to create a GUI, ``False`` otherwise. """
# Determine if a display is available. Note that
# calling the IsDisplayAvailable function will
# cause the application to steal focus under OSX!
if self.__canHaveGui is None:
try:
import wx # pylint: disable=import-outside-toplevel
self.__canHaveGui = wx.App.IsDisplayAvailable()
except ImportError:
self.__canHaveGui = False
return self.__canHaveGui return self.__canHaveGui
@property @property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def inSSHSession(self): def inSSHSession(self):
"""``True`` if this application is running over an SSH session, """``True`` if this application is running over an SSH session,
``False`` otherwise. ``False`` otherwise.
...@@ -215,6 +233,10 @@ class Platform(notifier.Notifier): ...@@ -215,6 +233,10 @@ class Platform(notifier.Notifier):
@property @property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def inVNCSession(self): def inVNCSession(self):
"""``True`` if this application is running over a VNC (or similar) """``True`` if this application is running over a VNC (or similar)
session, ``False`` otherwise. Currently, the following remote desktop session, ``False`` otherwise. Currently, the following remote desktop
...@@ -228,6 +250,10 @@ class Platform(notifier.Notifier): ...@@ -228,6 +250,10 @@ class Platform(notifier.Notifier):
@property @property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def wxPlatform(self): def wxPlatform(self):
"""One of :data:`WX_UNKNOWN`, :data:`WX_MAC_COCOA`, """One of :data:`WX_UNKNOWN`, :data:`WX_MAC_COCOA`,
:data:`WX_MAC_CARBON`, or :data:`WX_GTK`, indicating the wx platform. :data:`WX_MAC_CARBON`, or :data:`WX_GTK`, indicating the wx platform.
...@@ -236,14 +262,14 @@ class Platform(notifier.Notifier): ...@@ -236,14 +262,14 @@ class Platform(notifier.Notifier):
if not self.canHaveGui: if not self.canHaveGui:
return WX_UNKNOWN return WX_UNKNOWN
import wx import wx # pylint: disable=import-outside-toplevel
pi = [t.lower() for t in wx.PlatformInfo] pi = [t.lower() for t in wx.PlatformInfo]
if any(['cocoa' in p for p in pi]): plat = WX_MAC_COCOA if any('cocoa' in p for p in pi): plat = WX_MAC_COCOA
elif any(['carbon' in p for p in pi]): plat = WX_MAC_CARBON elif any('carbon' in p for p in pi): plat = WX_MAC_CARBON
elif any(['gtk' in p for p in pi]): plat = WX_GTK elif any('gtk' in p for p in pi): plat = WX_GTK
else: plat = WX_UNKNOWN else: plat = WX_UNKNOWN
if plat is WX_UNKNOWN: if plat is WX_UNKNOWN:
log.warning('Could not determine wx platform from ' log.warning('Could not determine wx platform from '
...@@ -253,6 +279,10 @@ class Platform(notifier.Notifier): ...@@ -253,6 +279,10 @@ class Platform(notifier.Notifier):
@property @property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def wxFlavour(self): def wxFlavour(self):
"""One of :data:`WX_UNKNOWN`, :data:`WX_PYTHON` or :data:`WX_PHOENIX`, """One of :data:`WX_UNKNOWN`, :data:`WX_PYTHON` or :data:`WX_PHOENIX`,
indicating the wx flavour. indicating the wx flavour.
...@@ -261,7 +291,7 @@ class Platform(notifier.Notifier): ...@@ -261,7 +291,7 @@ class Platform(notifier.Notifier):
if not self.canHaveGui: if not self.canHaveGui:
return WX_UNKNOWN return WX_UNKNOWN
import wx import wx # pylint: disable=import-outside-toplevel
pi = [t.lower() for t in wx.PlatformInfo] pi = [t.lower() for t in wx.PlatformInfo]
isPhoenix = False isPhoenix = False
...@@ -292,6 +322,23 @@ class Platform(notifier.Notifier): ...@@ -292,6 +322,23 @@ class Platform(notifier.Notifier):
return os.environ.get('FSLDEVDIR', None) return os.environ.get('FSLDEVDIR', None)
@property
def wsl(self):
"""Boolean flag indicating whether we are running under Windows
Subsystem for Linux.
"""
plat = builtin_platform.platform().lower()
return 'microsoft' in plat
@property
def fslwsl(self):
"""Boolean flag indicating whether FSL is installed in Windows
Subsystem for Linux
"""
return self.fsldir is not None and self.fsldir.startswith("\\\\wsl$")
@fsldir.setter @fsldir.setter
def fsldir(self, value): def fsldir(self, value):
"""Changes the value of the :attr:`fsldir` property, and notifies any """Changes the value of the :attr:`fsldir` property, and notifies any
...@@ -308,18 +355,13 @@ class Platform(notifier.Notifier): ...@@ -308,18 +355,13 @@ class Platform(notifier.Notifier):
if value is None: if value is None:
os.environ.pop('FSLDIR', None) os.environ.pop('FSLDIR', None)
else: else:
os.environ['FSLDIR'] = value os.environ['FSLDIR'] = value
# Set the FSL version field if we can # clear fslversion - it will
versionFile = op.join(value, 'etc', 'fslversion') # be re-read on next access
self.__fslVersion = None
if op.exists(versionFile):
with open(versionFile, 'rt') as f:
# split string at colon for new hash style versions
# first object in list is the non-hashed version string (e.g. 6.0.2)
# if no ":hash:" then standard FSL version string is still returned
self.__fslVersion = f.read().strip().split(":")[0]
self.notify(value=value) self.notify(value=value)
...@@ -349,10 +391,31 @@ class Platform(notifier.Notifier): ...@@ -349,10 +391,31 @@ class Platform(notifier.Notifier):
"""Returns the FSL version as a string, e.g. ``'5.0.9'``. Returns """Returns the FSL version as a string, e.g. ``'5.0.9'``. Returns
``None`` if a FSL installation could not be found. ``None`` if a FSL installation could not be found.
""" """
if self.__fslVersion is not None:
return self.__fslVersion
if self.fsldir is None:
return None
# Set the FSL version field if we can
versionFile = op.join(self.fsldir, 'etc', 'fslversion')
if op.exists(versionFile):
with open(versionFile, 'rt') as f:
# split string at colon for new hash style versions
# first object in list is the non-hashed version string
# (e.g. 6.0.2) if no ":hash:" then standard FSL version
# string is still returned
self.__fslVersion = f.read().strip().split(":")[0]
return self.__fslVersion return self.__fslVersion
@property @property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def glVersion(self): def glVersion(self):
"""Returns the available OpenGL version, or ``None`` if it has not """Returns the available OpenGL version, or ``None`` if it has not
been set. been set.
...@@ -361,12 +424,20 @@ class Platform(notifier.Notifier): ...@@ -361,12 +424,20 @@ class Platform(notifier.Notifier):
@glVersion.setter @glVersion.setter
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def glVersion(self, value): def glVersion(self, value):
"""Set the available OpenGL version. """ """Set the available OpenGL version. """
self.__glVersion = value self.__glVersion = value
@property @property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def glRenderer(self): def glRenderer(self):
"""Returns the available OpenGL renderer, or ``None`` if it has not """Returns the available OpenGL renderer, or ``None`` if it has not
been set. been set.
...@@ -375,6 +446,10 @@ class Platform(notifier.Notifier): ...@@ -375,6 +446,10 @@ class Platform(notifier.Notifier):
@glRenderer.setter @glRenderer.setter
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def glRenderer(self, value): def glRenderer(self, value):
"""Set the available OpenGL renderer. """ """Set the available OpenGL renderer. """
self.__glRenderer = value self.__glRenderer = value
...@@ -392,6 +467,10 @@ class Platform(notifier.Notifier): ...@@ -392,6 +467,10 @@ class Platform(notifier.Notifier):
@property @property
@deprecated.deprecated(
'3.6.0',
'4.0.0',
'Equivalent functionality is available in fsleyes-widgets.')
def glIsSoftwareRenderer(self): def glIsSoftwareRenderer(self):
"""Returns ``True`` if the OpenGL renderer is software based, """Returns ``True`` if the OpenGL renderer is software based,
``False`` otherwise, or ``None`` if the renderer has not yet been set. ``False`` otherwise, or ``None`` if the renderer has not yet been set.
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
# run.py - Functions for running shell commands # run.py - Functions for running shell commands
# #
# Author: Paul McCarthy <pauldmccarthy@gmail.com> # Author: Paul McCarthy <pauldmccarthy@gmail.com>
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
# #
"""This module provides some functions for running shell commands. """This module provides some functions for running shell commands.
...@@ -14,24 +15,34 @@ ...@@ -14,24 +15,34 @@
run run
runfsl runfsl
wait runfunc
func_to_cmd
dryrun dryrun
hold
job_output
""" """
import sys import io
import logging import sys
import threading import glob
import contextlib import time
import collections import shlex
import subprocess as sp import logging
import os.path as op import tempfile
import threading
import contextlib
import collections.abc as abc
import subprocess as sp
import os.path as op
import os
import textwrap as tw
import six import dill
from fsl.utils.platform import platform as fslplatform from fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub as fslsub
import fsl.utils.tempdir as tempdir import fsl.utils.tempdir as tempdir
import fsl.utils.path as fslpath
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -42,6 +53,12 @@ DRY_RUN = False ...@@ -42,6 +53,12 @@ DRY_RUN = False
execute them. execute them.
""" """
DRY_RUN_COMMANDS = None
"""Contains the commands that got logged during a dry run.
Commands will be logged if :data:`DRY_RUN` is true, which can be set using :func:`dryrun`.
"""
FSL_PREFIX = None FSL_PREFIX = None
"""Global override for the FSL executable location used by :func:`runfsl`. """ """Global override for the FSL executable location used by :func:`runfsl`. """
...@@ -51,20 +68,25 @@ class FSLNotPresent(Exception): ...@@ -51,20 +68,25 @@ class FSLNotPresent(Exception):
"""Error raised by the :func:`runfsl` function when ``$FSLDIR`` cannot """Error raised by the :func:`runfsl` function when ``$FSLDIR`` cannot
be found. be found.
""" """
pass
@contextlib.contextmanager @contextlib.contextmanager
def dryrun(*args): def dryrun(*_):
"""Context manager which causes all calls to :func:`run` to be logged but """Context manager which causes all calls to :func:`run` to be logged but
not executed. See the :data:`DRY_RUN` flag. not executed. See the :data:`DRY_RUN` flag.
The returned standard output will be equal to ``' '.join(args)``. The returned standard output will be equal to ``' '.join(args)``.
After this function returns, each command that was executed while the
dryrun is active, along with any submission parameters, will be accessible
within a list which is stored as :data:`DRY_RUN_COMMANDS`.
""" """
global DRY_RUN global DRY_RUN, DRY_RUN_COMMANDS # pylint: disable=global-statement
oldval = DRY_RUN
DRY_RUN = True
DRY_RUN_COMMANDS = []
oldval = DRY_RUN
DRY_RUN = True
try: try:
yield yield
...@@ -80,8 +102,8 @@ def prepareArgs(args): ...@@ -80,8 +102,8 @@ def prepareArgs(args):
if len(args) == 1: if len(args) == 1:
# Argument was a command string # Argument was a command string
if isinstance(args[0], six.string_types): if isinstance(args[0], str):
args = args[0].split() args = shlex.split(args[0])
# Argument was an unpacked sequence # Argument was an unpacked sequence
else: else:
...@@ -90,7 +112,6 @@ def prepareArgs(args): ...@@ -90,7 +112,6 @@ def prepareArgs(args):
return list(args) return list(args)
real_stdout = sys.stdout
def _forwardStream(in_, *outs): def _forwardStream(in_, *outs):
"""Creates and starts a daemon thread which forwards the given input stream """Creates and starts a daemon thread which forwards the given input stream
to one or more output streams. Used by the :func:`run` function to redirect to one or more output streams. Used by the :func:`run` function to redirect
...@@ -145,68 +166,93 @@ def run(*args, **kwargs): ...@@ -145,68 +166,93 @@ def run(*args, **kwargs):
:arg submit: Must be passed as a keyword argument. Defaults to ``None``. :arg submit: Must be passed as a keyword argument. Defaults to ``None``.
If ``True``, the command is submitted as a cluster job via If ``True``, the command is submitted as a cluster job via
the :func:`.fslsub.submit` function. May also be a the :mod:`fsl.wrappers.fsl_sub` function. May also be a
dictionary containing arguments to that function. dictionary containing arguments to that function.
:arg log: Must be passed as a keyword argument. An optional ``dict`` :arg cmdonly: Defaults to ``False``. If ``True``, the command is not
which may be used to redirect the command's standard output executed, but rather is returned directly, as a list of
and error. The following keys are recognised: arguments.
:arg log: Must be passed as a keyword argument. Defaults to
``{'tee' : True}``. An optional ``dict`` which may be used
to redirect the command's standard output and error. Ignored
if ``submit`` is specified. The following keys are
recognised:
- tee: If ``True`` (the default), the command's
standard output/error streams are forwarded to
the output streams of this process, in addition
to being captured and returned.
- stdout: Optional callable or file-like object to which
the command's standard output stream can be
forwarded.
- tee: If ``True``, the command's standard output/error - stderr: Optional callable or file-like object to which
streams are forwarded to this processes streams. the command's standard error stream can be
forwarded.
- stdout: Optional file-like object to which the command's - cmd: Optional callable or file-like object to which
standard output stream can be forwarded. the command itself is logged.
- stderr: Optional file-like object to which the command's :arg silent: Suppress standard output/error. Equivalent to passing
standard error stream can be forwarded. ``log={'tee' : False}``. Ignored if `log` is also passed.
- cmd: Optional file-like object to which the command All other keyword arguments are passed through to the ``subprocess.Popen``
itself is logged. object (via :func:`_realrun`), unless ``submit=True``, in which case they
are passed through to the :func:`.fsl_sub` function.
:returns: If ``submit`` is provided, the return value of :returns: If ``submit`` is provided, the ID of the submitted job is
:func:`.fslsub` is returned. Otherwise returns a single returned as a string. Otherwise returns a single value or a
value or a tuple, based on the based on the ``stdout``, tuple, based on the based on the ``stdout``, ``stderr``, and
``stderr``, and ``exitcode`` arguments. ``exitcode`` arguments.
""" """
returnStdout = kwargs.get('stdout', True) returnStdout = kwargs.pop('stdout', True)
returnStderr = kwargs.get('stderr', False) returnStderr = kwargs.pop('stderr', False)
returnExitcode = kwargs.get('exitcode', False) returnExitcode = kwargs.pop('exitcode', False)
submit = kwargs.get('submit', {}) submit = kwargs.pop('submit', {})
log = kwargs.get('log', {}) cmdonly = kwargs.pop('cmdonly', False)
tee = log .get('tee', False) logg = kwargs.pop('log', None)
logStdout = log .get('stdout', None) silent = kwargs.pop('silent', False)
logStderr = log .get('stderr', None)
logCmd = log .get('cmd', None)
args = prepareArgs(args) args = prepareArgs(args)
if logg is None:
logg = {'tee' : not silent}
tee = logg.get('tee', True)
logStdout = logg.get('stdout', None)
logStderr = logg.get('stderr', None)
logCmd = logg.get('cmd', None)
if not bool(submit): if not bool(submit):
submit = None submit = None
if submit is not None: if submit is not None:
returnStdout = False
returnStderr = False
returnExitcode = False
if submit is True: if submit is True:
submit = dict() submit = dict()
if submit is not None and not isinstance(submit, collections.Mapping): if submit is not None and not isinstance(submit, abc.Mapping):
raise ValueError('submit must be a mapping containing ' raise ValueError('submit must be a mapping containing '
'options for fsl.utils.fslsub.submit') 'options for fsl.utils.fslsub.submit')
if cmdonly:
return args
if DRY_RUN: if DRY_RUN:
return _dryrun( return _dryrun(
submit, returnStdout, returnStderr, returnExitcode, *args) submit, returnStdout, returnStderr, returnExitcode, *args)
# submit - delegate to fslsub # submit - delegate to fsl_sub. This will induce a nested
# call back to this run function, which is a bit confusing,
# but harmless, as we've popped the "submit" arg above.
if submit is not None: if submit is not None:
return fslsub.submit(' '.join(args), **submit) from fsl.wrappers import fsl_sub # pylint: disable=import-outside-toplevel # noqa: E501
return fsl_sub(*args, log=logg, **submit, **kwargs)[0].strip()
# Run directly - delegate to _realrun # Run directly - delegate to _realrun
stdout, stderr, exitcode = _realrun( stdout, stderr, exitcode = _realrun(
tee, logStdout, logStderr, logCmd, *args) tee, logStdout, logStderr, logCmd, *args, **kwargs)
if not returnExitcode and (exitcode != 0): if not returnExitcode and (exitcode != 0):
raise RuntimeError('{} returned non-zero exit code: {}'.format( raise RuntimeError('{} returned non-zero exit code: {}'.format(
...@@ -221,18 +267,23 @@ def run(*args, **kwargs): ...@@ -221,18 +267,23 @@ def run(*args, **kwargs):
else: return tuple(results) else: return tuple(results)
def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args): def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
"""Used by the :func:`run` function when the :attr:`DRY_RUN` flag is """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
active. active.
""" """
# Save command/submit parameters -
# see the dryrun ctx manager
if DRY_RUN_COMMANDS is not None:
DRY_RUN_COMMANDS.append((args, submit))
if submit: if submit:
return ('0',) return ('0',)
results = [] results = []
stderr = '' stderr = ''
stdout = ' '.join(args) join = getattr(shlex, 'join', ' '.join)
stdout = join(args)
if returnStdout: results.append(stdout) if returnStdout: results.append(stdout)
if returnStderr: results.append(stderr) if returnStderr: results.append(stderr)
...@@ -242,7 +293,7 @@ def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args): ...@@ -242,7 +293,7 @@ def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
else: return tuple(results) else: return tuple(results)
def _realrun(tee, logStdout, logStderr, logCmd, *args): def _realrun(tee, logStdout, logStderr, logCmd, *args, **kwargs):
"""Used by :func:`run`. Runs the given command and manages its standard """Used by :func:`run`. Runs the given command and manages its standard
output and error streams. output and error streams.
...@@ -250,23 +301,31 @@ def _realrun(tee, logStdout, logStderr, logCmd, *args): ...@@ -250,23 +301,31 @@ def _realrun(tee, logStdout, logStderr, logCmd, *args):
streams are forwarded to this process' standard output/ streams are forwarded to this process' standard output/
error. error.
:arg logStdout: Optional file-like object to which the command's standard :arg logStdout: Optional callable or file-like object to which the
output stream can be forwarded. command's standard output stream can be forwarded.
:arg logStderr: Optional file-like object to which the command's standard :arg logStderr: Optional callable or file-like object to which the
error stream can be forwarded. command's standard error stream can be forwarded.
:arg logCmd: Optional file-like object to which the command itself is :arg logCmd: Optional callable or file-like to which the command
logged. itself is logged.
:arg args: Command to run :arg args: Command to run
:arg kwargs: Passed through to the ``subprocess.Popen`` object.
:returns: A tuple containing: :returns: A tuple containing:
- the command's standard output as a string. - the command's standard output as a string.
- the command's standard error as a string. - the command's standard error as a string.
- the command's exit code. - the command's exit code.
""" """
proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE) if fslplatform.fslwsl:
# On Windows this prevents opening of a popup window
startupinfo = sp.STARTUPINFO()
startupinfo.dwFlags |= sp.STARTF_USESHOWWINDOW
kwargs["startupinfo"] = startupinfo
proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, **kwargs)
with tempdir.tempdir(changeto=False) as td: with tempdir.tempdir(changeto=False) as td:
# We always direct the command's stdout/ # We always direct the command's stdout/
...@@ -287,14 +346,22 @@ def _realrun(tee, logStdout, logStderr, logCmd, *args): ...@@ -287,14 +346,22 @@ def _realrun(tee, logStdout, logStderr, logCmd, *args):
outstreams.append(sys.stdout) outstreams.append(sys.stdout)
errstreams.append(sys.stderr) errstreams.append(sys.stderr)
# And we also duplicate to caller- # And we also duplicate to caller-provided
# provided streams if they're given. # streams if they are file-likes (if they're
if logStdout is not None: outstreams.append(logStdout) # callables, we call them after the process
if logStderr is not None: errstreams.append(logStderr) # has completed)
if logStdout is not None and not callable(logStdout):
# log the command if requested outstreams.append(logStdout)
if logCmd is not None: if logStderr is not None and not callable(logStderr):
cmd = ' '.join(args) + '\n' errstreams.append(logStderr)
# log the command if requested.
# logCmd can be a callable, or
# can be a file-like.
cmd = ' '.join(args) + '\n'
if callable(logCmd):
logCmd(cmd)
elif logCmd is not None:
if 'b' in getattr(logCmd, 'mode', 'w'): if 'b' in getattr(logCmd, 'mode', 'w'):
logCmd.write(cmd.encode('utf-8')) logCmd.write(cmd.encode('utf-8'))
else: else:
...@@ -318,6 +385,10 @@ def _realrun(tee, logStdout, logStderr, logCmd, *args): ...@@ -318,6 +385,10 @@ def _realrun(tee, logStdout, logStderr, logCmd, *args):
stdout = stdout.decode('utf-8') stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8') stderr = stderr.decode('utf-8')
# Send stdout/error to logStdout/err callables
if logStdout is not None and callable(logStdout): logStdout(stdout)
if logStderr is not None and callable(logStderr): logStderr(stderr)
return stdout, stderr, exitcode return stdout, stderr, exitcode
...@@ -348,13 +419,290 @@ def runfsl(*args, **kwargs): ...@@ -348,13 +419,290 @@ def runfsl(*args, **kwargs):
args = prepareArgs(args) args = prepareArgs(args)
for prefix in prefixes: for prefix in prefixes:
cmdpath = op.join(prefix, args[0]) cmdpath = op.join(prefix, args[0])
if op.isfile(cmdpath): if fslplatform.fslwsl:
wslargs = wslcmd(cmdpath, *args)
if wslargs is not None:
args = wslargs
break
elif op.isfile(cmdpath):
args[0] = cmdpath args[0] = cmdpath
break break
# error if the command cannot
# be found in a FSL directory
else:
raise FileNotFoundError('FSL tool {} not found (checked {})'.format(
args[0], ', '.join(prefixes)))
return run(*args, **kwargs) return run(*args, **kwargs)
def runfunc(func,
            args=None,
            kwargs=None,
            tmp_dir=None,
            clean="never",
            verbose=False,
            **run_kwargs):
    """Run the given python function as a shell command. See
    :func:`func_to_cmd` for details on the arguments.

    The remaining ``run_kwargs`` arguments are passed through to the
    :func:`run` function.

    :arg func:    function to be run
    :arg args:    positional arguments to pass to ``func``
    :arg kwargs:  keyword arguments to pass to ``func``
    :arg tmp_dir: directory where to store the temporary script file
    :arg clean:   whether/when to remove the script - one of ``'never'``,
                  ``'on_success'``, or ``'always'``
    :arg verbose: if True, the generated script prints its own filename
                  before running

    :returns:     the result of calling :func:`run` on the generated command.
    """
    # Serialise the function call into an executable
    # script, then run it like any other command.
    cmd = func_to_cmd(func, args, kwargs, tmp_dir, clean, verbose)
    return run(cmd, **run_kwargs)
def func_to_cmd(func,
                args=None,
                kwargs=None,
                tmp_dir=None,
                clean="never",
                verbose=False):
    """Save the given python function to an executable file. Return a string
    containing a command that can be used to run the function.

    ..warning:: If submitting a function defined in the ``__main__`` script,
                the script will be run again to retrieve this function. Make
                sure there is a ``if __name__ == '__main__'`` guard to prevent
                the full script from being re-run.

    :arg func:    function to be run

    :arg args:    positional arguments

    :arg kwargs:  keyword arguments

    :arg tmp_dir: directory where to store the temporary file (default: the
                  system temporary directory)

    :arg clean:   Whether the script should be removed after running. There are
                  three options:

                   - ``"never"``:      (default) Script is kept
                   - ``"on_success"``: only remove if script successfully
                     finished (i.e., no error is raised)
                   - ``"always"``:     always remove the script, even if it
                     raises an error

    :arg verbose: If set to True, the script will print its own filename
                  before running

    :returns:     String containing a command that can be used to run the
                  function.
    """
    # Template for the generated script - the {...}
    # fields are filled in via str.format below.
    script_template = tw.dedent("""
    #!{executable}
    # This is a temporary file designed to run the python function {funcname},
    # so that it can be submitted to the cluster

    import os
    import dill
    from io import BytesIO
    from importlib import import_module

    if {verbose}:
        print('running {filename}')

    dill_bytes = BytesIO({dill_bytes})
    func, args, kwargs = dill.load(dill_bytes)

    clean = {clean}

    try:
        res = func(*args, **kwargs)
    except Exception as e:
        if clean == 'on_success':
            clean = 'never'
        raise e
    finally:
        if clean in ('on_success', 'always'):
            os.remove({filename})
    """).strip()

    if clean not in ('never', 'always', 'on_success'):
        raise ValueError("Clean should be one of 'never', 'always', "
                         f"or 'on_success', not {clean}")

    if args   is None: args   = ()
    if kwargs is None: kwargs = {}

    # Serialise the function and its arguments so
    # the generated script can re-load them. The
    # bytes are embedded directly in the script
    # source (their repr is a valid bytes literal).
    dill_bytes = io.BytesIO()
    dill.dump((func, args, kwargs), dill_bytes, recurse=True)

    handle, filename = tempfile.mkstemp(prefix=func.__name__ + '_',
                                        suffix='.py',
                                        dir=tmp_dir)
    os.close(handle)

    python_cmd = script_template.format(
        executable=sys.executable,
        funcname=func.__name__,
        # Quote the file name within the generated
        # script, as it is spliced into source code
        filename=f'"{filename}"',
        verbose=verbose,
        clean=f'"{clean}"',
        dill_bytes=dill_bytes.getvalue())

    with open(filename, 'w') as f:
        f.write(python_cmd)
    os.chmod(filename, 0o755)

    return f'{filename}'
def wslcmd(cmdpath, *args):
    """Convert a command + arguments into an equivalent set of arguments that
    will run the command under Windows Subsystem for Linux

    :param cmdpath: Fully qualified path to the command. This is essentially
                    a WSL path not a Windows one since FSLDIR is specified
                    as a WSL path, however it may have backslashes as path
                    separators due to previous use of ``os.path.join``

    :param args:    Sequence of command arguments (the first of which is the
                    unqualified command name)

    :return:        If ``cmdpath`` exists and is executable in WSL, return a
                    sequence of command arguments which when executed will run
                    the command in WSL. Windows paths in the argument list will
                    be converted to WSL paths. If ``cmdpath`` was not
                    executable in WSL, returns None
    """
    # Check if command exists in WSL (remembering
    # that the command path may include FSLDIR
    # which is a Windows path)
    cmdpath = fslpath.wslpath(cmdpath)
    _stdout, _stderr, retcode = _realrun(
        False, None, None, None, "wsl", "test", "-x", cmdpath)

    # Command was not found in WSL with this path
    if retcode != 0:
        return None

    # Form a new argument list and convert
    # any Windows paths in it into WSL paths
    wslargs    = [fslpath.wslpath(arg) for arg in args]
    wslargs[0] = cmdpath

    local_fsldir    = fslpath.wslpath(fslplatform.fsldir)
    local_fsldevdir = None
    if fslplatform.fsldevdir:
        local_fsldevdir = fslpath.wslpath(fslplatform.fsldevdir)

    # Prepend important environment variables -
    # note that it seems we cannot use WSLENV
    # for this due to its insistance on path
    # mapping. FIXME FSLDEVDIR?
    local_path = "$PATH"
    if local_fsldevdir:
        local_path += ":%s/bin" % local_fsldevdir
    local_path += ":%s/bin" % local_fsldir

    prepargs = [
        "wsl",
        "PATH=%s" % local_path,
        "FSLDIR=%s" % local_fsldir,
        "FSLOUTPUTTYPE=%s" % os.environ.get("FSLOUTPUTTYPE", "NIFTI_GZ")
    ]
    if local_fsldevdir:
        prepargs.append("FSLDEVDIR=%s" % local_fsldevdir)

    return prepargs + wslargs
def hold(job_ids, hold_filename=None, timeout=10):
    """Waits until all specified cluster jobs have finished.

    :arg job_ids:       Possibly nested sequence of job ids. The job ids
                        themselves should be strings.

    :arg hold_filename: Filename to use as a hold file. The containing
                        directory should exist, but the file itself should
                        not. Defaults to a ./.<random characters>.hold in
                        the current directory.

    :arg timeout:       Number of seconds to sleep between status checks.
    """

    # Returns a potentially nested sequence of
    # job ids as a single comma-separated string
    def _flatten_job_ids(job_ids):
        def unpack(job_ids):
            if isinstance(job_ids, str):
                return {job_ids}
            elif isinstance(job_ids, int):
                return {str(job_ids)}
            else:
                res = set()
                for job_id in job_ids:
                    res.update(unpack(job_id))
                return res
        return ','.join(sorted(unpack(job_ids)))

    if hold_filename is not None:
        if op.exists(hold_filename):
            raise IOError(f"Hold file ({hold_filename}) already exists")
        elif not op.isdir(op.split(op.abspath(hold_filename))[0]):
            raise IOError(f"Hold file ({hold_filename}) can not be created "
                          "in non-existent directory")

    # Generate a random file name to use as
    # the hold file. Reduce likelihood of
    # naming collision by storing file in
    # cwd.
    if hold_filename is None:
        handle, hold_filename = tempfile.mkstemp(prefix='.',
                                                 suffix='.hold',
                                                 dir='.')
        # Close the descriptor before removing the
        # file - deleting a file with an open handle
        # fails on some platforms (e.g. Windows).
        os.close(handle)
        os.remove(hold_filename)

    # Submit a held cluster job which simply
    # touches the hold file - its appearance
    # signals that all held-on jobs are done.
    submit = {
        'jobhold' : _flatten_job_ids(job_ids),
        'jobtime' : 1,
        'name'    : '.hold',
    }

    run(f'touch {hold_filename}', submit=submit, silent=True)

    while not op.exists(hold_filename):
        time.sleep(timeout)

    # remove the hold file and the
    # fsl_sub job stdout/err files
    os.remove(hold_filename)
    for outfile in glob.glob('.hold.[o,e]*'):
        os.remove(outfile)
def job_output(job_id, logdir='.', command=None, name=None):
    """Returns the output of the given cluster-submitted job.

    On SGE cluster systems, the standard output and error streams of a
    submitted job are saved to files named ``<job_id>.o`` and ``<job_id>.e``.
    This function simply reads those files and returns their content.

    :arg job_id:  String containing job ID.
    :arg logdir:  Directory containing the log - defaults to
                  the current directory.
    :arg command: Command that was run. Not currently used.
    :arg name:    Job name if it was specified. Not currently used.
    :returns:     A tuple containing the standard output and standard error.
    """

    # Locate the <name>.o<job_id> / <name>.e<job_id>
    # files - there must be exactly one of each.
    outfiles = glob.glob(op.join(logdir, f'*.o{job_id}'))
    errfiles = glob.glob(op.join(logdir, f'*.e{job_id}'))

    if (len(outfiles) != 1) or (len(errfiles) != 1):
        raise ValueError('No/too many error/output files for job '
                         f'{job_id}: stdout: {outfiles}, stderr: {errfiles}')

    # Read a log file, or return None
    # if it (no longer) exists.
    def readlog(path):
        if not op.exists(path):
            return None
        with open(path, 'rt') as f:
            return f.read()

    return readlog(outfiles[0]), readlog(errfiles[0])