#!/usr/bin/env python
#
# fslsub.py - Functions for using fsl_sub.
#
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
#
"""This module submits jobs to a computing cluster using FSL's fsl_sub command
line tool. It is assumed that the computing cluster is managed by SGE.

Example usage, building a short pipeline::

    from fsl.utils.fslsub import submit

    # submits bet to veryshort queue unless <mask_filename> already exists
    bet_job = submit('bet <input_filename> -m',
                     queue='veryshort.q',
                     output='<mask_filename>')

    # submits another job
    other_job = submit('some other pre-processing step', queue='short.q')

    # submits cuda job that should only start after both preparatory jobs are
    # finished. This will work if bet_job and other_job are single job-ids
    # (i.e., strings) or a sequence of multiple job-ids
    cuda_job = submit('expensive job',
                      wait_for=(bet_job, other_job),
                      queue='cuda.q')

.. autosummary::
   :nosignatures:

   submit
   info
   output
   func_to_cmd
   hold
"""


from io import BytesIO
import os.path as op
import glob
import time
import pickle
import sys
import tempfile
import logging
import importlib
from dataclasses import dataclass, asdict
from typing import Optional, Collection, Union, Tuple, Dict
import argparse
import warnings
import os


import fsl.utils.deprecated as deprecated


log = logging.getLogger(__name__)


@dataclass
class SubmitParams(object):
    """Represents the fsl_sub parameters

    The ``SubmitParams`` class is deprecated - you should use
    :mod:`fsl.wrappers.fsl_sub` instead, or use the ``fsl_sub`` Python
    library, which is installed as part of FSL.

    Any command line script can be submitted with these parameters by calling the ``SubmitParams`` object:

    .. code-block:: python

        submit = SubmitParams(minutes=1, logdir='log', wait_for=['108023', '108019'])
        submit('echo finished')

    This will run "echo finished" with a maximum runtime of 1 minute after the jobs with IDs 108023 and 108019 are finished.
    It is the equivalent of

    .. code-block:: bash

        fsl_sub -T 1 -l log -j 108023,108019 "echo finished"

    For python scripts that submit themselves to the cluster, it might be useful to give the user some control
    over at least some of the submission parameters. This can be done using:

    .. code-block:: python

        import argparse
        parser = argparse.ArgumentParser("my script doing awesome stuff")
        parser.add_argument("input_file")
        parser.add_argument("output_file")
        SubmitParams.add_to_parser(parser, include=('wait_for', 'logdir'))
        args = parser.parse_args()

        submitter = SubmitParams.from_args(args).update(minutes=10)
        from fsl import wrappers
        wrappers.bet(input_file, output_file, fslsub=submitter)

    This submits a BET job using the -j and -l flags set by the user and a maximum time of 10 minutes.
    """
    minutes: Optional[float] = None
    queue: Optional[str] = None
    architecture: Optional[str] = None
    priority: Optional[int] = None
    email: Optional[str] = None
    wait_for: Union[str, None, Collection[str]] = None
    job_name: Optional[str] = None
    ram: Optional[int] = None
    logdir: Optional[str] = None
    mail_options: Optional[str] = None
    flags: bool = False
    multi_threaded: Optional[Tuple[str, str]] = None
    verbose: bool = False
    env: dict = None

    cmd_line_flags = {
        '-T': 'minutes',
        '-q': 'queue',
        '-a': 'architecture',
        '-p': 'priority',
        '-M': 'email',
        '-N': 'job_name',
        '-R': 'ram',
        '-l': 'logdir',
        '-m': 'mail_options',
    }

    def __post_init__(self):
        """
        If not set explicitly by the user, don't alter the environment in which the script will be submitted
        """
        if self.env is None:
            self.env = dict(os.environ)

    def as_flags(self):
        """
        Creates flags for submission using fsl_sub

        All parameters changed from their default value (typically None) will be included in the flags.

        :return: tuple with the flags
        """
        res = []
        for key, value in self.cmd_line_flags.items():
            if getattr(self, value) is not None:
                res.extend((key, str(getattr(self, value))))
        if self.verbose:
            res.append('-v')
        if self.flags:
            res.append('-F')
        if self.multi_threaded:
            res.extend(("-s", ','.join(self.multi_threaded)))
        if self.wait_for is not None and len(_flatten_job_ids(self.wait_for)) > 0:
            res.extend(('-j', _flatten_job_ids(self.wait_for)))
        return tuple(res)
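
    # Illustrative sketch (hypothetical values, not from the original docs):
    # each non-default field is mapped onto its fsl_sub flag, e.g.
    #
    #     params = SubmitParams(minutes=5, queue='short.q', logdir='logs')
    #     params.as_flags()   # -> ('-T', '5', '-q', 'short.q', '-l', 'logs')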

    def __str__(self):
        return 'SubmitParams({})'.format(" ".join(self.as_flags()))

    @deprecated.deprecated('3.7.0', '4.0.0',
                           'Use fsl.wrappers.fsl_sub instead')
    def __call__(self, *command, **kwargs):
        """
        Submits the command to the cluster.

        :param command: string or tuple of strings with the command to submit
        :param kwargs: Keyword arguments can override any parameters set in self
        :return: job ID
        """
        from fsl.utils.run import prepareArgs, runfsl
        runner = self.update(**kwargs)
        command = prepareArgs(command)
        fsl_sub_cmd = ' '.join(('fsl_sub', ) + tuple(runner.as_flags()) + tuple(command))
        log.debug(fsl_sub_cmd)
        jobid = runfsl(fsl_sub_cmd, env=runner.env).strip()
        log.debug('Job submitted as {}'.format(jobid))
        return jobid

    def update(self, **kwargs):
        """
        Creates a new SubmitParams with updated parameters
        """
        values = asdict(self)
        values.update(kwargs)
        return SubmitParams(**values)

    @classmethod
    def add_to_parser(cls, parser: argparse.ArgumentParser, as_group='fsl_sub commands',
                      include=('wait_for', 'logdir', 'email', 'mail_options')):
        """
        Adds submission parameters to the parser

        :param parser: parser that should understand submission commands
        :param as_group: add as a new group
        :param include: sequence of argument flags/names that should be added to the parser
            (set to None to include everything)
        :return: the group the arguments got added to
        """
        from fsl.utils.run import runfsl, FSLNotPresent
        try:
            fsl_sub_run, _ = runfsl('fsl_sub', exitcode=True)
        except (FileNotFoundError, FSLNotPresent):
            warnings.warn('fsl_sub was not found')
            return
        doc_lines = fsl_sub_run.splitlines()
        nspaces = 1
        for line in doc_lines:
            if len(line.strip()) > 0:
                while line.startswith(' ' * nspaces):
                    nspaces += 1
        nspaces -= 1
        if as_group:
            group = parser.add_argument_group(as_group)
        else:
            group = parser

        def get_explanation(flag):
            explanation = None
            for line in doc_lines:
                if explanation is not None and len(line.strip()) > 0 and line.strip()[0] != '-':
                    explanation.append(line[nspaces:].strip())
                elif explanation is not None:
                    break
                elif line.strip().startswith(flag):
                    explanation = [line[nspaces:].strip()]
            if (explanation is None) or (len(explanation) == 0):
                return 'documentation not found'
            return ' '.join(explanation)

        for flag, value in cls.cmd_line_flags.items():
            if include is not None and value not in include and flag not in include:
                continue

            as_type = {'minutes': float, 'priority': int, 'ram': int, 'verbose': None}
            action = 'store_true' if value == 'verbose' else 'store'
            group.add_argument(flag, dest='_sub_' + value, help=get_explanation(flag), action=action,
                               metavar='<' + value + '>', type=as_type.get(value, str))
        group.add_argument('-F', dest='_sub_flags', help=get_explanation('-F'), action='store_true')
        group.add_argument('-v', dest='_sub_verbose', help=get_explanation('-v'), action='store_true')
        group.add_argument('-s', dest='_sub_multi_threaded', help=get_explanation('-s'),
                           metavar='<pename>,<threads>')
        group.add_argument('-j', dest='_sub_wait_for', help=get_explanation('-j'),
                           metavar='<jid>')
        return group

    @classmethod
    def from_args(cls, args):
        """
        Create a SubmitParams from the command line arguments
        """
        as_dict = {value: getattr(args, '_sub_' + value, None) for value in cls.cmd_line_flags.values()}
        if args._sub_wait_for is not None:
            as_dict['wait_for'] = args._sub_wait_for.split(',')
        if args._sub_multi_threaded is not None:
            pename, threads = args._sub_multi_threaded.split(',')
            as_dict['multi_threaded'] = pename, threads
        return cls(verbose=args._sub_verbose, flags=args._sub_flags, **as_dict)


def submit(*command, **kwargs):
    """
    Submits a given command to the cluster

    The ``submit`` function is deprecated - you should use
    :mod:`fsl.wrappers.fsl_sub` instead, or use the ``fsl_sub`` Python
    library, which is available in FSL 6.0.5 and newer.

    You can pass the command and arguments as a single string, or as a regular or unpacked sequence.

    :arg command:        string or regular/unpacked sequence of strings with the job command
    :arg minutes:        Estimated job length in minutes, used to auto-set
                         queue name
    :arg queue:          Explicitly sets the queue name
    :arg architecture:   e.g., darwin or lx24-amd64
    :arg priority:       Lower priority [0:-1024] default = 0
    :arg email:          Who to email after job completion
    :arg wait_for:       Place a hold on this task until the job-ids in this
                         string or tuple are complete
    :arg job_name:       Specify job name as it will appear on the queue
    :arg ram:            Max total RAM to use for job (integer in MB)
    :arg logdir:         where to output logfiles
    :arg mail_options:   Change the SGE mail options, see qsub for details
    :arg output:         If <output> image or file already exists, do nothing
                         and exit
    :arg flags:          If True, use flags embedded in scripts to set SGE
                         queuing options
    :arg multi_threaded: Submit a multi-threaded task - Set to a tuple
                         containing two elements:

                          - <pename>: a PE configured for the requested queues

                          - <threads>: number of threads to run

    :arg verbose:        If True, use verbose mode
    :arg env:            Dict containing environment variables

    :return:             string of submitted job id
    """
    return SubmitParams(**kwargs)(*command)
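
# A quick sketch (hypothetical queue, RAM and job name, not from the original
# examples) of how keyword arguments map onto fsl_sub flags:
#
#     jid = submit('fslmaths in -s 2 out',
#                  queue='short.q', ram=4096, job_name='smooth')
#     # roughly: fsl_sub -q short.q -N smooth -R 4096 fslmaths in -s 2 out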


@deprecated.deprecated('3.7.0', '4.0.0', 'Use fsl_sub.report instead')
def info(job_ids) -> Dict[str, Optional[Dict[str, str]]]:
    """Gets information on a given job id

    The ``info`` function is deprecated - you should use the
    ``fsl_sub.report`` function from the ``fsl_sub`` Python library, which
    is available in FSL 6.0.5 and newer.

    Uses `qstat -j <job_ids>`

    :arg job_ids: string with job id or (nested) sequence with jobs
    :return: dictionary of jobid -> another dictionary with job information
             (or None if job does not exist)
    """
    if not hasattr(info, '_ncall'):
        info._ncall = 0
    info._ncall += 1
    if info._ncall == 3:
        warnings.warn("Please do not call `fslsub.info` repeatably, because it slows down the cluster. You can avoid this message by simply passing all the job IDs you are interested in to a single `fslsub.info` call.")

    from fsl.utils.run import run
    job_ids_string = _flatten_job_ids(job_ids)
    try:
        result = run(['qstat', '-j', job_ids_string], exitcode=True)[0]
    except FileNotFoundError:
        log.debug("qstat not found; assuming not on cluster")
        return {}
    return _parse_qstat(job_ids_string, result)
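
# The returned mapping has one entry per requested job ID; a hypothetical
# call might look like this (IDs and field values are illustrative only):
#
#     info(['108023', '108019'])
#     # -> {'108023': {'job_number': '108023', 'owner': 'someuser', ...},
#     #     '108019': None}    # None: job not known to qstat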


def _parse_qstat(job_ids_string, qstat_stdout):
    """
    Parses the qstat output into a dictionary of dictionaries

    :param job_ids_string: input job ids
    :param qstat_stdout: qstat output
    :return: dictionary of jobid -> another dictionary with job information
             (or None if job does not exist)
    """
    res = {job_id: None for job_id in job_ids_string.split(',')}
    current_job_id = None
    for line in qstat_stdout.splitlines()[1:]:
        line = line.strip()
        if len(line) == 0:
            continue
        if line == '=' * len(line):
            current_job_id = None
        elif ':' in line:
            current_key, value = [part.strip() for part in line.split(':', 1)]
            if current_key == 'job_number':
                current_job_id = value
                if current_job_id not in job_ids_string:
                    raise ValueError(f"Unexpected job ID in qstat output:\n{line}")
                res[current_job_id] = {}
            else:
                if current_job_id is None:
                    raise ValueError(f"Found job information before job ID in qstat output:\n{line}")
                res[current_job_id][current_key] = value
        else:
            res[current_job_id][current_key] += '\n' + line
    return res
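
# Sketch of the qstat layout this parser assumes (abbreviated and
# illustrative; real `qstat -j` output contains many more fields):
#
#     ==============================================================
#     job_number:                 108023
#     owner:                      someuser
#     sge_o_workdir:              /home/someuser
#
# A line of '=' characters separates jobs, 'key: value' lines start a new
# field, and lines without a colon are appended to the previous field.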


def output(job_id, logdir='.', command=None, name=None):
    """Returns the output of the given job.

    :arg job_id:  String containing job ID.
    :arg logdir:  Directory containing the log - defaults to
                  the current directory.
    :arg command: Command that was run. Not currently used.
    :arg name:    Job name if it was specified. Not currently used.
    :returns:     A tuple containing the standard output and standard error.
    """

    stdout = list(glob.glob(op.join(logdir, '*.o{}'.format(job_id))))
    stderr = list(glob.glob(op.join(logdir, '*.e{}'.format(job_id))))

    if len(stdout) != 1 or len(stderr) != 1:
        raise ValueError('No/too many error/output files for job {}: stdout: '
                         '{}, stderr: {}'.format(job_id, stdout, stderr))

    stdout = stdout[0]
    stderr = stderr[0]

    if op.exists(stdout):
        with open(stdout, 'rt') as f:
            stdout = f.read()
    else:
        stdout = None

    if op.exists(stderr):
        with open(stderr, 'rt') as f:
            stderr = f.read()
    else:
        stderr = None

    return stdout, stderr
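
# Minimal usage sketch (job ID and directory are hypothetical); the log files
# are expected to follow SGE's <name>.o<jobid> / <name>.e<jobid> naming:
#
#     stdout, stderr = output('108023', logdir='logs')
#     if stderr:
#         print('job wrote to standard error:', stderr)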


def _flatten_job_ids(job_ids):
    """
    Returns a potentially nested sequence of job ids as a single comma-separated string

    :param job_ids: possibly nested sequence of job ids. The job ids themselves should be strings.
    :return: comma-separated string of job ids
    """
    def unpack(job_ids):
        """Unpack the (nested) job-ids in a single set"""
        if isinstance(job_ids, str):
            return {job_ids}
        elif isinstance(job_ids, int):
            return {str(job_ids)}
        else:
            res = set()
            for job_id in job_ids:
                res.update(unpack(job_id))
            return res

    return ','.join(sorted(unpack(job_ids)))
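
# Example of the flattening behaviour (IDs are made up): strings, integers
# and nested sequences all collapse into one sorted, comma-separated string:
#
#     _flatten_job_ids([1, '2', ('2', ['3'])])   # -> '1,2,3'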


def hold(job_ids, hold_filename=None):
    """
    Waits until all jobs have finished

    Internally this works by submitting a new job that only runs after all jobs in
    `job_ids` have finished, and which then creates a file named `hold_filename`.

    This function will only return once `hold_filename` has been created

    :param job_ids: possibly nested sequence of job ids. The job ids themselves should be strings.
    :param hold_filename: filename to use as a hold file.
        The containing directory should exist, but the file itself should not.
        Defaults to a ./.<random characters>.hold in the current directory.
    :return: only returns when all the jobs have finished
    """
    if hold_filename is None:
        with tempfile.NamedTemporaryFile(prefix='.', suffix='.hold', dir='.') as f:
            hold_filename = f.name
    if op.exists(hold_filename):
        raise IOError(f"Hold file ({hold_filename}) already exists")
    elif not op.isdir(op.split(op.abspath(hold_filename))[0]):
        raise IOError(f"Hold file ({hold_filename}) can not be created in non-existent directory")

    submit(('touch', hold_filename), wait_for=job_ids, minutes=1, job_name='.hold')

    while not op.exists(hold_filename):
        time.sleep(10)

    os.remove(hold_filename)
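
# Typical use (illustrative job IDs): block the current process until two
# previously submitted jobs have finished, then carry on with post-processing.
#
#     hold(['108023', '108019'])
#     # ... both jobs are now done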


_external_job = ("""#!{}
# This is a temporary file designed to run the python function {},
# so that it can be submitted to the cluster
import pickle
from io import BytesIO
from importlib import import_module
{}
pickle_bytes = BytesIO({})
name_type, name, func_name, args, kwargs = pickle.load(pickle_bytes)

if name_type == 'module':
    # retrieves a function defined in an external module
    func = getattr(import_module(name), func_name)
elif name_type == 'script':
    # retrieves a function defined in the __main__ script
    local_execute = {{'__name__': '__not_main__', '__file__': name}}
    exec(open(name, 'r').read(), local_execute)
    func = local_execute[func_name]
else:
    raise ValueError('Unknown name_type: %r' % name_type)

{}

""")


def func_to_cmd(func, args=None, kwargs=None, tmp_dir=None, clean="never", verbose=False):
    """Defines the command needed to run the function from the command line

    WARNING: if submitting a function defined in the __main__ script,
    the script will be run again to retrieve this function. Make sure there is an
    "if __name__ == '__main__'" guard to prevent the full script from being rerun.

    :arg func:    function to be run
    :arg args:    positional arguments
    :arg kwargs:  keyword arguments
    :arg tmp_dir: directory where to store the temporary file
    :arg clean:   Whether the script should be removed after running. There are three options:

        - "never" (default): Script is kept
        - "on_success": only remove if script successfully finished (i.e., no error is raised)
        - "always": always remove the script, even if it raises an error

    :arg verbose: If set to True, the script will print its own filename before running
    :return:      string which will run the function
    """
    if clean not in ('never', 'always', 'on_success'):
        raise ValueError(f"Clean should be one of 'never', 'always', or 'on_success', not {clean}")
    if args is None:
        args = ()
    if kwargs is None:
        kwargs = {}
    pickle_bytes = BytesIO()
    if func.__module__ == '__main__':
        pickle.dump(('script', importlib.import_module('__main__').__file__, func.__name__,
                     args, kwargs), pickle_bytes)
    else:
        pickle.dump(('module', func.__module__, func.__name__,
                     args, kwargs), pickle_bytes)

    handle, filename = tempfile.mkstemp(prefix=func.__name__ + '_',
                                        suffix='.py',
                                        dir=tmp_dir)
    os.close(handle)

    verbose_script = f'\nprint("running {filename}")\n' if verbose else ''
    if clean == 'never':
        run_script = "res = func(*args, **kwargs)"
    elif clean == 'always':
        run_script = f"""try:
    res = func(*args, **kwargs)
finally:
    import os; os.remove("{filename}")"""
    elif clean == 'on_success':
        run_script = f"""res = func(*args, **kwargs)
import os; os.remove("{filename}")"""
    python_cmd = _external_job.format(sys.executable,
                                      func.__name__,
                                      verbose_script,
                                      pickle_bytes.getvalue(),
                                      run_script)

    with open(filename, 'w') as python_file:
        python_file.write(python_cmd)

    return sys.executable + " " + filename
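
# End-to-end sketch (the function, file names and queue are hypothetical):
# turn a Python function into a command line call and hand it to the cluster.
#
#     def my_analysis(infile, sigma=2):
#         ...
#
#     cmd = func_to_cmd(my_analysis, args=('data.nii.gz',),
#                       kwargs={'sigma': 3}, clean='on_success')
#     submit(cmd, queue='short.q')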