diff --git a/fsl/utils/fslsub.py b/fsl/utils/fslsub.py index 054fe2496ad7e2c9646c504aa664fba0f00f6cdc..98727382338fa2c6271ff975970a3f952dfafc47 100644 --- a/fsl/utils/fslsub.py +++ b/fsl/utils/fslsub.py @@ -41,9 +41,7 @@ Example usage, building a short pipeline:: from six import BytesIO -import subprocess as sp import os.path as op -import os import glob import time import pickle @@ -52,10 +50,9 @@ import tempfile import logging import importlib from dataclasses import dataclass, asdict -from typing import Optional, Collection, Union +from typing import Optional, Collection, Union, Tuple import argparse import warnings -from subprocess import run, PIPE log = logging.getLogger(__name__) @@ -76,9 +73,10 @@ class SubmitParams(object): ram: Optional[int] = None logdir: Optional[str] = None mail_options: Optional[str] = None - flags: Optional[str] = None + flags: bool = False + multi_threaded: Optional[Tuple[str, str]] = None verbose: bool = False - environment: dict = None + env: dict = None cmd_line_flags = { '-T': 'minutes', @@ -90,12 +88,11 @@ class SubmitParams(object): '-R': 'ram', '-l': 'logdir', '-m': 'mail_options', - '-F': 'flags', } def __post_init__(self): - if self.environment is None: - self.environment = {} + if self.env is None: + self.env = {} def as_flags(self, ): """ @@ -111,32 +108,32 @@ class SubmitParams(object): res.extend((key, str(getattr(self, value)))) if self.verbose: res.append('-v') + if self.flags: + res.append('-F') + if self.multi_threaded: + res.extend(("-s", ','.join(self.multi_threaded))) if self.wait_for is not None and len(_flatten_job_ids(self.wait_for)) > 0: res.extend(('-j', _flatten_job_ids(self.wait_for))) return tuple(res) def __str__(self): - return f'SubmitParams({" ".join(self.as_flags())})' + return 'SubmitParams({})'.format(" ".join(self.as_flags())) - def __call__(self, *cmd, **kwargs): + def __call__(self, *command, **kwargs): """ Submits the command to the cluster. - :param cmd: string or tuple of strings with the command to submit + :param command: string or tuple of strings with the command to submit :param kwargs: Keyword arguments can override any parameters set in self :return: job ID """ - from fsl.utils.run import prepareArgs + from fsl.utils.run import prepareArgs, runfsl runner = self.update(**kwargs) - cmd = prepareArgs(cmd) - log.debug(' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(cmd))) - env = dict(os.environ) - env.update(runner.environment) - jobid = run( - ('fsl_sub', ) + tuple(runner.as_flags()) + tuple(cmd), - stdout=PIPE, check=True, env=env, - ).stdout.decode().strip() - log.debug(f'Job submitted as {jobid}') + command = prepareArgs(command) + fsl_sub_cmd = ' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(command)) + log.debug(fsl_sub_cmd) + jobid = runfsl(fsl_sub_cmd, env=runner.env).strip() + log.debug('Job submitted as {}'.format(jobid)) return jobid def update(self, **kwargs): @@ -148,22 +145,24 @@ class SubmitParams(object): return SubmitParams(**values) @classmethod - def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands', skip=()): + def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands', + include=('wait_for', 'logdir', 'email', 'mail_options')): """ Adds submission parameters to the parser :param parser: parser that should understand submission commands :param as_group: add as a new group - :param skip: sequence of argument flags/names that should not be added to the parser + :param include: sequence of argument flags/names that should be added to the parser + (set to None to include everything) :return: the group the arguments got added to """ - from fsl.utils.run import runfsl, prepareArgs + from fsl.utils.run import runfsl try: - fsl_sub_run = runfsl(['fsl_sub']) + fsl_sub_run, _ = runfsl('fsl_sub', exitcode=True) except FileNotFoundError: warnings.warn('fsl_sub was not found') return - doc_lines = fsl_sub_run.stdout.decode().splitlines() + doc_lines = fsl_sub_run.splitlines() nspaces = 1 for line in doc_lines: if len(line.strip()) > 0: @@ -174,9 +173,8 @@ class SubmitParams(object): group = parser.add_argument_group(as_group) else: group = parser - for flag, value in cls.cmd_line_flags.items(): - if value in skip or flag in skip: - continue + + def get_explanation(flag): explanation = None for line in doc_lines: if explanation is not None and len(line.strip()) > 0 and line.strip()[0] != '-': @@ -185,35 +183,38 @@ class SubmitParams(object): break elif line.strip().startswith(flag): explanation = [line[nspaces:].strip()] - explanation = ' '.join(explanation) + if len(explanation) == 0: + return 'documentation not found' + return ' '.join(explanation) + + for flag, value in cls.cmd_line_flags.items(): + if include is not None and value not in include and flag not in include: + continue + as_type = {'minutes': float, 'priority': int, 'ram': int, 'verbose': None} action = 'store_true' if value == 'verbose' else 'store' - group.add_argument(flag, dest='sub_' + value, help=explanation, action=action, - type=as_type[value] if value in as_type else str) + group.add_argument(flag, dest='_sub_' + value, help=get_explanation(flag), action=action, + metavar='<' + value + '>', type=as_type.get(value, str)) + group.add_argument('-F', dest='_sub_flags', help=get_explanation('-F'), action='store_true') + group.add_argument('-v', dest='_sub_verbose', help=get_explanation('-v'), action='store_true') + group.add_argument('-s', dest='_sub_multi_threaded', help=get_explanation('-s'), + metavar='<pename>,<threads>') + group.add_argument('-j', dest='_sub_wait_for', help=get_explanation('-j'), + metavar='<jid>') return group @classmethod def from_args(cls, args): - as_dict = {value: getattr(args, 'sub_' + value, None) for value in cls.cmd_line_flags.values()} - return cls(**as_dict) - - -def submit(*command, - minutes=None, - queue=None, - architecture=None, - priority=None, - email=None, - wait_for=None, - job_name=None, - ram=None, - logdir=None, - mail_options=None, - output=None, - flags=False, - multi_threaded=None, - verbose=False, - env=None): + as_dict = {value: getattr(args, '_sub_' + value, None) for value in cls.cmd_line_flags.values()} + if args._sub_wait_for is not None: + as_dict['wait_for'] = args._sub_wait_for.split(',') + if args._sub_multi_threaded is not None: + pename, threads = args._sub_multi_threaded.split(',') + as_dict['multi_threaded'] = pename, threads + return cls(verbose=args._sub_verbose, flags=args._sub_flags, **as_dict) + + +def submit(*command, **kwargs): """ Submits a given command to the cluster @@ -248,41 +249,7 @@ def submit(*command, :return: string of submitted job id """ - - from fsl.utils.run import runfsl, prepareArgs - - base_cmd = ['fsl_sub'] - - for flag, variable_name in [ - ('-T', 'minutes'), - ('-q', 'queue'), - ('-a', 'architecture'), - ('-p', 'priority'), - ('-M', 'email'), - ('-N', 'job_name'), - ('-R', 'ram'), - ('-l', 'logdir'), - ('-m', 'mail_options'), - ('-z', 'output')]: - variable = locals()[variable_name] - if variable: - base_cmd.extend([flag, str(variable)]) - - if flags: - base_cmd.append('-F') - if verbose: - base_cmd.append('-v') - - if wait_for: - base_cmd.extend(['-j', _flatten_job_ids(wait_for)]) - - if multi_threaded: - base_cmd.append('-s') - base_cmd.extend(multi_threaded) - - base_cmd.extend(prepareArgs(command)) - - return runfsl(*base_cmd, env=env).strip() + return SubmitParams(**kwargs)(*command) def info(job_id): diff --git a/tests/test_fslsub.py b/tests/test_fslsub.py index a0856f46df870463e90aacbf092a03f4e0795d8c..2065dcd10f6414e6bdcf1b9b2a1618eb41b3044f 100644 --- a/tests/test_fslsub.py +++ b/tests/test_fslsub.py @@ -12,6 +12,7 @@ import os.path as op import sys import textwrap as tw import contextlib +import argparse from fsl.utils import fslsub from fsl.utils.tempdir import tempdir @@ -125,6 +126,46 @@ def test_info(): assert fslsub.info('12345') == exp +def test_add_to_parser(): + test_flags = [ + ('-T', '30.0'), + ('-q', 'short.q'), + ('-a', 'architecture'), + ('-p', '3'), + ('-M', 'test@something.com'), + ('-N', 'job_name'), + ('-R', '20'), + ('-l', 'logdir'), + ('-j', '12345,67890'), + ('-m', 'mail_options'), + ('-v', ), + ('-F', ), + ('-s', 'pename,thread') + ] + for flag in test_flags: + for include in (None, [flag[0]]): + parser = argparse.ArgumentParser("test parser") + fslsub.SubmitParams.add_to_parser(parser, include=include) + args = parser.parse_args(flag) + submitter = fslsub.SubmitParams.from_args(args) + assert submitter.as_flags() == flag + + parser = argparse.ArgumentParser("test parser") + parser.add_argument('some_input') + fslsub.SubmitParams.add_to_parser(parser, include=None) + all_flags = tuple(part for flag in test_flags for part in flag) + args = parser.parse_args(('input', ) + all_flags) + assert args.some_input == 'input' + submitter = fslsub.SubmitParams.from_args(args) + assert len(all_flags) == len(submitter.as_flags()) + for flag in test_flags: + res_flags = submitter.as_flags() + assert flag[0] in res_flags + start_index = res_flags.index(flag[0]) + for idx, part in enumerate(flag): + assert res_flags[idx + start_index] == part + + def myfunc(): print('standard output') print('standard error', file=sys.stderr)