#!/usr/bin/env python
#
# fslsub.py - Functions for using fsl_sub.
#
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
#
"""This module submits jobs to a computing cluster using FSL's fsl_sub command
line tool. It is assumed that the computing cluster is managed by SGE.

Example usage, building a short pipeline::

    from fsl.utils.fslsub import submit, wait

    # submits bet to veryshort queue unless <mask_filename> already exists
    bet_job = submit('bet <input_filename> -m',
                     queue='veryshort.q',
                     output='<mask_filename>')

    # submits another job
    other_job = submit('some other pre-processing step', queue='short.q')

    # submits a cuda job that should only start after both preparatory jobs are
    # finished. This will work if bet_job and other_job are single job-ids
    # (i.e., strings) or a sequence of multiple job-ids
    cuda_job = submit('expensive job',
                      wait_for=(bet_job, other_job),
                      queue='cuda.q')

    # waits for the cuda job to finish
    wait(cuda_job)

.. autosummary::
   :nosignatures:

   submit
   info
   output
   wait
   func_to_cmd
"""


from six import string_types, BytesIO
import subprocess as sp
import os.path as op
import glob
import time
import pickle
import sys
import tempfile
import logging
import importlib


log = logging.getLogger(__name__)


def submit(*command,
           minutes=None,
           queue=None,
           architecture=None,
           priority=None,
           email=None,
           wait_for=None,
           job_name=None,
           ram=None,
           logdir=None,
           mail_options=None,
           output=None,
           flags=False,
           multi_threaded=None,
           verbose=False):
    """
    Submits a given command to the cluster

    You can pass the command and arguments as a single string, or as a regular or unpacked sequence.

    :arg command:        string or regular/unpacked sequence of strings with the job command
    :arg minutes:        Estimated job length in minutes, used to auto-set
                         queue name
    :arg queue:          Explicitly sets the queue name
    :arg architecture:   e.g., darwin or lx24-amd64
    :arg priority:       Lower priority [0:-1024] default = 0
    :arg email:          Who to email after job completion
    :arg wait_for:       Place a hold on this task until the job-ids in this
                         string or tuple are complete
    :arg job_name:       Specify job name as it will appear on the queue
    :arg ram:            Max total RAM to use for job (integer in MB)
    :arg logdir:         where to output logfiles
    :arg mail_options:   Change the SGE mail options, see qsub for details
    :arg output:         If <output> image or file already exists, do nothing
                         and exit
    :arg flags:          If True, use flags embedded in scripts to set SGE
                         queuing options
    :arg multi_threaded: Submit a multi-threaded task - Set to a tuple
                         containing two elements (see the example below):

                          - <pename>: a PE configured for the requested queues

                          - <threads>: number of threads to run

    :arg verbose:        If True, use verbose mode

    :return:             string of submitted job id
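
    For example, a multi-threaded submission might look like the following
    sketch (the PE name ``openmp`` and the queue name are illustrative, and
    depend on the local SGE configuration)::

        job_id = submit('my_multithreaded_command',
                        queue='long.q',
                        multi_threaded=('openmp', '8'))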
    """

    from fsl.utils.run import runfsl, prepareArgs

    base_cmd = ['fsl_sub']

    for flag, variable_name in [
            ('-T', 'minutes'),
            ('-q', 'queue'),
            ('-a', 'architecture'),
            ('-p', 'priority'),
            ('-M', 'email'),
            ('-N', 'job_name'),
            ('-R', 'ram'),
            ('-l', 'logdir'),
            ('-m', 'mail_options'),
            ('-z', 'output')]:
        variable = locals()[variable_name]
        if variable:
            base_cmd.extend([flag, str(variable)])

    if flags:
        base_cmd.append('-F')
    if verbose:
        base_cmd.append('-v')

    if wait_for:
        base_cmd.extend(['-j', _flatten_job_ids(wait_for)])

    if multi_threaded:
        base_cmd.append('-s')
        base_cmd.extend(multi_threaded)

    base_cmd.extend(prepareArgs(command))

    return runfsl(*base_cmd).strip()


def info(job_id):
    """Gets information on a given job id

    Uses `qstat -j <job_id>`

    :arg job_id: string with job id
    :return:     dictionary with information on the submitted job (empty
                 if job does not exist)
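
    A minimal usage sketch (the job id is illustrative, and the reported
    fields depend on the local SGE configuration)::

        state = info('12345')
        if len(state) == 0:
            print('job 12345 has finished (or was never submitted)')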
    """
    try:
        # capture both stdout and stderr, and parse the decoded output below
        result = sp.run(['qstat', '-j', job_id], stdout=sp.PIPE,
                        stderr=sp.STDOUT).stdout.decode('utf-8')
    except FileNotFoundError:
        log.debug("qstat not found; assuming not on cluster")
        return {}
    if 'Following jobs do not exist:' in result:
        return {}
    res = {}
    for line in result.splitlines()[1:]:
        key, value = line.split(':', maxsplit=1)
        res[key.strip()] = value.strip()
    return res


def output(job_id, logdir='.', command=None, name=None):
    """Returns the output of the given job.

    :arg job_id:  String containing job ID.
    :arg logdir:  Directory containing the log - defaults to
                  the current directory.
    :arg command: Command that was run. Not currently used.
    :arg name:    Job name if it was specified. Not currently used.
    :returns:     A tuple containing the standard output and standard error.
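
    For example (the job id is illustrative, and the log files are assumed
    to have been written to the current directory)::

        stdout, stderr = output('12345')
        if stderr:
            print('job 12345 wrote to standard error:')
            print(stderr)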
    """

    stdout = list(glob.glob(op.join(logdir, '*.o{}'.format(job_id))))
    stderr = list(glob.glob(op.join(logdir, '*.e{}'.format(job_id))))

    if len(stdout) != 1 or len(stderr) != 1:
        raise ValueError('No/too many error/output files for job {}: stdout: '
                         '{}, stderr: {}'.format(job_id, stdout, stderr))

    stdout = stdout[0]
    stderr = stderr[0]

    if op.exists(stdout):
        with open(stdout, 'rt') as f:
            stdout = f.read()
    else:
        stdout = None

    if op.exists(stderr):
        with open(stderr, 'rt') as f:
            stderr = f.read()
    else:
        stderr = None

    return stdout, stderr


def wait(job_ids):
    """Wait for one or more jobs to finish

    :arg job_ids: string or tuple of strings with jobs that should finish
                  before continuing
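
    For example (the job ids are illustrative)::

        wait(('12345', '67890'))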
    """
    start_time = time.time()
    for job_id in _flatten_job_ids(job_ids).split(','):
        log.debug('Waiting for job {}'.format(job_id))
        while len(info(job_id)) > 0:
            wait_time = min(max(1, (time.time() - start_time) / 3.), 20)
            time.sleep(wait_time)
        log.debug('Job {} finished, continuing to next'.format(job_id))
    log.debug('All jobs have finished')


def _flatten_job_ids(job_ids):
    """
    Returns a potentially nested sequence of job ids as a single comma-separated string

    :param job_ids: possibly nested sequence of job ids. The job ids themselves should be strings.
    :return: comma-separated string of job ids
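
    For example::

        >>> _flatten_job_ids(('12345', ('67890', 23456)))
        '12345,23456,67890'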
    """
    def unpack(job_ids):
        """Unpack the (nested) job-ids in a single set"""
        if isinstance(job_ids, str):
            return {job_ids}
        elif isinstance(job_ids, int):
            return {str(job_ids)}
        else:
            res = set()
            for job_id in job_ids:
                res.update(unpack(job_id))
            return res

    return ','.join(sorted(unpack(job_ids)))


_external_job = """#!{}
# This is a temporary file designed to run the python function {},
# so that it can be submitted to the cluster

import pickle
from six import BytesIO
from importlib import import_module

pickle_bytes = BytesIO({})
name_type, name, func_name, args, kwargs = pickle.load(pickle_bytes)

if name_type == 'module':
    # retrieves a function defined in an external module
    func = getattr(import_module(name), func_name)
elif name_type == 'script':
    # retrieves a function defined in the __main__ script
    local_execute = {{'__name__': '__not_main__', '__file__': name}}
    exec(open(name, 'r').read(), local_execute)
    func = local_execute[func_name]
else:
    raise ValueError('Unknown name_type: %r' % name_type)

res = func(*args, **kwargs)
if res is not None:
    # pickle requires a binary file handle, and dump takes (obj, file)
    with open(__file__ + '_out.pickle', 'wb') as f:
        pickle.dump(res, f)
"""


def func_to_cmd(func, args, kwargs, tmp_dir=None, clean=False):
    """Defines the command needed to run the function from the command line

    WARNING: if submitting a function defined in the __main__ script, the
    script will be run again to retrieve this function. Make sure there is an
    ``if __name__ == '__main__'`` guard to prevent the full script from being
    rerun.

    :arg func:    function to be run
    :arg args:    positional arguments
    :arg kwargs:  keyword arguments
    :arg tmp_dir: directory where to store the temporary file
    :arg clean:   if True removes the submitted script after running it
    :return:      string which will run the function
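
    A usage sketch - the generated command can be handed to :func:`submit`
    (the function, file name and queue name below are illustrative)::

        def my_analysis(infile):
            ...

        cmd    = func_to_cmd(my_analysis, ('data.nii.gz',), {})
        job_id = submit(cmd, queue='short.q')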
    """
    pickle_bytes = BytesIO()
    if func.__module__ == '__main__':
        pickle.dump(('script', importlib.import_module('__main__').__file__, func.__name__,
                     args, kwargs), pickle_bytes)
    else:
        pickle.dump(('module', func.__module__, func.__name__,
                     args, kwargs), pickle_bytes)
    python_cmd = _external_job.format(sys.executable,
                                      func.__name__,
                                      pickle_bytes.getvalue())

    _, filename = tempfile.mkstemp(prefix=func.__name__ + '_',
                                   suffix='.py',
                                   dir=tmp_dir)

    with open(filename, 'w') as python_file:
        python_file.write(python_cmd)

    return sys.executable + " " + filename + ('; rm ' + filename if clean else '')