#!/usr/bin/env python
#
# fslsub.py - Functions for using fsl_sub.
#
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
#
"""This module submits jobs to a computing cluster using FSL's fsl_sub command
line tool. It is assumed that the computing cluster is managed by SGE.

Example usage, building a short pipeline::

    from fsl.utils.fslsub import submit

    # submits bet to veryshort queue unless <mask_filename> already exists
    bet_job = submit('bet <input_filename> -m',
                     queue='veryshort.q',
                     output='<mask_filename>')

    # submits another job
    other_job = submit('some other pre-processing step', queue='short.q')

    # submits cuda job, that should only start after both preparatory jobs are
    # finished. This will work if bet_job and other_job are single job-ids
    # (i.e., strings) or a sequence of multiple job-ids
    cuda_job = submit('expensive job',
                      wait_for=(bet_job, other_job),
                      queue='cuda.q')
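
    # queries the status of the cuda job, and retrieves its logs once it
    # has finished (a sketch; the keys returned by info() depend on the
    # local qstat)
    from fsl.utils.fslsub import info, output
    print(info(cuda_job))
    stdout, stderr = output(cuda_job)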

.. autosummary::
   :nosignatures:

   submit
   info
   output
   func_to_cmd
"""


from six import BytesIO
import os.path as op
import glob
import time
import pickle
import sys
import tempfile
import logging
import importlib
from dataclasses import dataclass, asdict
from typing import Optional, Collection, Union, Tuple, Dict
import argparse
import warnings


log = logging.getLogger(__name__)


@dataclass
class SubmitParams(object):
    """
    Represents the fsl_sub parameters

    Any command line script can be submitted with these parameters by calling the `SubmitParams` object:

    .. code-block:: python

        submit = SubmitParams(minutes=1, logdir='log', wait_for=['108023', '108019'])
        submit('echo finished')

    This will run "echo finished" with a maximum runtime of 1 minute after the jobs with IDs 108023 and 108019 are finished.
    It is the equivalent of

    .. code-block:: bash

        fsl_sub -T 1 -l log -j 108023,108019 "echo finished"

    For Python scripts that submit themselves to the cluster, it might be useful to give the user some control
    over at least some of the submission parameters. This can be done using:

    .. code-block:: python

        import argparse
        parser = argparse.ArgumentParser("my script doing awesome stuff")
        parser.add_argument("input_file")
        parser.add_argument("output_file")
        SubmitParams.add_to_parser(parser, include=('wait_for', 'logdir'))
        args = parser.parse_args()

        submitter = SubmitParams.from_args(args).update(minutes=10)
        from fsl import wrappers
        wrappers.bet(args.input_file, args.output_file, fslsub=submitter)

    This submits a BET job using the -j and -l flags set by the user and a maximum time of 10 minutes.
    """
    minutes: Optional[float] = None
    queue: Optional[str] = None
    architecture: Optional[str] = None
    priority: Optional[int] = None
    email: Optional[str] = None
    wait_for: Union[str, None, Collection[str]] = None
    job_name: Optional[str] = None
    ram: Optional[int] = None
    logdir: Optional[str] = None
    mail_options: Optional[str] = None
    output: Optional[str] = None
    flags: bool = False
    multi_threaded: Optional[Tuple[str, str]] = None
    verbose: bool = False
    env: dict = None

    cmd_line_flags = {
        '-T': 'minutes',
        '-q': 'queue',
        '-a': 'architecture',
        '-p': 'priority',
        '-M': 'email',
        '-N': 'job_name',
        '-R': 'ram',
        '-l': 'logdir',
        '-m': 'mail_options',
        '-z': 'output',
    }

    def __post_init__(self):
        """
        If not set explicitly by the user, don't alter the environment in which the script will be submitted
        """
        if self.env is None:
            self.env = {}

    def as_flags(self):
        """
        Creates flags for submission using fsl_sub

        All parameters changed from their default value (typically None) will be included in the flags.

        :return: tuple with the flags
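
        For example (values are illustrative)::

            SubmitParams(minutes=1, queue='short.q').as_flags()
            # ('-T', '1', '-q', 'short.q')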
        """
        res = []
        for key, value in self.cmd_line_flags.items():
            if getattr(self, value) is not None:
                res.extend((key, str(getattr(self, value))))
        if self.verbose:
            res.append('-v')
        if self.flags:
            res.append('-F')
        if self.multi_threaded:
            res.extend(("-s", ','.join(self.multi_threaded)))
        if self.wait_for is not None:
            job_ids = _flatten_job_ids(self.wait_for)
            if len(job_ids) > 0:
                res.extend(('-j', job_ids))
        return tuple(res)

    def __str__(self):
        return 'SubmitParams({})'.format(" ".join(self.as_flags()))

    def __call__(self, *command, **kwargs):
        """
        Submits the command to the cluster.

        :param command: string or tuple of strings with the command to submit
        :param kwargs: Keyword arguments can override any parameters set in self
        :return: job ID
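
        A minimal sketch (command and queue are illustrative)::

            params = SubmitParams(logdir='log')
            job_id = params('echo hello', queue='short.q')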
        """
        from fsl.utils.run import prepareArgs, runfsl
        runner = self.update(**kwargs)
        command = prepareArgs(command)
        fsl_sub_cmd = ' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(command))
        log.debug(fsl_sub_cmd)
        jobid = runfsl(fsl_sub_cmd, env=runner.env).strip()
        log.debug('Job submitted as {}'.format(jobid))
        return jobid

    def update(self, **kwargs):
        """
        Creates a new SubmitParams with updated parameters
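
        For example (values are illustrative)::

            base    = SubmitParams(queue='short.q')
            nightly = base.update(queue='long.q', minutes=60)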
        """
        values = asdict(self)
        values.update(kwargs)
        return SubmitParams(**values)

    @classmethod
    def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands',
                      include=('wait_for', 'logdir', 'email', 'mail_options')):
        """
        Adds submission parameters to the parser

        :param parser: parser that should understand submission commands
        :param as_group: add as a new group
        :param include: sequence of argument flags/names that should be added to the parser
            (set to None to include everything)
        :return: the group the arguments got added to
        """
        from fsl.utils.run import runfsl, FSLNotPresent
        try:
            fsl_sub_run, _ = runfsl('fsl_sub', exitcode=True)
        except (FileNotFoundError, FSLNotPresent):
            warnings.warn('fsl_sub was not found')
            return
        doc_lines = fsl_sub_run.splitlines()
        nspaces = 1
        for line in doc_lines:
            if len(line.strip()) > 0:
                while line.startswith(' ' * nspaces):
                    nspaces += 1
        nspaces -= 1
        if as_group:
            group = parser.add_argument_group(as_group)
        else:
            group = parser

        def get_explanation(flag):
            explanation = None
            for line in doc_lines:
                if explanation is not None and len(line.strip()) > 0 and line.strip()[0] != '-':
                    explanation.append(line[nspaces:].strip())
                elif explanation is not None:
                    break
                elif line.strip().startswith(flag):
                    explanation = [line[nspaces:].strip()]
            if (explanation is None) or (len(explanation) == 0):
                return 'documentation not found'
            return ' '.join(explanation)

        for flag, value in cls.cmd_line_flags.items():
            if include is not None and value not in include and flag not in include:
                continue

            as_type = {'minutes': float, 'priority': int, 'ram': int, 'verbose': None}
            action = 'store_true' if value == 'verbose' else 'store'
            group.add_argument(flag, dest='_sub_' + value, help=get_explanation(flag), action=action,
                               metavar='<' + value + '>', type=as_type.get(value, str))
        group.add_argument('-F', dest='_sub_flags', help=get_explanation('-F'), action='store_true')
        group.add_argument('-v', dest='_sub_verbose', help=get_explanation('-v'), action='store_true')
        group.add_argument('-s', dest='_sub_multi_threaded', help=get_explanation('-s'),
                           metavar='<pename>,<threads>')
        group.add_argument('-j', dest='_sub_wait_for', help=get_explanation('-j'),
                           metavar='<jid>')
        return group

    @classmethod
    def from_args(cls, args):
        """
        Create a SubmitParams from the command line arguments
        """
        as_dict = {value: getattr(args, '_sub_' + value, None) for value in cls.cmd_line_flags.values()}
        if args._sub_wait_for is not None:
            as_dict['wait_for'] = args._sub_wait_for.split(',')
        if args._sub_multi_threaded is not None:
            pename, threads = args._sub_multi_threaded.split(',')
            as_dict['multi_threaded'] = pename, threads
        return cls(verbose=args._sub_verbose, flags=args._sub_flags, **as_dict)


def submit(*command, **kwargs):
    """
    Submits a given command to the cluster

    You can pass the command and arguments as a single string, or as a regular or unpacked sequence.

    :arg command:        string or regular/unpacked sequence of strings with the job command
    :arg minutes:        Estimated job length in minutes, used to auto-set
                         queue name
    :arg queue:          Explicitly sets the queue name
    :arg architecture:   e.g., darwin or lx24-amd64
    :arg priority:       Lower priority [0:-1024] default = 0
    :arg email:          Who to email after job completion
    :arg wait_for:       Place a hold on this task until the job-ids in this
                         string or tuple are complete
    :arg job_name:       Specify job name as it will appear on the queue
    :arg ram:            Max total RAM to use for job (integer in MB)
    :arg logdir:         where to output logfiles
    :arg mail_options:   Change the SGE mail options, see qsub for details
    :arg output:         If <output> image or file already exists, do nothing
                         and exit
    :arg flags:          If True, use flags embedded in scripts to set SGE
                         queuing options
    :arg multi_threaded: Submit a multi-threaded task - Set to a tuple
                         containing two elements:

                          - <pename>: a PE configured for the requested queues

                          - <threads>: number of threads to run

    :arg verbose:        If True, use verbose mode
    :arg env:            Dict containing environment variables

    :return:             string of submitted job id
    """
    return SubmitParams(**kwargs)(*command)


def info(job_ids) -> Dict[str, Optional[Dict[str, str]]]:
    """Gets information on a given job id

    Uses `qstat -j <job_ids>`

    :arg job_ids: string with job id or (nested) sequence with jobs
    :return: dictionary of jobid -> another dictionary with job information
             (or None if job does not exist)
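
    A minimal sketch (job IDs are illustrative; the exact keys depend on
    the local qstat)::

        info(['12345', '12346'])
        # {'12345': {'owner': 'user', ...},  # running job
        #  '12346': None}                    # unknown job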
    """
    if not hasattr(info, '_ncall'):
        info._ncall = 0
    info._ncall += 1
    if info._ncall == 3:
        warnings.warn("Please do not call `fslsub.info` repeatably, because it slows down the cluster. You can avoid this message by simply passing all the job IDs you are interested in to a single `fslsub.info` call.")

    from fsl.utils.run import run
    job_ids_string = _flatten_job_ids(job_ids)
    try:
        result = run(['qstat', '-j', job_ids_string], exitcode=True)[0]
    except FileNotFoundError:
        log.debug("qstat not found; assuming not on cluster")
        return {}
    return _parse_qstat(job_ids_string, result)


def _parse_qstat(job_ids_string, qstat_stdout):
    """
    Parses the qstat output into a dictionary of dictionaries

    :param job_ids_string: input job ids
    :param qstat_stdout: qstat output
    :return: dictionary of jobid -> another dictionary with job information
             (or None if job does not exist)
    """
    res = {job_id: None for job_id in job_ids_string.split(',')}
    current_job_id = None
    for line in qstat_stdout.splitlines()[1:]:
        line = line.strip()
        if len(line) == 0:
            continue
        if line == '=' * len(line):
            current_job_id = None
        elif ':' in line:
            current_key, value = [part.strip() for part in line.split(':', 1)]
            if current_key == 'job_number':
                current_job_id = value
                if current_job_id not in job_ids_string:
                    raise ValueError(f"Unexpected job ID in qstat output:\n{line}")
                res[current_job_id] = {}
            else:
                if current_job_id is None:
                    raise ValueError(f"Found job information before job ID in qstat output:\n{line}")
                res[current_job_id][current_key] = value
        else:
            res[current_job_id][current_key] += '\n' + line
    return res


def output(job_id, logdir='.', command=None, name=None):
    """Returns the output of the given job.

    :arg job_id:  String containing job ID.
    :arg logdir:  Directory containing the log - defaults to
                  the current directory.
    :arg command: Command that was run. Not currently used.
    :arg name:    Job name if it was specified. Not currently used.
    :returns:     A tuple containing the standard output and standard error.
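
    A minimal sketch (job ID and log directory are illustrative)::

        stdout, stderr = output('12345', logdir='log')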
    """

    stdout = list(glob.glob(op.join(logdir, '*.o{}'.format(job_id))))
    stderr = list(glob.glob(op.join(logdir, '*.e{}'.format(job_id))))

    if len(stdout) != 1 or len(stderr) != 1:
        raise ValueError('Expected exactly one output and one error file for '
                         'job {}; found stdout: {}, stderr: {}'.format(
                             job_id, stdout, stderr))

    stdout = stdout[0]
    stderr = stderr[0]

    if op.exists(stdout):
        with open(stdout, 'rt') as f:
            stdout = f.read()
    else:
        stdout = None

    if op.exists(stderr):
        with open(stderr, 'rt') as f:
            stderr = f.read()
    else:
        stderr = None

    return stdout, stderr


def _flatten_job_ids(job_ids):
    """
    Returns a potentially nested sequence of job ids as a single comma-separated string

    :param job_ids: possibly nested sequence of job ids. The job ids themselves should be strings.
    :return: comma-separated string of job ids
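
    For example (IDs are illustrative)::

        _flatten_job_ids(['12', ('27', '35'), 12])
        # '12,27,35'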
    """
    def unpack(job_ids):
        """Unpack the (nested) job-ids in a single set"""
        if isinstance(job_ids, str):
            return {job_ids}
        elif isinstance(job_ids, int):
            return {str(job_ids)}
        else:
            res = set()
            for job_id in job_ids:
                res.update(unpack(job_id))
            return res

    return ','.join(sorted(unpack(job_ids)))


_external_job = """#!{}
# This is a temporary file designed to run the python function {},
# so that it can be submitted to the cluster

import pickle
from six import BytesIO
from importlib import import_module

pickle_bytes = BytesIO({})
name_type, name, func_name, args, kwargs = pickle.load(pickle_bytes)

if name_type == 'module':
    # retrieves a function defined in an external module
    func = getattr(import_module(name), func_name)
elif name_type == 'script':
    # retrieves a function defined in the __main__ script
    local_execute = {{'__name__': '__not_main__', '__file__': name}}
    exec(open(name, 'r').read(), local_execute)
    func = local_execute[func_name]
else:
    raise ValueError('Unknown name_type: %r' % name_type)

res = func(*args, **kwargs)
if res is not None:
    # pickle files must be opened in binary mode
    with open(__file__ + '_out.pickle', 'wb') as f:
        pickle.dump(res, f)
"""


def func_to_cmd(func, args, kwargs, tmp_dir=None, clean=False):
    """Defines the command needed to run the function from the command line

    WARNING: if submitting a function defined in the __main__ script,
    the script will be run again to retrieve this function. Make sure there is an
    ``if __name__ == '__main__'`` guard to prevent the full script from being rerun.

    :arg func:    function to be run
    :arg args:    positional arguments
    :arg kwargs:  keyword arguments
    :arg tmp_dir: directory where to store the temporary file
    :arg clean:   if True removes the submitted script after running it
    :return:      string which will run the function
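
    A minimal sketch (the function and queue name are illustrative)::

        def add(a, b):
            return a + b

        cmd    = func_to_cmd(add, (1, 2), {}, clean=True)
        job_id = submit(cmd, queue='short.q')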
    """
    pickle_bytes = BytesIO()
    if func.__module__ == '__main__':
        pickle.dump(('script', importlib.import_module('__main__').__file__, func.__name__,
                     args, kwargs), pickle_bytes)
    else:
        pickle.dump(('module', func.__module__, func.__name__,
                     args, kwargs), pickle_bytes)
    python_cmd = _external_job.format(sys.executable,
                                      func.__name__,
                                      pickle_bytes.getvalue())

    _, filename = tempfile.mkstemp(prefix=func.__name__ + '_',
                                   suffix='.py',
                                   dir=tmp_dir)

    with open(filename, 'w') as python_file:
        python_file.write(python_cmd)

    return sys.executable + " " + filename + ('; rm ' + filename if clean else '')