Commit 343919a3 authored by Paul McCarthy 🚵
Browse files

Merge branch 'remove_wait' into 'master'

Remove fslsub.wait

See merge request fsl/fslpy!234
parents 8d1ebf0a 1c006fda
......@@ -9,7 +9,7 @@ line tool. It is assumed that the computing cluster is managed by SGE.
Example usage, building a short pipeline::
from fsl.utils.fslsub import submit, wait
from fsl.utils.fslsub import submit
# submits bet to veryshort queue unless <mask_filename> already exists
bet_job = submit('bet <input_filename> -m',
......@@ -26,16 +26,12 @@ Example usage, building a short pipeline::
wait_for=(bet_job, other_job),
queue='cuda.q')
# waits for the cuda job to finish
wait(cuda_job)
.. autosummary::
:nosignatures:
submit
info
output
wait
func_to_cmd
"""
......@@ -50,7 +46,7 @@ import tempfile
import logging
import importlib
from dataclasses import dataclass, asdict
from typing import Optional, Collection, Union, Tuple
from typing import Optional, Collection, Union, Tuple, Dict
import argparse
import warnings
......@@ -62,6 +58,38 @@ log = logging.getLogger(__name__)
class SubmitParams(object):
"""
Represents the fsl_sub parameters
Any command line script can be submitted by the parameters by calling the `SubmitParams` object:
.. codeblock:: python
submit = SubmitParams(minutes=1, logdir='log', wait_for=['108023', '108019'])
submit('echo finished')
This will run "echo finished" with a maximum runtime of 1 minute after the jobs with IDs 108023 and 108019 are finished.
It is the equivalent of
.. codeblock:: bash
fsl_sub -T 1 -l log -j 108023,108019 "echo finished"
For python scripts that submit themselves to the cluster, it might be useful to give the user some control
over at least some of the submission parameters. This can be done using:
.. codeblock:: python
import argparse
parser = argparse.ArgumentParser("my script doing awesome stuff")
parser.add_argument("input_file")
parser.add_argument("output_file")
SubmitParams.add_to_parser(parser, include=('wait_for', 'logdir'))
args = parser.parse_args()
submitter = SubmitParams.from_args(args).update(minutes=10)
from fsl import wrappers
wrappers.bet(input_file, output_file, fslsub=submitter)
This submits a BET job using the -j and -l flags set by the user and a maximum time of 10 minutes.
"""
minutes: Optional[float] = None
queue: Optional[str] = None
......@@ -91,6 +119,9 @@ class SubmitParams(object):
}
def __post_init__(self):
    """Default ``env`` to an empty dict, so that the environment in which
    the script will be submitted is not altered unless the user explicitly
    set it."""
    self.env = {} if self.env is None else self.env
......@@ -205,6 +236,9 @@ class SubmitParams(object):
@classmethod
def from_args(cls, args):
"""
Create a SubmitParams from the command line arguments
"""
as_dict = {value: getattr(args, '_sub_' + value, None) for value in cls.cmd_line_flags.values()}
if args._sub_wait_for is not None:
as_dict['wait_for'] = args._sub_wait_for.split(',')
......@@ -252,28 +286,61 @@ def submit(*command, **kwargs):
return SubmitParams(**kwargs)(*command)
def info(job_ids) -> Dict[str, Optional[Dict[str, str]]]:
    """Gets information on the given job id(s).

    Uses `qstat -j <job_ids>`

    :arg job_ids: string with job id or (nested) sequence with jobs
    :return: dictionary of jobid -> another dictionary with job information
             (or None if job does not exist)
    """
    # Count calls on the function object itself, so that users who poll the
    # cluster one job at a time can be warned: a single batched qstat call
    # is much cheaper than many individual ones.
    if not hasattr(info, '_ncall'):
        info._ncall = 0
    info._ncall += 1
    if info._ncall == 3:
        warnings.warn("Please do not call `fslsub.info` repeatably, because it slows down the cluster. You can avoid this message by simply passing all the job IDs you are interested in to a single `fslsub.info` call.")
    # delayed import: fsl.utils.run also refers to this module, so importing
    # at module level would create a circular import
    from fsl.utils.run import run
    job_ids_string = _flatten_job_ids(job_ids)
    try:
        result = run(['qstat', '-j', job_ids_string], exitcode=True)[0]
    except FileNotFoundError:
        # qstat is not installed, so we are presumably not on an SGE cluster
        log.debug("qstat not found; assuming not on cluster")
        return {}
    return _parse_qstat(job_ids_string, result)
def _parse_qstat(job_ids_string, qstat_stdout):
"""
Parses the qstat output into a dictionary of dictionaries
:param job_ids_string: input job ids
:param qstat_stdout: qstat output
:return: dictionary of jobid -> another dictionary with job information
(or None if job does not exist)
"""
res = {job_id: None for job_id in job_ids_string.split(',')}
current_job_id = None
for line in qstat_stdout.splitlines()[1:]:
line = line.strip()
if len(line) == 0:
continue
if line == '=' * len(line):
current_job_id = None
elif ':' in line:
current_key, value = [part.strip() for part in line.split(':', 1)]
if current_key == 'job_number':
current_job_id = value
if current_job_id not in job_ids_string:
raise ValueError(f"Unexpected job ID in qstat output:\n{line}")
res[current_job_id] = {}
else:
if current_job_id is None:
raise ValueError(f"Found job information before job ID in qstat output:\n{line}")
res[current_job_id][current_key] = value
else:
res[current_job_id][current_key] += '\n' + line
return res
......@@ -313,22 +380,6 @@ def output(job_id, logdir='.', command=None, name=None):
return stdout, stderr
def wait(job_ids):
    """Wait for one or more jobs to finish

    :arg job_ids: string or tuple of strings with jobs that should finish
                  before continuing
    """
    started = time.time()
    pending = _flatten_job_ids(job_ids).split(',')
    for job_id in pending:
        log.debug('Waiting for job {}'.format(job_id))
        # poll qstat until the job disappears, backing off from 1s up to 20s
        while len(info(job_id)) > 0:
            elapsed = time.time() - started
            time.sleep(min(max(1, elapsed / 3.), 20))
        log.debug('Job {} finished, continuing to next'.format(job_id))
    log.debug('All jobs have finished')
def _flatten_job_ids(job_ids):
"""
Returns a potentially nested sequence of job ids as a single comma-separated string
......
......@@ -15,7 +15,6 @@
run
runfsl
wait
dryrun
"""
......@@ -423,8 +422,3 @@ def wslcmd(cmdpath, *args):
else:
# Command was not found in WSL with this path
return None
def wait(job_ids):
    """Proxy for :func:`.fslsub.wait`. """
    # Thin forwarding wrapper so callers of this module can wait on cluster
    # jobs without importing fslsub themselves.
    return fslsub.wait(job_ids)
......@@ -13,6 +13,7 @@ import sys
import textwrap as tw
import contextlib
import argparse
import pytest
import fsl
from fsl.utils import fslsub
......@@ -100,7 +101,6 @@ def test_submit():
os.chmod(cmd, 0o755)
jid = fslsub.submit(cmd)
fslsub.wait(jid)
stdout, stderr = fslsub.output(jid)
assert stdout.strip() == 'standard output'
......@@ -183,9 +183,76 @@ def test_func_to_cmd():
cmd = fslsub.func_to_cmd(myfunc, (), {})
jid = fslsub.submit(cmd)
fslsub.wait(jid)
stdout, stderr = fslsub.output(jid)
assert stdout.strip() == 'standard output'
assert stderr.strip() == 'standard error'
# Captured `qstat -j` output for two submitted jobs (9985061 and 9985062);
# used by test_info below to exercise fslsub._parse_qstat without needing
# an actual SGE cluster.  NOTE: runtime data — do not reformat.
example_qstat_reply = """==============================================================
job_number: 9985061
exec_file: job_scripts/9985061
owner: user
sge_o_home: /home/fs0/user
sge_o_log_name: user
sge_o_shell: /bin/bash
sge_o_workdir: /home/fs0/user
account: sge
cwd: /home/fs0/user
mail_options: a
notify: FALSE
job_name: echo
jobshare: 0
hard_queue_list: long.q
restart: y
job_args: test
script_file: echo
binding: set linear:slots
job_type: binary,noshell
scheduling info: queue instance "<some queue>" dropped because it is temporarily not available
queue instance "<some queue>" dropped because it is disabled
==============================================================
job_number: 9985062
exec_file: job_scripts/9985062
owner: user
sge_o_home: /home/fs0/user
sge_o_log_name: user
sge_o_shell: /bin/bash
sge_o_workdir: /home/fs0/user
account: sge
cwd: /home/fs0/user
mail_options: a
notify: FALSE
job_name: echo
jobshare: 0
hard_queue_list: long.q
restart: y
job_args: test
script_file: echo
binding: set linear:slots
job_type: binary,noshell
scheduling info: queue instance "<some queue>" dropped because it is temporarily not available
queue instance "<some queue>" dropped because it is disabled
"""
def test_info():
    """Checks fslsub._parse_qstat against a captured qstat reply."""
    job_ids = ['9985061', '9985062']
    parsed = fslsub._parse_qstat(','.join(job_ids), example_qstat_reply)
    assert len(parsed) == 2
    for jid in job_ids:
        job_info = parsed[jid]
        assert job_info is not None
        assert job_info['account'] == 'sge'
        assert job_info['job_type'] == 'binary,noshell'
        sched_lines = job_info['scheduling info'].splitlines()
        assert len(sched_lines) == 2
        assert all(sched.startswith('queue instance ') for sched in sched_lines)

    # a requested job id that qstat knows nothing about maps to None
    with_missing = fslsub._parse_qstat(','.join(job_ids + ['1']),
                                       example_qstat_reply)
    assert len(with_missing) == 3
    assert with_missing['1'] is None
    for jid in job_ids:
        assert parsed[jid] == with_missing[jid]

    # job information for a job that was never requested raises an error
    with pytest.raises(ValueError):
        fslsub._parse_qstat(job_ids[0], example_qstat_reply)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment