#!/usr/bin/env python
#
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
#
"""This module provides some functions for running shell commands.

.. note:: The functions in this module are only known to work in Unix-like
          environments.

.. autosummary::
   :nosignatures:

   run
   runfsl
   dryrun
   hold
"""


import                    sys
import                    shlex
import                    logging
import                    threading
import                    contextlib
import collections.abc as abc
import subprocess      as sp
import os.path         as op
import                    os

import                    six
34

35
36
from   fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub                   as fslsub
37
import fsl.utils.tempdir                  as tempdir
38
import fsl.utils.path                     as fslpath
39
40
41
42

# Module-level logger
log = logging.getLogger(__name__)


DRY_RUN = False
"""If ``True``, the :func:`run` function will not execute commands, but will
instead return placeholder results. See :func:`dryrun`.
"""


FSL_PREFIX = None
"""Global override for the FSL executable location used by :func:`runfsl`.
If set, it is searched before ``$FSLDEVDIR/bin`` and ``$FSLDIR/bin``.
"""


class FSLNotPresent(Exception):
    """Exception raised by :func:`runfsl` when FSL cannot be located, i.e.
    when none of ``FSL_PREFIX``, ``$FSLDEVDIR`` or ``$FSLDIR`` is set.
    """


@contextlib.contextmanager
def dryrun(*args):
    """Context manager which enables dry-run mode for its duration.

    While active, the :data:`DRY_RUN` flag is set, so calls to :func:`run`
    do not execute anything. When standard output is requested from
    :func:`run`, the value returned is the command it would have run, i.e.
    ``' '.join(args)``.

    The previous value of :data:`DRY_RUN` is restored on exit, even if the
    body of the ``with`` statement raises. Any positional arguments passed
    to this function are accepted but ignored.
    """
    global DRY_RUN

    previous = DRY_RUN
    DRY_RUN  = True
    try:
        yield
    finally:
        # always restore, regardless of how the block exits
        DRY_RUN = previous


def prepareArgs(args):
    """Used by the :func:`run` and :func:`runfsl` functions. Converts the
    positional arguments they were given into a list of command arguments.

    :arg args: Tuple of positional arguments. If it contains a single
               string, that string is split into tokens using shell-like
               syntax (``shlex.split``). If it contains a single non-string
               item, that item is assumed to be a sequence of command
               arguments. Otherwise the arguments are used as they are.
    :returns:  A new ``list`` containing the command arguments.
    """

    if len(args) == 1:

        # Argument was a command string. This module already
        # relies on Python 3-only syntax, so ``str`` is
        # equivalent to ``six.string_types`` here.
        if isinstance(args[0], str):
            return shlex.split(args[0])

        # Argument was a single (not unpacked) sequence
        return list(args[0])

    # Arguments were passed individually (an unpacked sequence)
    return list(args)


96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
real_stdout = sys.stdout
def _forwardStream(in_, *outs):
    """Creates and starts a daemon thread which forwards the given input stream
    to one or more output streams. Used by the :func:`run` function to redirect
    a command's standard output/error streams to more than one destination.

    It is necessary to read the process stdout/ stderr on separate threads to
    avoid deadlocks.

    :arg in_:  Input stream
    :arg outs: Output stream(s)
    :returns:  The thread that has been started.
    """

    # not all file-likes have a mode attribute -
    # if not present, assume a string stream
    omodes = [getattr(o, 'mode', 'w') for o in outs]

    def realForward():
115
        for line in iter(in_.readline, b''):
116
117
118
119
120
121
122
123
124
125
            for i, o in enumerate(outs):
                if 'b' in omodes[i]: o.write(line)
                else:                o.write(line.decode('utf-8'))

    t = threading.Thread(target=realForward)
    t.daemon = True
    t.start()
    return t


def run(*args, **kwargs):
    """Call a command and return its output. You can pass the command and
    arguments as a single string, or as a regular or unpacked sequence.

    The command can be run on a cluster by using the ``submit`` keyword
    argument.

    An exception is raised if the command returns a non-zero exit code, unless
    the ``exitcode`` option is set to ``True``.

    :arg stdout:   Must be passed as a keyword argument. Defaults to ``True``.
                   If ``True``, standard output is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg stderr:   Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, standard error is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, and the command's return code is non-0, an
                   exception is not raised.  Ignored if ``submit`` is
                   specified.

    :arg submit:   Must be passed as a keyword argument. By default (or if
                   any falsy value is given) the command is run locally.
                   If ``True``, the command is submitted as a cluster job via
                   the :func:`.fslsub.submit` function.  May also be a
                   dictionary containing arguments to that function.

    :arg log:      Must be passed as a keyword argument.  An optional ``dict``
                   which may be used to redirect the command's standard output
                   and error. The following keys are recognised:

                     - tee:    If ``True``, the command's standard output/error
                               streams are forwarded to this process's
                               streams.

                     - stdout: Optional file-like object to which the command's
                               standard output stream can be forwarded.

                     - stderr: Optional file-like object to which the command's
                               standard error stream can be forwarded.

                     - cmd:    Optional file-like object to which the command
                               itself is logged.

    All other keyword arguments are passed through to the ``subprocess.Popen``
    object (via :func:`_realrun`), unless ``submit`` is specified, in which
    case they are passed through to the :func:`.fslsub.submit` function.

    :returns:      If ``submit`` is provided, the return value of
                   :func:`.fslsub.submit` is returned. Otherwise returns a
                   single value or a tuple, based on the ``stdout``,
                   ``stderr``, and ``exitcode`` arguments.

    :raises ValueError:   If ``submit`` is truthy but is neither ``True`` nor
                          a mapping.
    :raises RuntimeError: If the command returns a non-zero exit code and
                          ``exitcode`` is ``False``.
    """

    returnStdout   = kwargs.pop('stdout',   True)
    returnStderr   = kwargs.pop('stderr',   False)
    returnExitcode = kwargs.pop('exitcode', False)
    submit         = kwargs.pop('submit',   {})
    # Named logopts rather than log, so that the
    # module-level logger is not shadowed.
    logopts        = kwargs.pop('log',      None)
    args           = prepareArgs(args)

    if logopts is None:
        logopts = {}

    tee       = logopts.get('tee',    False)
    logStdout = logopts.get('stdout', None)
    logStderr = logopts.get('stderr', None)
    logCmd    = logopts.get('cmd',    None)

    # Any falsy value (None, False, {}) means "run locally"
    if not submit:
        submit = None

    if submit is not None:
        # Output capture and exit code checking do
        # not apply to jobs submitted to a cluster
        returnStdout   = False
        returnStderr   = False
        returnExitcode = False

        if submit is True:
            submit = dict()

    if submit is not None and not isinstance(submit, abc.Mapping):
        raise ValueError('submit must be a mapping containing '
                         'options for fsl.utils.fslsub.submit')

    if DRY_RUN:
        # DRY_RUN is documented as logging, rather than running, commands
        log.debug('dryrun: %s', args)
        return _dryrun(
            submit, returnStdout, returnStderr, returnExitcode, *args)

    # submit - delegate to fslsub
    if submit is not None:
        return fslsub.submit(' '.join(args), **submit, **kwargs)

    # Run directly - delegate to _realrun
    stdout, stderr, exitcode = _realrun(
        tee, logStdout, logStderr, logCmd, *args, **kwargs)

    if not returnExitcode and (exitcode != 0):
        raise RuntimeError('{} returned non-zero exit code: {}'.format(
            args[0], exitcode))

    results = []
    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(exitcode)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
    """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
    active.
    """

    if submit:
        return ('0',)

    results = []
    stderr  = ''
    stdout  = ' '.join(args)

    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(0)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _realrun(tee, logStdout, logStderr, logCmd, *args, **kwargs):
    """Used by :func:`run`. Runs the given command and manages its standard
    output and error streams.

    :arg tee:       If ``True``, the command's standard output and error
                    streams are forwarded to this process' standard output/
                    error.

    :arg logStdout: Optional file-like object to which the command's standard
                    output stream can be forwarded.

    :arg logStderr: Optional file-like object to which the command's standard
                    error stream can be forwarded.

    :arg logCmd:    Optional file-like object to which the command itself is
                    logged.

    :arg args:      Command to run

    :arg kwargs:    Passed through to the ``subprocess.Popen`` object.

    :returns:       A tuple containing:
                      - the command's standard output as a string.
                      - the command's standard error as a string.
                      - the command's exit code.

    The captured output is decoded as UTF-8; a ``UnicodeDecodeError`` is
    raised if it is not valid UTF-8. If anything raises while the command
    is running (including ``KeyboardInterrupt``), the child process is
    killed and reaped before the exception propagates, so that no orphaned
    process is left behind.
    """

    with tempdir.tempdir(changeto=False) as td:

        # We always direct the command's stdout/
        # stderr to two temporary files, which
        # are read back once it has finished.
        stdoutf = op.join(td, 'stdout')
        stderrf = op.join(td, 'stderr')

        with open(stdoutf, 'wb') as stdoutfile, \
             open(stderrf, 'wb') as stderrfile:  # noqa

            outstreams = [stdoutfile]
            errstreams = [stderrfile]

            # if tee, we duplicate the command's
            # stdout/stderr to this process'
            # stdout/stderr
            if tee:
                outstreams.append(sys.stdout)
                errstreams.append(sys.stderr)

            # And we also duplicate to caller-
            # provided streams if they're given.
            if logStdout is not None: outstreams.append(logStdout)
            if logStderr is not None: errstreams.append(logStderr)

            # The process is only started once its output
            # destinations exist, so a failure while
            # creating them cannot leave a running child.
            proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, **kwargs)

            try:
                # log the command if requested
                if logCmd is not None:
                    cmd = ' '.join(args) + '\n'
                    if 'b' in getattr(logCmd, 'mode', 'w'):
                        logCmd.write(cmd.encode('utf-8'))
                    else:
                        logCmd.write(cmd)

                stdoutt = _forwardStream(proc.stdout, *outstreams)
                stderrt = _forwardStream(proc.stderr, *errstreams)

                # Wait until the forwarding threads
                # have finished cleanly, and the
                # command has terminated.
                stdoutt.join()
                stderrt.join()
                proc.communicate()

            except BaseException:
                # As subprocess.run does: don't leave the child running
                # (with nobody draining its pipes) if we are bailing out.
                proc.kill()
                proc.wait()
                raise

        # Read in the command's stdout/stderr
        with open(stdoutf, 'rb') as f: stdout = f.read()
        with open(stderrf, 'rb') as f: stderr = f.read()

    exitcode = proc.returncode
    stdout   = stdout.decode('utf-8')
    stderr   = stderr.decode('utf-8')

    return stdout, stderr, exitcode


def runfsl(*args, **kwargs):
    """Call a FSL command and return its output.

    This function searches for the command in the following
    locations (ordered by priority):

    1. ``FSL_PREFIX``
    2. ``$FSLDEVDIR/bin``
    3. ``$FSLDIR/bin``

    If found, the full path to the command is then passed to :func:`run`.
    When running under Windows Subsystem for Linux (``fslplatform.fslwsl``),
    each candidate is checked, and the command wrapped, via :func:`wslcmd`.

    :arg args:   Command and arguments, in any form accepted by :func:`run`.
    :arg kwargs: Passed through to :func:`run`.
    :returns:    The return value of :func:`run`.
    :raises FSLNotPresent:     If none of the above locations is set.
    :raises FileNotFoundError: If the command is not found in any of them.
    """
    prefixes = []

    if FSL_PREFIX is not None:
        prefixes.append(FSL_PREFIX)
    if fslplatform.fsldevdir is not None:
        prefixes.append(op.join(fslplatform.fsldevdir, 'bin'))
    if fslplatform.fsldir is not None:
        prefixes.append(op.join(fslplatform.fsldir, 'bin'))

    if not prefixes:
        raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!')

    args = prepareArgs(args)

    for prefix in prefixes:
        cmdpath = op.join(prefix, args[0])
        if fslplatform.fslwsl:
            wslargs = wslcmd(cmdpath, *args)
            if wslargs is not None:
                args = wslargs
                break
        elif op.isfile(cmdpath):
            args[0] = cmdpath
            break

    # error if the command cannot
    # be found in a FSL directory
    else:
        raise FileNotFoundError('FSL tool {} not found (checked {})'.format(
            args[0], ', '.join(prefixes)))

    return run(*args, **kwargs)
379

380

381
382
383
384
def wslcmd(cmdpath, *args):
    """Convert a command + arguments into an equivalent set of arguments that
    will run the command under Windows Subsystem for Linux.

    :arg cmdpath: Fully qualified path to the command. This is essentially a
                  WSL path rather than a Windows one, since ``FSLDIR`` is
                  specified as a WSL path; however, it may contain
                  backslashes as path separators due to previous use of
                  ``os.path.join``.
    :arg args:    Sequence of command arguments (the first of which is the
                  unqualified command name).

    :returns: If ``cmdpath`` exists and is executable in WSL, a list of
              command arguments which, when executed, will run the command
              in WSL. Windows paths in the argument list are converted to WSL
              paths. If ``cmdpath`` is not executable in WSL, returns
              ``None``.
    """

    # Convert the command path with fslpath.wslpath (e.g. to deal with
    # separators introduced by os.path.join), then check that it exists
    # and is executable inside WSL.
    cmdpath = fslpath.wslpath(cmdpath)
    retcode = sp.call(["wsl", "test", "-x", cmdpath])

    if retcode != 0:
        # Command was not found in WSL with this path
        return None

    # Form a new argument list and convert any Windows paths in it into WSL
    # paths, replacing the unqualified command name with the full path
    wslargs    = [fslpath.wslpath(arg) for arg in args]
    wslargs[0] = cmdpath

    # FSLDIR may be unset when the command was found via FSL_PREFIX or
    # FSLDEVDIR (see runfsl) - never convert/export a None path.
    local_fsldir    = None
    local_fsldevdir = None
    if fslplatform.fsldir:
        local_fsldir = fslpath.wslpath(fslplatform.fsldir)
    if fslplatform.fsldevdir:
        local_fsldevdir = fslpath.wslpath(fslplatform.fsldevdir)

    # Prepend important environment variables - note that it seems we cannot
    # use WSLENV for this due to its insistence on path mapping.
    local_path = "$PATH"
    if local_fsldevdir:
        local_path += ":%s/bin" % local_fsldevdir
    if local_fsldir:
        local_path += ":%s/bin" % local_fsldir

    prepargs = ["wsl", "PATH=%s" % local_path]
    if local_fsldir:
        prepargs.append("FSLDIR=%s" % local_fsldir)
    prepargs.append(
        "FSLOUTPUTTYPE=%s" % os.environ.get("FSLOUTPUTTYPE", "NIFTI_GZ"))
    if local_fsldevdir:
        prepargs.append("FSLDEVDIR=%s" % local_fsldevdir)

    return prepargs + wslargs


def hold(job_ids, hold_filename=None):
    """Block until all of the given cluster jobs have finished.

    This is a thin wrapper which delegates to :func:`.fslsub.hold`.

    :arg job_ids:       Possibly nested sequence of job ids. The job ids
                        themselves should be strings.
    :arg hold_filename: Filename to use as a hold file. The containing
                        directory should exist, but the file itself should
                        not. If not given, defaults (within
                        :func:`.fslsub.hold`) to a
                        ``./.<random characters>.hold`` file in the current
                        directory.
    :returns:           ``None`` - only returns once all of the jobs have
                        finished.
    """
    fslsub.hold(job_ids, hold_filename)
    return None