Commit d30bfc33 authored by Paul McCarthy's avatar Paul McCarthy 🚵
Browse files

Merge branch 'enh/fslsub' into 'master'


See merge request fsl/fslpy!215
parents dd48efe0 dfa51e0f
......@@ -157,12 +157,6 @@ variables:
- bash ./.ci/
stage: test
image: pauldmccarthy/fsleyes-py36-wxpy4-gtk3
<<: *test_template
stage: test
image: pauldmccarthy/fsleyes-py37-wxpy4-gtk3
......@@ -2,7 +2,7 @@ This document contains the ``fslpy`` release history in reverse chronological
2.9.0 (Under development)
3.0.0 (Under development)
......@@ -17,12 +17,15 @@ Added
rudimentary support for double-barrelled filenames.
* The :func:`.nonlinear.applyDeformation` function now accepts a ``premat``
affine, which is applied to the input image before the deformation field.
* New :class:`.SubmitParams` class, providing a higer level interface for
cluster submission.
* ``fslpy`` now requires a minimum Python version of 3.7.
* The :func:`.gifti.relatedFiles` function now supports files with
BIDS-style naming conventions.
* The :func:`` and :func:`.run.runfsl` functions now pass through any
......@@ -49,27 +49,172 @@ import sys
import tempfile
import logging
import importlib
from dataclasses import dataclass, asdict
from typing import Optional, Collection, Union, Tuple
import argparse
import warnings
log = logging.getLogger(__name__)
def submit(*command,
class SubmitParams(object):
Represents the fsl_sub parameters
minutes: Optional[float] = None
queue: Optional[str] = None
architecture: Optional[str] = None
priority: Optional[int] = None
email: Optional[str] = None
wait_for: Union[str, None, Collection[str]] = None
job_name: Optional[str] = None
ram: Optional[int] = None
logdir: Optional[str] = None
mail_options: Optional[str] = None
flags: bool = False
multi_threaded: Optional[Tuple[str, str]] = None
verbose: bool = False
env: dict = None
cmd_line_flags = {
'-T': 'minutes',
'-q': 'queue',
'-a': 'architecture',
'-p': 'priority',
'-M': 'email',
'-N': 'job_name',
'-R': 'ram',
'-l': 'logdir',
'-m': 'mail_options',
def __post_init__(self):
if self.env is None:
self.env = {}
def as_flags(self, ):
Creates flags for submission using fsl_sub
All parameters changed from their default value (typically None) will be included in the flags.
:return: tuple with the flags
res = []
for key, value in self.cmd_line_flags.items():
if getattr(self, value) is not None:
res.extend((key, str(getattr(self, value))))
if self.verbose:
if self.flags:
if self.multi_threaded:
res.extend(("-s", ','.join(self.multi_threaded)))
if self.wait_for is not None and len(_flatten_job_ids(self.wait_for)) > 0:
res.extend(('-j', _flatten_job_ids(self.wait_for)))
return tuple(res)
def __str__(self):
return 'SubmitParams({})'.format(" ".join(self.as_flags()))
def __call__(self, *command, **kwargs):
Submits the command to the cluster.
:param command: string or tuple of strings with the command to submit
:param kwargs: Keyword arguments can override any parameters set in self
:return: job ID
from import prepareArgs, runfsl
runner = self.update(**kwargs)
command = prepareArgs(command)
fsl_sub_cmd = ' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(command))
jobid = runfsl(fsl_sub_cmd, env=runner.env).strip()
log.debug('Job submitted as {}'.format(jobid))
return jobid
def update(self, **kwargs):
Creates a new SubmitParams withe updated parameters
values = asdict(self)
return SubmitParams(**values)
def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands',
include=('wait_for', 'logdir', 'email', 'mail_options')):
Adds submission parameters to the parser
:param parser: parser that should understand submission commands
:param as_group: add as a new group
:param include: sequence of argument flags/names that should be added to the parser
(set to None to include everything)
:return: the group the arguments got added to
from import runfsl, FSLNotPresent
fsl_sub_run, _ = runfsl('fsl_sub', exitcode=True)
except (FileNotFoundError, FSLNotPresent):
warnings.warn('fsl_sub was not found')
doc_lines = fsl_sub_run.splitlines()
nspaces = 1
for line in doc_lines:
if len(line.strip()) > 0:
while line.startswith(' ' * nspaces):
nspaces += 1
nspaces -= 1
if as_group:
group = parser.add_argument_group(as_group)
group = parser
def get_explanation(flag):
explanation = None
for line in doc_lines:
if explanation is not None and len(line.strip()) > 0 and line.strip()[0] != '-':
elif explanation is not None:
elif line.strip().startswith(flag):
explanation = [line[nspaces:].strip()]
if (explanation is None) or (len(explanation) == 0):
return 'documentation not found'
return ' '.join(explanation)
for flag, value in cls.cmd_line_flags.items():
if include is not None and value not in include and flag not in include:
as_type = {'minutes': float, 'priority': int, 'ram': int, 'verbose': None}
action = 'store_true' if value == 'verbose' else 'store'
group.add_argument(flag, dest='_sub_' + value, help=get_explanation(flag), action=action,
metavar='<' + value + '>', type=as_type.get(value, str))
group.add_argument('-F', dest='_sub_flags', help=get_explanation('-F'), action='store_true')
group.add_argument('-v', dest='_sub_verbose', help=get_explanation('-v'), action='store_true')
group.add_argument('-s', dest='_sub_multi_threaded', help=get_explanation('-s'),
group.add_argument('-j', dest='_sub_wait_for', help=get_explanation('-j'),
return group
def from_args(cls, args):
as_dict = {value: getattr(args, '_sub_' + value, None) for value in cls.cmd_line_flags.values()}
if args._sub_wait_for is not None:
as_dict['wait_for'] = args._sub_wait_for.split(',')
if args._sub_multi_threaded is not None:
pename, threads = args._sub_multi_threaded.split(',')
as_dict['multi_threaded'] = pename, threads
return cls(verbose=args._sub_verbose, flags=args._sub_flags, **as_dict)
def submit(*command, **kwargs):
Submits a given command to the cluster
......@@ -104,41 +249,7 @@ def submit(*command,
:return: string of submitted job id
from import runfsl, prepareArgs
base_cmd = ['fsl_sub']
for flag, variable_name in [
('-T', 'minutes'),
('-q', 'queue'),
('-a', 'architecture'),
('-p', 'priority'),
('-M', 'email'),
('-N', 'job_name'),
('-R', 'ram'),
('-l', 'logdir'),
('-m', 'mail_options'),
('-z', 'output')]:
variable = locals()[variable_name]
if variable:
base_cmd.extend([flag, str(variable)])
if flags:
if verbose:
if wait_for:
base_cmd.extend(['-j', _flatten_job_ids(wait_for)])
if multi_threaded:
return runfsl(*base_cmd, env=env).strip()
return SubmitParams(**kwargs)(*command)
def info(job_id):
......@@ -264,7 +375,7 @@ else:
res = func(*args, **kwargs)
if res is not None:
with open(__file__ + '_out.pickle') as f:
with open(__file__ + '_out.pickle', 'w') as f:
pickle.dump(f, res)
......@@ -102,8 +102,6 @@ setup(
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Topic :: Software Development :: Libraries :: Python Modules'],
......@@ -12,7 +12,9 @@ import os.path as op
import sys
import textwrap as tw
import contextlib
import argparse
import fsl
from fsl.utils import fslsub
from fsl.utils.tempdir import tempdir
......@@ -27,6 +29,13 @@ import os
import os.path as op
import sys
import subprocess as sp
fslpydir = op.join('{}', '..')
env = dict(os.environ)
env['PYTHONPATH'] = fslpydir
sys.path.insert(0, fslpydir)
import fsl
args = sys.argv[1:]
......@@ -44,9 +53,6 @@ for i in range(len(args)):
args = args[i:]
env = dict(os.environ)
env['PYTHONPATH'] = op.join(op.dirname(fsl.__file__), '..')
cmd = op.basename(args[0])
jobid = random.randint(1, 9999)
......@@ -56,7 +62,7 @@ with open('{{}}.o{{}}'.format(cmd, jobid), 'w') as stdout, \
""".format(sys.executable, op.dirname(fsl.__file__)).strip()
......@@ -125,6 +131,49 @@ def test_info():
assert'12345') == exp
def test_add_to_parser():
test_flags = [
('-T', '30.0'),
('-q', 'short.q'),
('-a', 'architecture'),
('-p', '3'),
('-M', ''),
('-N', 'job_name'),
('-R', '20'),
('-l', 'logdir'),
('-j', '12345,67890'),
('-m', 'mail_options'),
('-v', ),
('-F', ),
('-s', 'pename,thread')
with fslsub_mockFSLDIR():
for flag in test_flags:
for include in (None, [flag[0]]):
parser = argparse.ArgumentParser("test parser")
fslsub.SubmitParams.add_to_parser(parser, include=include)
args = parser.parse_args(flag)
submitter = fslsub.SubmitParams.from_args(args)
assert submitter.as_flags() == flag
with fslsub_mockFSLDIR():
parser = argparse.ArgumentParser("test parser")
fslsub.SubmitParams.add_to_parser(parser, include=None)
all_flags = tuple(part for flag in test_flags for part in flag)
args = parser.parse_args(('input', ) + all_flags)
assert args.some_input == 'input'
submitter = fslsub.SubmitParams.from_args(args)
assert len(all_flags) == len(submitter.as_flags())
for flag in test_flags:
res_flags = submitter.as_flags()
assert flag[0] in res_flags
start_index = res_flags.index(flag[0])
for idx, part in enumerate(flag):
assert res_flags[idx + start_index] == part
def myfunc():
print('standard output')
print('standard error', file=sys.stderr)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment