#!/usr/bin/env python
#
# fslsub.py - Functions for using fsl_sub.
#
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
#
"""This module submits jobs to a computing cluster using FSL's fsl_sub command
line tool. It is assumed that the computing cluster is managed by SGE.

Example usage, building a short pipeline::

    from fsl.utils.fslsub import submit, wait

    # submits bet to veryshort queue unless <mask_filename> already exists
    bet_job = submit('bet <input_filename> -m',
                     queue='veryshort.q',
                     output='<mask_filename>')

    # submits another job
    other_job = submit('some other pre-processing step', queue='short.q')

    # submits cuda job, that should only start after both preparatory jobs are
    # finished
    cuda_job = submit('expensive job',
                      wait_for=bet_job + other_job,
                      queue='cuda.q')

    # waits for the cuda job to finish
    wait(cuda_job)

.. autosummary::
   :nosignatures:

   submit
   info
   output
   wait
   func_to_cmd
"""


import glob
import importlib
import logging
import os
import os.path as op
import pickle
import subprocess as sp
import sys
import tempfile
import time

from six import string_types, BytesIO


# Module-level logger, named after this module.
log = logging.getLogger(__name__)


def submit(command,
           minutes=None,
           queue=None,
           architecture=None,
           priority=None,
           email=None,
           wait_for=None,
           job_name=None,
           ram=None,
           logdir=None,
           mail_options=None,
           output=None,
           flags=False,
           multi_threaded=None,
           verbose=False):
    """Submits a given command to the cluster

    The command line is assembled as a list of arguments and handed to
    ``fsl.utils.run.runfsl``, which runs ``fsl_sub``.

    :arg command:        single string with the job command
    :arg minutes:        Estimated job length in minutes, used to auto-set
                         queue name
    :arg queue:          Explicitly sets the queue name
    :arg architecture:   e.g., darwin or lx24-amd64
    :arg priority:       Lower priority [0:-1024] default = 0
    :arg email:          Who to email after job completion
    :arg wait_for:       Place a hold on this task until the job-ids in this
                         string or tuple are complete
    :arg job_name:       Specify job name as it will appear on the queue
    :arg ram:            Max total RAM to use for job (integer in MB)
    :arg logdir:         where to output logfiles
    :arg mail_options:   Change the SGE mail options, see qsub for details
    :arg output:         If <output> image or file already exists, do nothing
                         and exit
    :arg flags:          If True, use flags embedded in scripts to set SGE
                         queuing options
    :arg multi_threaded: Submit a multi-threaded task - Set to a tuple
                         containing two elements:
                          - <pename>: a PE configures for the requested queues
                          - <threads>: number of threads to run
                         A ready-made ``'<pename>,<threads>'`` string is also
                         accepted.
    :arg verbose:        If True, use verbose mode

    :return:             tuple of submitted job ids
    """

    from fsl.utils.run import runfsl

    base_cmd = ['fsl_sub']

    # Options which take a value. They are paired explicitly with their
    # flag rather than being looked up by name through locals().
    valued_options = [
        ('-T', minutes),
        ('-q', queue),
        ('-a', architecture),
        ('-p', priority),
        ('-M', email),
        ('-N', job_name),
        ('-R', ram),
        ('-l', logdir),
        ('-m', mail_options),
        ('-z', output),
    ]
    for flag, value in valued_options:
        # Unset (None) and otherwise falsy values are omitted, so that
        # fsl_sub falls back to its own defaults for them.
        if value:
            base_cmd.extend([flag, str(value)])

    if flags:
        base_cmd.append('-F')
    if verbose:
        base_cmd.append('-v')

    if wait_for:
        if not isinstance(wait_for, string_types):
            # str() allows job ids to be given as integers
            wait_for = ','.join(str(job_id) for job_id in wait_for)
        base_cmd.extend(['-j', wait_for])

    if multi_threaded:
        if not isinstance(multi_threaded, string_types):
            # fsl_sub takes "-s <pename>,<threads>" as one argument;
            # str() allows the thread count to be given as an integer
            multi_threaded = ','.join(str(item) for item in multi_threaded)
        base_cmd.extend(['-s', multi_threaded])

    base_cmd.append(command)

    return (runfsl(*base_cmd).strip(), )


def info(job_id):
    """Gets information on a given job id

    Uses `qstat -j <job_id>`

    :arg job_id: string with job id
    :return:     dictionary with information on the submitted job (empty
                 if job does not exist, or if ``qstat`` is not available)
    """
    try:
        # The output of qstat is captured (subprocess.call would only
        # return the exit code). stderr is merged into stdout so that the
        # "do not exist" message is seen whichever stream it is written to,
        # and the exit code is deliberately ignored.
        process = sp.Popen(['qstat', '-j', str(job_id)],
                           stdout=sp.PIPE,
                           stderr=sp.STDOUT)
    except FileNotFoundError:
        log.debug("qstat not found; assuming not on cluster")
        return {}

    result = process.communicate()[0].decode('utf-8')

    if 'Following jobs do not exist:' in result:
        return {}

    res = {}
    # the first line of the output is skipped
    for line in result.splitlines()[1:]:
        # lines which are not of the form "key: value" carry no information
        if ':' not in line:
            continue
        # split on the first colon only - values may contain colons
        key, value = line.split(':', 1)
        res[key.strip()] = value.strip()
    return res


161
def output(job_id, logdir='.', command=None, name=None):
    """Returns the output of the given job.

    The log files are located by searching ``logdir`` for files which match
    ``*.o<job_id>`` (standard output) and ``*.e<job_id>`` (standard error).

    :arg job_id:  String containing job ID.
    :arg logdir:  Directory containing the log - defaults to
                  the current directory.
    :arg command: Command that was run. Not currently used.
    :arg name:    Job name if it was specified. Not currently used.
    :returns:     A tuple containing the standard output and standard error.
    :raises ValueError: if there is not exactly one output file and exactly
                  one error file for the job in ``logdir``.
    """

    stdout = glob.glob(op.join(logdir, '*.o{}'.format(job_id)))
    stderr = glob.glob(op.join(logdir, '*.e{}'.format(job_id)))

    if len(stdout) != 1 or len(stderr) != 1:
        raise ValueError('No/too many error/output files for job {}: stdout: '
                         '{}, stderr: {}'.format(job_id, stdout, stderr))

    stdout = stdout[0]
    stderr = stderr[0]

    # A file may have been removed since the glob above, in
    # which case None is returned in place of its contents.
    if op.exists(stdout):
        with open(stdout, 'rt') as f:
            stdout = f.read()
    else:
        stdout = None

    if op.exists(stderr):
        with open(stderr, 'rt') as f:
            stderr = f.read()
    else:
        stderr = None

    return stdout, stderr


Paul McCarthy's avatar
Paul McCarthy committed
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def wait(job_ids):
    """Blocks until all of the given jobs have finished

    Each job is polled in turn via :func:`info`, and is regarded as
    finished once :func:`info` returns an empty dictionary for it.

    :arg job_ids: a single job id string, or a tuple of job id strings,
                  all of which must finish before this function returns
    """
    pending = (job_ids, ) if isinstance(job_ids, string_types) else job_ids
    began = time.time()
    for current in pending:
        log.debug('Waiting for job {}'.format(current))
        while info(current):
            # The polling interval is a third of the total time waited
            # so far, clamped to between 1 and 20 seconds.
            elapsed = time.time() - began
            time.sleep(min(max(1, elapsed / 3.), 20))
        log.debug('Job {} finished, continuing to next'.format(current))
    log.debug('All jobs have finished')


# Template for a stand-alone python script which calls a pickled function.
# It is filled in with str.format, using three positional fields, in order:
#
#   1. the path to the python interpreter (used for the shebang line)
#   2. the name of the function (only used in the header comment)
#   3. a python bytes literal containing the pickled tuple
#      (name_type, name, func_name, args, kwargs)
#
# If the function returns something other than None, the generated script
# pickles the result into a file alongside itself, called
# <script>_out.pickle.
_external_job = """#!{}
# This is a temporary file designed to run the python function {},
# so that it can be submitted to the cluster

import pickle
from io import BytesIO
from importlib import import_module

pickle_bytes = BytesIO({})
name_type, name, func_name, args, kwargs = pickle.load(pickle_bytes)

if name_type == 'module':
    # retrieves a function defined in an external module
    func = getattr(import_module(name), func_name)
elif name_type == 'script':
    # retrieves a function defined in the __main__ script
    local_execute = {{'__name__': '__not_main__'}}
    exec(open(name, 'r').read(), local_execute)
    func = local_execute[func_name]
else:
    raise ValueError('Unknown name_type: %r' % name_type)

res = func(*args, **kwargs)
if res is not None:
    # pickle data is binary, so the file must be opened in 'wb' mode
    with open(__file__ + '_out.pickle', 'wb') as f:
        pickle.dump(res, f)
"""


def func_to_cmd(func, args, kwargs, tmp_dir=None, clean=False):
    """Defines the command needed to run the function from the command line

    The function reference and its arguments are pickled and embedded into
    a temporary python script, generated from the ``_external_job``
    template. The returned command runs that script.

    WARNING: if submitting a function defined in the __main__ script,
    the script will be run again to retrieve this function. Make sure there is a
    "if __name__ == '__main__'" guard to prevent the full script from being rerun.

    :arg func:    function to be run
    :arg args:    positional arguments
    :arg kwargs:  keyword arguments
    :arg tmp_dir: directory where to store the temporary file
    :arg clean:   if True removes the submitted script after running it
    :return:      string which will run the function
    """
    if func.__module__ == '__main__':
        # Functions defined in the running script cannot be imported by
        # module name, so the path to the script is stored instead.
        name_type = 'script'
        name = importlib.import_module('__main__').__file__
    else:
        name_type = 'module'
        name = func.__module__

    payload = pickle.dumps((name_type, name, func.__name__, args, kwargs))

    # repr() explicitly produces a bytes literal which is valid
    # python source when substituted into the template.
    python_cmd = _external_job.format(sys.executable,
                                      func.__name__,
                                      repr(payload))

    handle, filename = tempfile.mkstemp(prefix=func.__name__ + '_',
                                        suffix='.py',
                                        dir=tmp_dir)

    # mkstemp returns an already-open file descriptor - it is wrapped
    # (and therefore closed) here, rather than being leaked.
    with os.fdopen(handle, 'w') as python_file:
        python_file.write(python_cmd)

    return sys.executable + " " + filename + ('; rm ' + filename if clean else '')