Commit c1a2c2e5 authored by Michiel Cottaar's avatar Michiel Cottaar
BUG: Make SubmitParams compatible with existing code

Also added test for add_to_parser
parent 651cd844
......@@ -41,9 +41,7 @@ Example usage, building a short pipeline::
from six import BytesIO
import subprocess as sp
import os.path as op
import os
import glob
import time
import pickle
......@@ -52,10 +50,9 @@ import tempfile
import logging
import importlib
from dataclasses import dataclass, asdict
from typing import Optional, Collection, Union
from typing import Optional, Collection, Union, Tuple
import argparse
import warnings
from subprocess import run, PIPE
log = logging.getLogger(__name__)
......@@ -76,9 +73,10 @@ class SubmitParams(object):
ram: Optional[int] = None
logdir: Optional[str] = None
mail_options: Optional[str] = None
flags: Optional[str] = None
flags: bool = False
multi_threaded: Optional[Tuple[str, str]] = None
verbose: bool = False
environment: dict = None
env: dict = None
cmd_line_flags = {
'-T': 'minutes',
......@@ -90,12 +88,11 @@ class SubmitParams(object):
'-R': 'ram',
'-l': 'logdir',
'-m': 'mail_options',
'-F': 'flags',
def __post_init__(self):
if self.environment is None:
self.environment = {}
if self.env is None:
self.env = {}
def as_flags(self, ):
......@@ -111,32 +108,32 @@ class SubmitParams(object):
res.extend((key, str(getattr(self, value))))
if self.verbose:
if self.flags:
if self.multi_threaded:
res.extend(("-s", ','.join(self.multi_threaded)))
if self.wait_for is not None and len(_flatten_job_ids(self.wait_for)) > 0:
res.extend(('-j', _flatten_job_ids(self.wait_for)))
return tuple(res)
def __str__(self):
return f'SubmitParams({" ".join(self.as_flags())})'
return 'SubmitParams({})'.format(" ".join(self.as_flags()))
def __call__(self, *cmd, **kwargs):
def __call__(self, *command, **kwargs):
Submits the command to the cluster.
:param cmd: string or tuple of strings with the command to submit
:param command: string or tuple of strings with the command to submit
:param kwargs: Keyword arguments can override any parameters set in self
:return: job ID
from import prepareArgs
from import prepareArgs, runfsl
runner = self.update(**kwargs)
cmd = prepareArgs(cmd)
log.debug(' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(cmd)))
env = dict(os.environ)
jobid = run(
('fsl_sub', ) + tuple(runner.as_flags()) + tuple(cmd),
stdout=PIPE, check=True, env=env,
log.debug(f'Job submitted as {jobid}')
command = prepareArgs(command)
fsl_sub_cmd = ' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(command))
jobid = runfsl(fsl_sub_cmd, env=runner.env).strip()
log.debug('Job submitted as {}'.format(jobid))
return jobid
def update(self, **kwargs):
......@@ -148,22 +145,24 @@ class SubmitParams(object):
return SubmitParams(**values)
def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands', skip=()):
def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands',
include=('wait_for', 'logdir', 'email', 'mail_options')):
Adds submission parameters to the parser
:param parser: parser that should understand submission commands
:param as_group: add as a new group
:param skip: sequence of argument flags/names that should not be added to the parser
:param include: sequence of argument flags/names that should be added to the parser
(set to None to include everything)
:return: the group the arguments got added to
from import runfsl, prepareArgs
from import runfsl
fsl_sub_run = runfsl(['fsl_sub'])
fsl_sub_run, _ = runfsl('fsl_sub', exitcode=True)
except FileNotFoundError:
warnings.warn('fsl_sub was not found')
doc_lines = fsl_sub_run.stdout.decode().splitlines()
doc_lines = fsl_sub_run.splitlines()
nspaces = 1
for line in doc_lines:
if len(line.strip()) > 0:
......@@ -174,9 +173,8 @@ class SubmitParams(object):
group = parser.add_argument_group(as_group)
group = parser
for flag, value in cls.cmd_line_flags.items():
if value in skip or flag in skip:
def get_explanation(flag):
explanation = None
for line in doc_lines:
if explanation is not None and len(line.strip()) > 0 and line.strip()[0] != '-':
......@@ -185,35 +183,38 @@ class SubmitParams(object):
elif line.strip().startswith(flag):
explanation = [line[nspaces:].strip()]
explanation = ' '.join(explanation)
if len(explanation) == 0:
return 'documentation not found'
return ' '.join(explanation)
for flag, value in cls.cmd_line_flags.items():
if include is not None and value not in include and flag not in include:
as_type = {'minutes': float, 'priority': int, 'ram': int, 'verbose': None}
action = 'store_true' if value == 'verbose' else 'store'
group.add_argument(flag, dest='sub_' + value, help=explanation, action=action,
type=as_type[value] if value in as_type else str)
group.add_argument(flag, dest='_sub_' + value, help=get_explanation(flag), action=action,
metavar='<' + value + '>', type=as_type.get(value, str))
group.add_argument('-F', dest='_sub_flags', help=get_explanation('-F'), action='store_true')
group.add_argument('-v', dest='_sub_verbose', help=get_explanation('-v'), action='store_true')
group.add_argument('-s', dest='_sub_multi_threaded', help=get_explanation('-s'),
group.add_argument('-j', dest='_sub_wait_for', help=get_explanation('-j'),
return group
def from_args(cls, args):
as_dict = {value: getattr(args, 'sub_' + value, None) for value in cls.cmd_line_flags.values()}
return cls(**as_dict)
def submit(*command,
as_dict = {value: getattr(args, '_sub_' + value, None) for value in cls.cmd_line_flags.values()}
if args._sub_wait_for is not None:
as_dict['wait_for'] = args._sub_wait_for.split(',')
if args._sub_multi_threaded is not None:
pename, threads = args._sub_multi_threaded.split(',')
as_dict['multi_threaded'] = pename, threads
return cls(verbose=args._sub_verbose, flags=args._sub_flags, **as_dict)
def submit(*command, **kwargs):
Submits a given command to the cluster
......@@ -248,41 +249,7 @@ def submit(*command,
:return: string of submitted job id
from import runfsl, prepareArgs
base_cmd = ['fsl_sub']
for flag, variable_name in [
('-T', 'minutes'),
('-q', 'queue'),
('-a', 'architecture'),
('-p', 'priority'),
('-M', 'email'),
('-N', 'job_name'),
('-R', 'ram'),
('-l', 'logdir'),
('-m', 'mail_options'),
('-z', 'output')]:
variable = locals()[variable_name]
if variable:
base_cmd.extend([flag, str(variable)])
if flags:
if verbose:
if wait_for:
base_cmd.extend(['-j', _flatten_job_ids(wait_for)])
if multi_threaded:
return runfsl(*base_cmd, env=env).strip()
return SubmitParams(**kwargs)(*command)
def info(job_id):
......@@ -12,6 +12,7 @@ import os.path as op
import sys
import textwrap as tw
import contextlib
import argparse
from fsl.utils import fslsub
from fsl.utils.tempdir import tempdir
......@@ -125,6 +126,46 @@ def test_info():
assert'12345') == exp
def test_add_to_parser():
test_flags = [
('-T', '30.0'),
('-q', 'short.q'),
('-a', 'architecture'),
('-p', '3'),
('-M', ''),
('-N', 'job_name'),
('-R', '20'),
('-l', 'logdir'),
('-j', '12345,67890'),
('-m', 'mail_options'),
('-v', ),
('-F', ),
('-s', 'pename,thread')
for flag in test_flags:
for include in (None, [flag[0]]):
parser = argparse.ArgumentParser("test parser")
fslsub.SubmitParams.add_to_parser(parser, include=include)
args = parser.parse_args(flag)
submitter = fslsub.SubmitParams.from_args(args)
assert submitter.as_flags() == flag
parser = argparse.ArgumentParser("test parser")
fslsub.SubmitParams.add_to_parser(parser, include=None)
all_flags = tuple(part for flag in test_flags for part in flag)
args = parser.parse_args(('input', ) + all_flags)
assert args.some_input == 'input'
submitter = fslsub.SubmitParams.from_args(args)
assert len(all_flags) == len(submitter.as_flags())
for flag in test_flags:
res_flags = submitter.as_flags()
assert flag[0] in res_flags
start_index = res_flags.index(flag[0])
for idx, part in enumerate(flag):
assert res_flags[idx + start_index] == part
def myfunc():
print('standard output')
print('standard error', file=sys.stderr)
