diff --git a/fsl/utils/fslsub.py b/fsl/utils/fslsub.py index f6fa49557059a9c011087af68382080c8dc2a4e6..c39723700d986a82d22927eb252038747347c160 100644 --- a/fsl/utils/fslsub.py +++ b/fsl/utils/fslsub.py @@ -9,7 +9,7 @@ line tool. It is assumed that the computing cluster is managed by SGE. Example usage, building a short pipeline:: - from fsl.utils.fslsub import submit, wait + from fsl.utils.fslsub import submit # submits bet to veryshort queue unless <mask_filename> already exists bet_job = submit('bet <input_filename> -m', @@ -26,16 +26,12 @@ Example usage, building a short pipeline:: wait_for=(bet_job, other_job), queue='cuda.q') - # waits for the cuda job to finish - wait(cuda_job) - .. autosummary:: :nosignatures: submit info output - wait func_to_cmd """ @@ -50,7 +46,7 @@ import tempfile import logging import importlib from dataclasses import dataclass, asdict -from typing import Optional, Collection, Union, Tuple +from typing import Optional, Collection, Union, Tuple, Dict import argparse import warnings @@ -62,6 +58,38 @@ log = logging.getLogger(__name__) class SubmitParams(object): """ Represents the fsl_sub parameters + + Any command line script can be submitted by the parameters by calling the `SubmitParams` object: + + .. codeblock:: python + + submit = SubmitParams(minutes=1, logdir='log', wait_for=['108023', '108019']) + submit('echo finished') + + This will run "echo finished" with a maximum runtime of 1 minute after the jobs with IDs 108023 and 108019 are finished. + It is the equivalent of + + .. codeblock:: bash + + fsl_sub -T 1 -l log -j 108023,108019 "echo finished" + + For python scripts that submit themselves to the cluster, it might be useful to give the user some control + over at least some of the submission parameters. This can be done using: + + .. codeblock:: python + + import argparse + parser = argparse.ArgumentParser("my script doing awesome stuff") + parser.add_argument("input_file") + parser.add_argument("output_file") + SubmitParams.add_to_parser(parser, include=('wait_for', 'logdir')) + args = parser.parse_args() + + submitter = SubmitParams.from_args(args).update(minutes=10) + from fsl import wrappers + wrappers.bet(input_file, output_file, fslsub=submitter) + + This submits a BET job using the -j and -l flags set by the user and a maximum time of 10 minutes. """ minutes: Optional[float] = None queue: Optional[str] = None @@ -91,6 +119,9 @@ class SubmitParams(object): } def __post_init__(self): + """ + If not set explicitly by the user don't alter the environment in which the script will be submitted + """ if self.env is None: self.env = {} @@ -205,6 +236,9 @@ class SubmitParams(object): @classmethod def from_args(cls, args): + """ + Create a SubmitParams from the command line arguments + """ as_dict = {value: getattr(args, '_sub_' + value, None) for value in cls.cmd_line_flags.values()} if args._sub_wait_for is not None: as_dict['wait_for'] = args._sub_wait_for.split(',') @@ -252,28 +286,61 @@ def submit(*command, **kwargs): return SubmitParams(**kwargs)(*command) -def info(job_id): +def info(job_ids) -> Dict[str, Optional[Dict[str, str]]]: """Gets information on a given job id - Uses `qstat -j <job_id>` + Uses `qstat -j <job_ids>` - :arg job_id: string with job id - :return: dictionary with information on the submitted job (empty - if job does not exist) + :arg job_ids: string with job id or (nested) sequence with jobs + :return: dictionary of jobid -> another dictionary with job information + (or None if job does not exist) """ + if not hasattr(info, '_ncall'): + info._ncall = 0 + info._ncall += 1 + if info._ncall == 3: + warnings.warn("Please do not call `fslsub.info` repeatably, because it slows down the cluster. You can avoid this message by simply passing all the job IDs you are interested in to a single `fslsub.info` call.") + from fsl.utils.run import run + job_ids_string = _flatten_job_ids(job_ids) try: - result = run(['qstat', '-j', job_id], exitcode=True)[0] + result = run(['qstat', '-j', job_ids_string], exitcode=True)[0] except FileNotFoundError: log.debug("qstat not found; assuming not on cluster") return {} - if 'Following jobs do not exist:' in result: - return {} - res = {} - for line in result.splitlines()[1:]: - kv = line.split(':', 1) - if len(kv) == 2: - res[kv[0].strip()] = kv[1].strip() + return _parse_qstat(job_ids_string, result) + + +def _parse_qstat(job_ids_string, qstat_stdout): + """ + Parses the qstat output into a dictionary of dictionaries + + :param job_ids_string: input job ids + :param qstat_stdout: qstat output + :return: dictionary of jobid -> another dictionary with job information + (or None if job does not exist) + """ + res = {job_id: None for job_id in job_ids_string.split(',')} + current_job_id = None + for line in qstat_stdout.splitlines()[1:]: + line = line.strip() + if len(line) == 0: + continue + if line == '=' * len(line): + current_job_id = None + elif ':' in line: + current_key, value = [part.strip() for part in line.split(':', 1)] + if current_key == 'job_number': + current_job_id = value + if current_job_id not in job_ids_string: + raise ValueError(f"Unexpected job ID in qstat output:\n{line}") + res[current_job_id] = {} + else: + if current_job_id is None: + raise ValueError(f"Found job information before job ID in qstat output:\n{line}") + res[current_job_id][current_key] = value + else: + res[current_job_id][current_key] += '\n' + line return res @@ -313,22 +380,6 @@ def output(job_id, logdir='.', command=None, name=None): return stdout, stderr -def wait(job_ids): - """Wait for one or more jobs to finish - - :arg job_ids: string or tuple of strings with jobs that should finish - before continuing - """ - start_time = time.time() - for job_id in _flatten_job_ids(job_ids).split(','): - log.debug('Waiting for job {}'.format(job_id)) - while len(info(job_id)) > 0: - wait_time = min(max(1, (time.time() - start_time) / 3.), 20) - time.sleep(wait_time) - log.debug('Job {} finished, continuing to next'.format(job_id)) - log.debug('All jobs have finished') - - def _flatten_job_ids(job_ids): """ Returns a potentially nested sequence of job ids as a single comma-separated string diff --git a/fsl/utils/run.py b/fsl/utils/run.py index e2adbeb8de7d39006b28eac3497573fd281cd8d9..739d89ed2e6fb0e5c89356ef45f2176fd48c3228 100644 --- a/fsl/utils/run.py +++ b/fsl/utils/run.py @@ -15,7 +15,6 @@ run runfsl - wait dryrun """ @@ -423,8 +422,3 @@ def wslcmd(cmdpath, *args): else: # Command was not found in WSL with this path return None - - -def wait(job_ids): - """Proxy for :func:`.fslsub.wait`. """ - return fslsub.wait(job_ids) diff --git a/tests/test_fslsub.py b/tests/test_fslsub.py index d7c6460cafe5428d36833a953befe27207a4c626..94cff86f140d9c60f4e9922565c1ac3524036ee0 100644 --- a/tests/test_fslsub.py +++ b/tests/test_fslsub.py @@ -13,6 +13,7 @@ import sys import textwrap as tw import contextlib import argparse +import pytest import fsl from fsl.utils import fslsub @@ -100,7 +101,6 @@ def test_submit(): os.chmod(cmd, 0o755) jid = fslsub.submit(cmd) - fslsub.wait(jid) stdout, stderr = fslsub.output(jid) assert stdout.strip() == 'standard output' @@ -183,9 +183,76 @@ def test_func_to_cmd(): cmd = fslsub.func_to_cmd(myfunc, (), {}) jid = fslsub.submit(cmd) - fslsub.wait(jid) - stdout, stderr = fslsub.output(jid) assert stdout.strip() == 'standard output' assert stderr.strip() == 'standard error' + + +example_qstat_reply = """============================================================== +job_number: 9985061 +exec_file: job_scripts/9985061 +owner: user +sge_o_home: /home/fs0/user +sge_o_log_name: user +sge_o_shell: /bin/bash +sge_o_workdir: /home/fs0/user +account: sge +cwd: /home/fs0/user +mail_options: a +notify: FALSE +job_name: echo +jobshare: 0 +hard_queue_list: long.q +restart: y +job_args: test +script_file: echo +binding: set linear:slots +job_type: binary,noshell +scheduling info: queue instance "<some queue>" dropped because it is temporarily not available + queue instance "<some queue>" dropped because it is disabled +============================================================== +job_number: 9985062 +exec_file: job_scripts/9985062 +owner: user +sge_o_home: /home/fs0/user +sge_o_log_name: user +sge_o_shell: /bin/bash +sge_o_workdir: /home/fs0/user +account: sge +cwd: /home/fs0/user +mail_options: a +notify: FALSE +job_name: echo +jobshare: 0 +hard_queue_list: long.q +restart: y +job_args: test +script_file: echo +binding: set linear:slots +job_type: binary,noshell +scheduling info: queue instance "<some queue>" dropped because it is temporarily not available + queue instance "<some queue>" dropped because it is disabled +""" + + +def test_info(): + valid_job_ids = ['9985061', '9985062'] + res = fslsub._parse_qstat(','.join(valid_job_ids), example_qstat_reply) + assert len(res) == 2 + for job_id in valid_job_ids: + assert res[job_id] is not None + assert res[job_id]['account'] == 'sge' + assert res[job_id]['job_type'] == 'binary,noshell' + assert len(res[job_id]['scheduling info'].splitlines()) == 2 + for line in res[job_id]['scheduling info'].splitlines(): + assert line.startswith('queue instance ') + + res2 = fslsub._parse_qstat(','.join(valid_job_ids + ['1']), example_qstat_reply) + assert len(res2) == 3 + for job_id in valid_job_ids: + assert res[job_id] == res2[job_id] + assert res2['1'] is None + + with pytest.raises(ValueError): + fslsub._parse_qstat(valid_job_ids[0], example_qstat_reply)