diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 6a050cb81ce537763517ed9062fa6b3f48763634..6bc3be33baeabbc5e1c94802c856fd016928a9b0 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -157,12 +157,6 @@ variables: - bash ./.ci/test_template.sh -test:3.6: - stage: test - image: pauldmccarthy/fsleyes-py36-wxpy4-gtk3 - <<: *test_template - - test:3.7: stage: test image: pauldmccarthy/fsleyes-py37-wxpy4-gtk3 diff --git a/CHANGELOG.rst b/CHANGELOG.rst index adab3734e2ac1a1b2dff5177049ec9f38fa8816c..dae80e9d6fc3df091bef68599230e7cfa58d1b96 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,7 +2,7 @@ This document contains the ``fslpy`` release history in reverse chronological order. -2.9.0 (Under development) +3.0.0 (Under development) ------------------------- @@ -17,12 +17,15 @@ Added rudimentary support for double-barrelled filenames. * The :func:`.nonlinear.applyDeformation` function now accepts a ``premat`` affine, which is applied to the input image before the deformation field. +* New :class:`.SubmitParams` class, providing a higer level interface for + cluster submission. Changed ^^^^^^^ +* ``fslpy`` now requires a minimum Python version of 3.7. * The :func:`.gifti.relatedFiles` function now supports files with BIDS-style naming conventions. * The :func:`.run.run` and :func:`.run.runfsl` functions now pass through any diff --git a/fsl/utils/fslsub.py b/fsl/utils/fslsub.py index 4f8479c716de3454cc7d26f850e81d4872de76da..f6fa49557059a9c011087af68382080c8dc2a4e6 100644 --- a/fsl/utils/fslsub.py +++ b/fsl/utils/fslsub.py @@ -49,27 +49,172 @@ import sys import tempfile import logging import importlib +from dataclasses import dataclass, asdict +from typing import Optional, Collection, Union, Tuple +import argparse +import warnings log = logging.getLogger(__name__) -def submit(*command, - minutes=None, - queue=None, - architecture=None, - priority=None, - email=None, - wait_for=None, - job_name=None, - ram=None, - logdir=None, - mail_options=None, - output=None, - flags=False, - multi_threaded=None, - verbose=False, - env=None): +@dataclass +class SubmitParams(object): + """ + Represents the fsl_sub parameters + """ + minutes: Optional[float] = None + queue: Optional[str] = None + architecture: Optional[str] = None + priority: Optional[int] = None + email: Optional[str] = None + wait_for: Union[str, None, Collection[str]] = None + job_name: Optional[str] = None + ram: Optional[int] = None + logdir: Optional[str] = None + mail_options: Optional[str] = None + flags: bool = False + multi_threaded: Optional[Tuple[str, str]] = None + verbose: bool = False + env: dict = None + + cmd_line_flags = { + '-T': 'minutes', + '-q': 'queue', + '-a': 'architecture', + '-p': 'priority', + '-M': 'email', + '-N': 'job_name', + '-R': 'ram', + '-l': 'logdir', + '-m': 'mail_options', + } + + def __post_init__(self): + if self.env is None: + self.env = {} + + def as_flags(self, ): + """ + Creates flags for submission using fsl_sub + + All parameters changed from their default value (typically None) will be included in the flags. + + :return: tuple with the flags + """ + res = [] + for key, value in self.cmd_line_flags.items(): + if getattr(self, value) is not None: + res.extend((key, str(getattr(self, value)))) + if self.verbose: + res.append('-v') + if self.flags: + res.append('-F') + if self.multi_threaded: + res.extend(("-s", ','.join(self.multi_threaded))) + if self.wait_for is not None and len(_flatten_job_ids(self.wait_for)) > 0: + res.extend(('-j', _flatten_job_ids(self.wait_for))) + return tuple(res) + + def __str__(self): + return 'SubmitParams({})'.format(" ".join(self.as_flags())) + + def __call__(self, *command, **kwargs): + """ + Submits the command to the cluster. + + :param command: string or tuple of strings with the command to submit + :param kwargs: Keyword arguments can override any parameters set in self + :return: job ID + """ + from fsl.utils.run import prepareArgs, runfsl + runner = self.update(**kwargs) + command = prepareArgs(command) + fsl_sub_cmd = ' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(command)) + log.debug(fsl_sub_cmd) + jobid = runfsl(fsl_sub_cmd, env=runner.env).strip() + log.debug('Job submitted as {}'.format(jobid)) + return jobid + + def update(self, **kwargs): + """ + Creates a new SubmitParams withe updated parameters + """ + values = asdict(self) + values.update(kwargs) + return SubmitParams(**values) + + @classmethod + def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands', + include=('wait_for', 'logdir', 'email', 'mail_options')): + """ + Adds submission parameters to the parser + + :param parser: parser that should understand submission commands + :param as_group: add as a new group + :param include: sequence of argument flags/names that should be added to the parser + (set to None to include everything) + :return: the group the arguments got added to + """ + from fsl.utils.run import runfsl, FSLNotPresent + try: + fsl_sub_run, _ = runfsl('fsl_sub', exitcode=True) + except (FileNotFoundError, FSLNotPresent): + warnings.warn('fsl_sub was not found') + return + doc_lines = fsl_sub_run.splitlines() + nspaces = 1 + for line in doc_lines: + if len(line.strip()) > 0: + while line.startswith(' ' * nspaces): + nspaces += 1 + nspaces -= 1 + if as_group: + group = parser.add_argument_group(as_group) + else: + group = parser + + def get_explanation(flag): + explanation = None + for line in doc_lines: + if explanation is not None and len(line.strip()) > 0 and line.strip()[0] != '-': + explanation.append(line[nspaces:].strip()) + elif explanation is not None: + break + elif line.strip().startswith(flag): + explanation = [line[nspaces:].strip()] + if (explanation is None) or (len(explanation) == 0): + return 'documentation not found' + return ' '.join(explanation) + + for flag, value in cls.cmd_line_flags.items(): + if include is not None and value not in include and flag not in include: + continue + + as_type = {'minutes': float, 'priority': int, 'ram': int, 'verbose': None} + action = 'store_true' if value == 'verbose' else 'store' + group.add_argument(flag, dest='_sub_' + value, help=get_explanation(flag), action=action, + metavar='<' + value + '>', type=as_type.get(value, str)) + group.add_argument('-F', dest='_sub_flags', help=get_explanation('-F'), action='store_true') + group.add_argument('-v', dest='_sub_verbose', help=get_explanation('-v'), action='store_true') + group.add_argument('-s', dest='_sub_multi_threaded', help=get_explanation('-s'), + metavar='<pename>,<threads>') + group.add_argument('-j', dest='_sub_wait_for', help=get_explanation('-j'), + metavar='<jid>') + return group + + @classmethod + def from_args(cls, args): + as_dict = {value: getattr(args, '_sub_' + value, None) for value in cls.cmd_line_flags.values()} + if args._sub_wait_for is not None: + as_dict['wait_for'] = args._sub_wait_for.split(',') + if args._sub_multi_threaded is not None: + pename, threads = args._sub_multi_threaded.split(',') + as_dict['multi_threaded'] = pename, threads + return cls(verbose=args._sub_verbose, flags=args._sub_flags, **as_dict) + + +def submit(*command, **kwargs): """ Submits a given command to the cluster @@ -104,41 +249,7 @@ def submit(*command, :return: string of submitted job id """ - - from fsl.utils.run import runfsl, prepareArgs - - base_cmd = ['fsl_sub'] - - for flag, variable_name in [ - ('-T', 'minutes'), - ('-q', 'queue'), - ('-a', 'architecture'), - ('-p', 'priority'), - ('-M', 'email'), - ('-N', 'job_name'), - ('-R', 'ram'), - ('-l', 'logdir'), - ('-m', 'mail_options'), - ('-z', 'output')]: - variable = locals()[variable_name] - if variable: - base_cmd.extend([flag, str(variable)]) - - if flags: - base_cmd.append('-F') - if verbose: - base_cmd.append('-v') - - if wait_for: - base_cmd.extend(['-j', _flatten_job_ids(wait_for)]) - - if multi_threaded: - base_cmd.append('-s') - base_cmd.extend(multi_threaded) - - base_cmd.extend(prepareArgs(command)) - - return runfsl(*base_cmd, env=env).strip() + return SubmitParams(**kwargs)(*command) def info(job_id): @@ -264,7 +375,7 @@ else: res = func(*args, **kwargs) if res is not None: - with open(__file__ + '_out.pickle') as f: + with open(__file__ + '_out.pickle', 'w') as f: pickle.dump(f, res) """ diff --git a/setup.py b/setup.py index e1b4c967a72360f80883760c60f7dd25e9836097..eee141bd0972ba372349a7d1037d076d7b9dd021 100644 --- a/setup.py +++ b/setup.py @@ -102,8 +102,6 @@ setup( 'Development Status :: 3 - Alpha', 'Intended Audience :: Developers', 'License :: OSI Approved :: Apache Software License', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', 'Topic :: Software Development :: Libraries :: Python Modules'], diff --git a/tests/test_fslsub.py b/tests/test_fslsub.py index a0856f46df870463e90aacbf092a03f4e0795d8c..40d7668285c813b332c8da91574cc34ffa671681 100644 --- a/tests/test_fslsub.py +++ b/tests/test_fslsub.py @@ -12,7 +12,9 @@ import os.path as op import sys import textwrap as tw import contextlib +import argparse +import fsl from fsl.utils import fslsub from fsl.utils.tempdir import tempdir @@ -27,6 +29,13 @@ import os import os.path as op import sys import subprocess as sp + +fslpydir = op.join('{}', '..') +env = dict(os.environ) + +env['PYTHONPATH'] = fslpydir +sys.path.insert(0, fslpydir) + import fsl args = sys.argv[1:] @@ -44,9 +53,6 @@ for i in range(len(args)): args = args[i:] -env = dict(os.environ) -env['PYTHONPATH'] = op.join(op.dirname(fsl.__file__), '..') - cmd = op.basename(args[0]) jobid = random.randint(1, 9999) @@ -56,7 +62,7 @@ with open('{{}}.o{{}}'.format(cmd, jobid), 'w') as stdout, \ print(str(jobid)) sys.exit(0) -""".format(sys.executable).strip() +""".format(sys.executable, op.dirname(fsl.__file__)).strip() @contextlib.contextmanager @@ -125,6 +131,49 @@ def test_info(): assert fslsub.info('12345') == exp +def test_add_to_parser(): + test_flags = [ + ('-T', '30.0'), + ('-q', 'short.q'), + ('-a', 'architecture'), + ('-p', '3'), + ('-M', 'test@something.com'), + ('-N', 'job_name'), + ('-R', '20'), + ('-l', 'logdir'), + ('-j', '12345,67890'), + ('-m', 'mail_options'), + ('-v', ), + ('-F', ), + ('-s', 'pename,thread') + ] + with fslsub_mockFSLDIR(): + for flag in test_flags: + for include in (None, [flag[0]]): + parser = argparse.ArgumentParser("test parser") + fslsub.SubmitParams.add_to_parser(parser, include=include) + args = parser.parse_args(flag) + submitter = fslsub.SubmitParams.from_args(args) + assert submitter.as_flags() == flag + + with fslsub_mockFSLDIR(): + parser = argparse.ArgumentParser("test parser") + parser.add_argument('some_input') + fslsub.SubmitParams.add_to_parser(parser, include=None) + all_flags = tuple(part for flag in test_flags for part in flag) + args = parser.parse_args(('input', ) + all_flags) + assert args.some_input == 'input' + submitter = fslsub.SubmitParams.from_args(args) + assert len(all_flags) == len(submitter.as_flags()) + + for flag in test_flags: + res_flags = submitter.as_flags() + assert flag[0] in res_flags + start_index = res_flags.index(flag[0]) + for idx, part in enumerate(flag): + assert res_flags[idx + start_index] == part + + def myfunc(): print('standard output') print('standard error', file=sys.stderr)