diff --git a/fsl/utils/fslsub.py b/fsl/utils/fslsub.py index 4f8479c716de3454cc7d26f850e81d4872de76da..054fe2496ad7e2c9646c504aa664fba0f00f6cdc 100644 --- a/fsl/utils/fslsub.py +++ b/fsl/utils/fslsub.py @@ -41,7 +41,9 @@ Example usage, building a short pipeline:: from six import BytesIO +import subprocess as sp import os.path as op +import os import glob import time import pickle @@ -49,11 +51,153 @@ import sys import tempfile import logging import importlib +from dataclasses import dataclass, asdict +from typing import Optional, Collection, Union +import argparse +import warnings +from subprocess import run, PIPE log = logging.getLogger(__name__) +@dataclass +class SubmitParams(object): + """ + Represents the fsl_sub parameters + """ + minutes: Optional[float] = None + queue: Optional[str] = None + architecture: Optional[str] = None + priority: Optional[int] = None + email: Optional[str] = None + wait_for: Union[str, None, Collection[str]] = None + job_name: Optional[str] = None + ram: Optional[int] = None + logdir: Optional[str] = None + mail_options: Optional[str] = None + flags: Optional[str] = None + verbose: bool = False + environment: dict = None + + cmd_line_flags = { + '-T': 'minutes', + '-q': 'queue', + '-a': 'architecture', + '-p': 'priority', + '-M': 'email', + '-N': 'job_name', + '-R': 'ram', + '-l': 'logdir', + '-m': 'mail_options', + '-F': 'flags', + } + + def __post_init__(self): + if self.environment is None: + self.environment = {} + + def as_flags(self, ): + """ + Creates flags for submission using fsl_sub + + All parameters changed from their default value (typically None) will be included in the flags. + + :return: tuple with the flags + """ + res = [] + for key, value in self.cmd_line_flags.items(): + if getattr(self, value) is not None: + res.extend((key, str(getattr(self, value)))) + if self.verbose: + res.append('-v') + if self.wait_for is not None and len(_flatten_job_ids(self.wait_for)) > 0: + res.extend(('-j', _flatten_job_ids(self.wait_for))) + return tuple(res) + + def __str__(self): + return f'SubmitParams({" ".join(self.as_flags())})' + + def __call__(self, *cmd, **kwargs): + """ + Submits the command to the cluster. + + :param cmd: string or tuple of strings with the command to submit + :param kwargs: Keyword arguments can override any parameters set in self + :return: job ID + """ + from fsl.utils.run import prepareArgs + runner = self.update(**kwargs) + cmd = prepareArgs(cmd) + log.debug(' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(cmd))) + env = dict(os.environ) + env.update(runner.environment) + jobid = run( + ('fsl_sub', ) + tuple(runner.as_flags()) + tuple(cmd), + stdout=PIPE, check=True, env=env, + ).stdout.decode().strip() + log.debug(f'Job submitted as {jobid}') + return jobid + + def update(self, **kwargs): + """ + Creates a new SubmitParams withe updated parameters + """ + values = asdict(self) + values.update(kwargs) + return SubmitParams(**values) + + @classmethod + def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands', skip=()): + """ + Adds submission parameters to the parser + + :param parser: parser that should understand submission commands + :param as_group: add as a new group + :param skip: sequence of argument flags/names that should not be added to the parser + :return: the group the arguments got added to + """ + from fsl.utils.run import runfsl, prepareArgs + try: + fsl_sub_run = runfsl(['fsl_sub']) + except FileNotFoundError: + warnings.warn('fsl_sub was not found') + return + doc_lines = fsl_sub_run.stdout.decode().splitlines() + nspaces = 1 + for line in doc_lines: + if len(line.strip()) > 0: + while line.startswith(' ' * nspaces): + nspaces += 1 + nspaces -= 1 + if as_group: + group = parser.add_argument_group(as_group) + else: + group = parser + for flag, value in cls.cmd_line_flags.items(): + if value in skip or flag in skip: + continue + explanation = None + for line in doc_lines: + if explanation is not None and len(line.strip()) > 0 and line.strip()[0] != '-': + explanation.append(line[nspaces:].strip()) + elif explanation is not None: + break + elif line.strip().startswith(flag): + explanation = [line[nspaces:].strip()] + explanation = ' '.join(explanation) + as_type = {'minutes': float, 'priority': int, 'ram': int, 'verbose': None} + action = 'store_true' if value == 'verbose' else 'store' + group.add_argument(flag, dest='sub_' + value, help=explanation, action=action, + type=as_type[value] if value in as_type else str) + return group + + @classmethod + def from_args(cls, args): + as_dict = {value: getattr(args, 'sub_' + value, None) for value in cls.cmd_line_flags.values()} + return cls(**as_dict) + + def submit(*command, minutes=None, queue=None,