Skip to content
Snippets Groups Projects
Commit 651cd844 authored by Michiel Cottaar's avatar Michiel Cottaar
Browse files

Copied in SubmitParams from mcutils

parent dd48efe0
No related branches found
No related tags found
No related merge requests found
......@@ -41,7 +41,9 @@ Example usage, building a short pipeline::
from six import BytesIO
import subprocess as sp
import os.path as op
import os
import glob
import time
import pickle
......@@ -49,11 +51,153 @@ import sys
import tempfile
import logging
import importlib
from dataclasses import dataclass, asdict
from typing import Optional, Collection, Union
import argparse
import warnings
from subprocess import run, PIPE
log = logging.getLogger(__name__)
@dataclass
class SubmitParams(object):
"""
Represents the fsl_sub parameters
"""
minutes: Optional[float] = None
queue: Optional[str] = None
architecture: Optional[str] = None
priority: Optional[int] = None
email: Optional[str] = None
wait_for: Union[str, None, Collection[str]] = None
job_name: Optional[str] = None
ram: Optional[int] = None
logdir: Optional[str] = None
mail_options: Optional[str] = None
flags: Optional[str] = None
verbose: bool = False
environment: dict = None
cmd_line_flags = {
'-T': 'minutes',
'-q': 'queue',
'-a': 'architecture',
'-p': 'priority',
'-M': 'email',
'-N': 'job_name',
'-R': 'ram',
'-l': 'logdir',
'-m': 'mail_options',
'-F': 'flags',
}
def __post_init__(self):
if self.environment is None:
self.environment = {}
def as_flags(self, ):
"""
Creates flags for submission using fsl_sub
All parameters changed from their default value (typically None) will be included in the flags.
:return: tuple with the flags
"""
res = []
for key, value in self.cmd_line_flags.items():
if getattr(self, value) is not None:
res.extend((key, str(getattr(self, value))))
if self.verbose:
res.append('-v')
if self.wait_for is not None and len(_flatten_job_ids(self.wait_for)) > 0:
res.extend(('-j', _flatten_job_ids(self.wait_for)))
return tuple(res)
def __str__(self):
return f'SubmitParams({" ".join(self.as_flags())})'
def __call__(self, *cmd, **kwargs):
"""
Submits the command to the cluster.
:param cmd: string or tuple of strings with the command to submit
:param kwargs: Keyword arguments can override any parameters set in self
:return: job ID
"""
from fsl.utils.run import prepareArgs
runner = self.update(**kwargs)
cmd = prepareArgs(cmd)
log.debug(' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(cmd)))
env = dict(os.environ)
env.update(runner.environment)
jobid = run(
('fsl_sub', ) + tuple(runner.as_flags()) + tuple(cmd),
stdout=PIPE, check=True, env=env,
).stdout.decode().strip()
log.debug(f'Job submitted as {jobid}')
return jobid
def update(self, **kwargs):
"""
Creates a new SubmitParams withe updated parameters
"""
values = asdict(self)
values.update(kwargs)
return SubmitParams(**values)
@classmethod
def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands', skip=()):
"""
Adds submission parameters to the parser
:param parser: parser that should understand submission commands
:param as_group: add as a new group
:param skip: sequence of argument flags/names that should not be added to the parser
:return: the group the arguments got added to
"""
from fsl.utils.run import runfsl, prepareArgs
try:
fsl_sub_run = runfsl(['fsl_sub'])
except FileNotFoundError:
warnings.warn('fsl_sub was not found')
return
doc_lines = fsl_sub_run.stdout.decode().splitlines()
nspaces = 1
for line in doc_lines:
if len(line.strip()) > 0:
while line.startswith(' ' * nspaces):
nspaces += 1
nspaces -= 1
if as_group:
group = parser.add_argument_group(as_group)
else:
group = parser
for flag, value in cls.cmd_line_flags.items():
if value in skip or flag in skip:
continue
explanation = None
for line in doc_lines:
if explanation is not None and len(line.strip()) > 0 and line.strip()[0] != '-':
explanation.append(line[nspaces:].strip())
elif explanation is not None:
break
elif line.strip().startswith(flag):
explanation = [line[nspaces:].strip()]
explanation = ' '.join(explanation)
as_type = {'minutes': float, 'priority': int, 'ram': int, 'verbose': None}
action = 'store_true' if value == 'verbose' else 'store'
group.add_argument(flag, dest='sub_' + value, help=explanation, action=action,
type=as_type[value] if value in as_type else str)
return group
@classmethod
def from_args(cls, args):
as_dict = {value: getattr(args, 'sub_' + value, None) for value in cls.cmd_line_flags.values()}
return cls(**as_dict)
def submit(*command,
minutes=None,
queue=None,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment