#!/usr/bin/env python
#
# fslsub.py - Functions for using fsl_sub.
#
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
#
"""This module submits jobs to a computing cluster using FSL's fsl_sub command
line tool. It is assumed that the computing cluster is managed by SGE.
Example usage, building a short pipeline::
from fsl.utils.fslsub import submit, wait
# submits bet to veryshort queue unless <mask_filename> already exists
bet_job = submit('bet <input_filename> -m',
queue='veryshort.q',
output='<mask_filename>')
# submits another job
other_job = submit('some other pre-processing step', queue='short.q')
# submits cuda job, that should only start after both preparatory jobs are
# finished. This will work if bet_job and other_job are single job-ids
# (i.e., strings) or a sequence of multiple job-ids
cuda_job = submit('expensive job',
wait_for=(bet_job, other_job),
queue='cuda.q')
# waits for the cuda job to finish
wait(cuda_job)
.. autosummary::
:nosignatures:
submit
info
output
wait
func_to_cmd
"""
from six import BytesIO
import os.path as op
import glob
import time
import pickle
import sys
import tempfile
import logging
import importlib


log = logging.getLogger(__name__)


def submit(*command,
           minutes=None,
           queue=None,
           architecture=None,
           priority=None,
           email=None,
           wait_for=None,
           job_name=None,
           ram=None,
           logdir=None,
           mail_options=None,
           output=None,
           flags=False,
           multi_threaded=None,
           verbose=False,
           env=None):
"""
Submits a given command to the cluster
You can pass the command and arguments as a single string, or as a regular or unpacked sequence.
:arg command: string or regular/unpacked sequence of strings with the job command
:arg minutes: Estimated job length in minutes, used to auto-set
queue name
:arg queue: Explicitly sets the queue name
:arg architecture: e.g., darwin or lx24-amd64
:arg priority: Lower priority [0:-1024] default = 0
:arg email: Who to email after job completion
:arg wait_for: Place a hold on this task until the job-ids in this
string or tuple are complete
:arg job_name: Specify job name as it will appear on the queue
:arg ram: Max total RAM to use for job (integer in MB)
:arg logdir: where to output logfiles
:arg mail_options: Change the SGE mail options, see qsub for details
:arg output: If <output> image or file already exists, do nothing
and exit
:arg flags: If True, use flags embedded in scripts to set SGE
queuing options
:arg multi_threaded: Submit a multi-threaded task - Set to a tuple
containing two elements:
- <pename>: a PE configures for the requested queues
- <threads>: number of threads to run
:arg verbose: If True, use verbose mode
:arg env: Dict containing environment variables
:return: string of submitted job id
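
    Example (an illustrative sketch; the file and queue names are
    hypothetical)::

        job_id = submit('bet struct.nii.gz struct_brain -m',
                        queue='short.q',
                        job_name='bet_struct')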
"""
    from fsl.utils.run import runfsl, prepareArgs

    base_cmd = ['fsl_sub']

    for flag, variable_name in [
            ('-T', 'minutes'),
            ('-q', 'queue'),
            ('-a', 'architecture'),
            ('-p', 'priority'),
            ('-M', 'email'),
            ('-N', 'job_name'),
            ('-R', 'ram'),
            ('-l', 'logdir'),
            ('-m', 'mail_options'),
            ('-z', 'output')]:
        variable = locals()[variable_name]
        if variable:
            base_cmd.extend([flag, str(variable)])

    if flags:
        base_cmd.append('-F')
    if verbose:
        base_cmd.append('-v')
    if wait_for:
        base_cmd.extend(['-j', _flatten_job_ids(wait_for)])
    if multi_threaded:
        base_cmd.append('-s')
        base_cmd.extend(multi_threaded)

    base_cmd.extend(prepareArgs(command))

    return runfsl(*base_cmd, env=env).strip()


def info(job_id):
    """Gets information on a given job id

    Uses ``qstat -j <job_id>``.

    :arg job_id: string with job id
    :return:     dictionary with information on the submitted job (empty
                 if the job does not exist)
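
    Example (an illustrative sketch; assumes a job with id '12345' exists,
    and note that the available keys depend on the SGE version)::

        stats = info('12345')
        if stats:
            print(stats.get('owner'))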
"""
from fsl.utils.run import run
try:
result = run(['qstat', '-j', job_id], exitcode=True)[0]
except FileNotFoundError:
log.debug("qstat not found; assuming not on cluster")
return {}
if 'Following jobs do not exist:' in result:
return {}
res = {}
for line in result.splitlines()[1:]:
key, value = line.split(':', 1)
res[key.strip()] = value.strip()
return res


def output(job_id, logdir='.', command=None, name=None):
    """Returns the output of the given job.

    :arg job_id:  String containing job ID.
    :arg logdir:  Directory containing the log - defaults to
                  the current directory.
    :arg command: Command that was run. Not currently used.
    :arg name:    Job name if it was specified. Not currently used.
    :returns:     A tuple containing the standard output and standard error.
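
    Example (an illustrative sketch; assumes job '12345' has finished and
    wrote its log files to the current directory)::

        stdout, stderr = output('12345')
        if stdout is not None:
            print(stdout)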
"""
stdout = list(glob.glob(op.join(logdir, '*.o{}'.format(job_id))))
stderr = list(glob.glob(op.join(logdir, '*.e{}'.format(job_id))))
if len(stdout) != 1 or len(stderr) != 1:
raise ValueError('No/too many error/output files for job {}: stdout: '
'{}, stderr: {}'.format(job_id, stdout, stderr))
stdout = stdout[0]
stderr = stderr[0]
if op.exists(stdout):
with open(stdout, 'rt') as f:
stdout = f.read()
else:
stdout = None
if op.exists(stderr):
with open(stderr, 'rt') as f:
stderr = f.read()
else:
stderr = None
return stdout, stderr


def wait(job_ids):
    """Wait for one or more jobs to finish

    :arg job_ids: string or tuple of strings with jobs that should finish
                  before continuing
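
    Example (an illustrative sketch; the job ids are hypothetical)::

        wait(('12345', '12346'))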
"""
start_time = time.time()
for job_id in _flatten_job_ids(job_ids):
log.debug('Waiting for job {}'.format(job_id))
while len(info(job_id)) > 0:
wait_time = min(max(1, (time.time() - start_time) / 3.), 20)
time.sleep(wait_time)
log.debug('Job {} finished, continuing to next'.format(job_id))
log.debug('All jobs have finished')


def _flatten_job_ids(job_ids):
    """
    Returns a potentially nested sequence of job ids as a single
    comma-separated string

    :arg job_ids: possibly nested sequence of job ids. The job ids themselves
                  should be strings or integers.
    :return: comma-separated string of job ids
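
    Example::

        >>> _flatten_job_ids(['12', ('27', '35'), 41])
        '12,27,35,41'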
"""
def unpack(job_ids):
"""Unpack the (nested) job-ids in a single set"""
if isinstance(job_ids, str):
return {job_ids}
elif isinstance(job_ids, int):
return {str(job_ids)}
else:
res = set()
for job_id in job_ids:
res.update(unpack(job_id))
return res
return ','.join(sorted(unpack(job_ids)))
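

# Template for the temporary script generated by :func:`func_to_cmd`. The
# three ``{}`` placeholders are filled in via ``str.format``: the path to
# the python interpreter, the name of the wrapped function (for reference
# only), and the pickled (name_type, name, func_name, args, kwargs) tuple
# as a bytes literal. The doubled braces produce literal braces in the
# generated script.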
_external_job = """#!{}
# This is a temporary file designed to run the python function {},
# so that it can be submitted to the cluster
import pickle
from six import BytesIO
from importlib import import_module
pickle_bytes = BytesIO({})
name_type, name, func_name, args, kwargs = pickle.load(pickle_bytes)
if name_type == 'module':
# retrieves a function defined in an external module
func = getattr(import_module(name), func_name)
elif name_type == 'script':
# retrieves a function defined in the __main__ script
local_execute = {{'__name__': '__not_main__', '__file__': name}}
exec(open(name, 'r').read(), local_execute)
func = local_execute[func_name]
else:
raise ValueError('Unknown name_type: %r' % name_type)
res = func(*args, **kwargs)
if res is not None:
with open(__file__ + '_out.pickle') as f:
pickle.dump(f, res)
"""


def func_to_cmd(func, args, kwargs, tmp_dir=None, clean=False):
    """Defines the command needed to run the function from the command line

    WARNING: if submitting a function defined in the __main__ script, the
    script will be run again to retrieve this function. Make sure there is
    an ``if __name__ == '__main__'`` guard to prevent the full script from
    being rerun.

    :arg func:    function to be run
    :arg args:    positional arguments
    :arg kwargs:  keyword arguments
    :arg tmp_dir: directory in which to store the temporary file
    :arg clean:   if True, the returned command removes the generated script
                  after it has run
    :return: string which will run the function
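
    Example (an illustrative sketch; ``myfunc`` is a hypothetical
    module-level function, and the returned command would typically be
    passed on to :func:`submit`)::

        cmd = func_to_cmd(myfunc, args=(1, 2), kwargs={}, clean=True)
        job_id = submit(cmd, queue='short.q')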
"""
pickle_bytes = BytesIO()
if func.__module__ == '__main__':
pickle.dump(('script', importlib.import_module('__main__').__file__, func.__name__,
args, kwargs), pickle_bytes)
else:
pickle.dump(('module', func.__module__, func.__name__,
args, kwargs), pickle_bytes)
python_cmd = _external_job.format(sys.executable,
func.__name__,
pickle_bytes.getvalue())
_, filename = tempfile.mkstemp(prefix=func.__name__ + '_',
suffix='.py',
dir=tmp_dir)
with open(filename, 'w') as python_file:
python_file.write(python_cmd)
return sys.executable + " " + filename + ('; rm ' + filename if clean else '')