run.py 14.8 KB
Newer Older
1
2
3
4
5
#!/usr/bin/env python
#
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
#
"""This module provides some functions for running shell commands.

.. note:: The functions in this module are only known to work in Unix-like
          environments.

.. autosummary::
   :nosignatures:

   run
   runfsl
   dryrun
   hold
"""


23
24
25
26
27
28
29
30
31
32
33
import                    sys
import                    shlex
import                    logging
import                    threading
import                    contextlib
import collections.abc as abc
import subprocess      as sp
import os.path         as op
import                    os

import                    six
34

35
36
from   fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub                   as fslsub
37
import fsl.utils.tempdir                  as tempdir
38
import fsl.utils.path                     as fslpath
39
40
41
42

log = logging.getLogger(__name__)


43
44
45
46
47
48
# Module-level dry-run switch. It is read by run(), which hands over to
# _dryrun() instead of executing or submitting the command, and it is
# temporarily set to True by the dryrun() context manager.
# NOTE(review): the dry-run path visible in this module returns the joined
# command as the "standard output" rather than emitting a log record -
# confirm whether the wording below should be updated.
DRY_RUN = False
"""If ``True``, the :func:`run` function will only log commands, but will not
execute them.
"""


49
50
51
52
# Highest-priority search location used by runfsl(): when it is not None it
# is checked before ``$FSLDEVDIR/bin`` and ``$FSLDIR/bin``.
FSL_PREFIX = None
"""Global override for the FSL executable location used by :func:`runfsl`. """


53
54
55
56
57
58
59
class FSLNotPresent(Exception):
    """Raised by :func:`runfsl` when it has no location in which to look for
    FSL executables, i.e. when none of ``FSL_PREFIX``, ``$FSLDEVDIR`` or
    ``$FSLDIR`` is available.
    """


60
61
62
63
@contextlib.contextmanager
def dryrun(*args):
    """Context manager which switches the module into dry-run mode.

    While the context is active the :data:`DRY_RUN` flag is ``True``, so
    calls to :func:`run` are not executed. For a command that is run
    directly (i.e. not submitted), the "standard output" returned by
    :func:`run` is the command itself, joined with single spaces.

    The previous value of :data:`DRY_RUN` is restored when the context
    exits, including when the body raises, so contexts may be nested.

    :arg args: Accepted for backwards compatibility, but not used.
    """
    global DRY_RUN

    # Remember the current state and enable dry-run mode in one step
    previous, DRY_RUN = DRY_RUN, True

    try:
        yield
    finally:
        DRY_RUN = previous


78
def prepareArgs(args):
    """Used by the :func:`run` function. Normalises the positional arguments
    that were passed to :func:`run` into a single list of command arguments.

    Three calling conventions are supported:

      - a single command string (``('ls -l',)``), which is split into
        words with ``shlex.split``;
      - a single sequence (``(['ls', '-l'],)``), which is unpacked;
      - several separate arguments (``('ls', '-l')``), which are used
        as-is.

    :arg args: Sequence containing the positional arguments.
    :returns:  A new ``list`` containing the command and its arguments.
    """

    if len(args) == 1:
        first = args[0]

        # Argument was a command string. This module
        # is Python 3 only, so a plain ``str`` check
        # is equivalent to ``six.string_types``.
        if isinstance(first, str):
            return shlex.split(first)

        # Argument was an unpacked sequence
        return list(first)

    return list(args)


96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# Reference to whatever ``sys.stdout`` was when this module was imported.
# NOTE(review): nothing in the visible part of this module reads this name -
# confirm whether external code relies on it before removing it.
real_stdout = sys.stdout
def _forwardStream(in_, *outs):
    """Creates and starts a daemon thread which forwards the given input stream
    to one or more output streams. Used by the :func:`run` function to redirect
    a command's standard output/error streams to more than one destination.

    It is necessary to read the process stdout/ stderr on separate threads to
    avoid deadlocks.

    The input is read line by line, as bytes, until ``readline`` returns
    ``b''``. Output streams with a ``'b'`` in their ``mode`` attribute are
    given the raw bytes; all other streams are given the line decoded as
    UTF-8. Bytes which are not valid UTF-8 are replaced with U+FFFD rather
    than raising an error: an exception here would end the thread, and the
    rest of the output would be silently dropped from *every* destination.

    :arg in_:  Input stream - must yield ``bytes``.
    :arg outs: Output stream(s)
    :returns:  The thread that has been started.
    """

    # not all file-likes have a mode attribute -
    # if not present, assume a string stream
    omodes = [getattr(o, 'mode', 'w') for o in outs]

    def realForward():
        for line in iter(in_.readline, b''):

            # decoded lazily, and at most
            # once per line, however many
            # text streams there are
            text = None

            for out, omode in zip(outs, omodes):
                if 'b' in omode:
                    out.write(line)
                else:
                    if text is None:
                        text = line.decode('utf-8', errors='replace')
                    out.write(text)

    t = threading.Thread(target=realForward)
    t.daemon = True
    t.start()
    return t


126
def run(*args, **kwargs):
    """Call a command and return its output. You can pass the command and
    arguments as a single string, or as a regular or unpacked sequence.

    The command can be run on a cluster by using the ``submit`` keyword
    argument.

    An exception is raised if the command returns a non-zero exit code, unless
    the ``exitcode`` option is set to ``True``.

    :arg stdout:   Must be passed as a keyword argument. Defaults to ``True``.
                   If ``True``, standard output is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg stderr:   Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, standard error is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, and the command's return code is non-0, an
                   exception is not raised.  Ignored if ``submit`` is
                   specified.

    :arg submit:   Must be passed as a keyword argument. Defaults to ``None``.
                   If ``True``, the command is submitted as a cluster job via
                   the :func:`.fslsub.submit` function.  May also be a
                   dictionary containing arguments to that function. Any
                   false value (``None``, ``False``, ``{}``) means that the
                   command is run directly.

    :arg cmdonly:  Defaults to ``False``. If ``True``, the command is not
                   executed, but rather is returned directly, as a list of
                   arguments.

    :arg log:      Must be passed as a keyword argument.  An optional ``dict``
                   which may be used to redirect the command's standard output
                   and error. The following keys are recognised:

                     - tee:    If ``True``, the command's standard output/error
                               streams are forwarded to this processes streams.

                     - stdout: Optional file-like object to which the command's
                               standard output stream can be forwarded.

                     - stderr: Optional file-like object to which the command's
                               standard error stream can be forwarded.

                     - cmd:    Optional file-like object to which the command
                               itself is logged.

    All other keyword arguments are passed through to the ``subprocess.Popen``
    object (via :func:`_realrun`), unless ``submit=True``, in which case they
    are passed through to the :func:`.fslsub.submit` function.

    :returns:      If ``submit`` is provided, the return value of
                   :func:`.fslsub.submit` is returned. Otherwise returns a
                   single value or a tuple, based on the ``stdout``,
                   ``stderr``, and ``exitcode`` arguments. If the
                   :data:`DRY_RUN` flag is active, the value produced by
                   :func:`_dryrun` is returned instead.

    :raises ValueError:   If ``submit`` is true, but is neither ``True`` nor
                          a mapping.

    :raises RuntimeError: If the command is run directly, returns a non-0
                          exit code, and ``exitcode`` is not ``True``.
    """

    returnStdout   = kwargs.pop('stdout',   True)
    returnStderr   = kwargs.pop('stderr',   False)
    returnExitcode = kwargs.pop('exitcode', False)
    submit         = kwargs.pop('submit',   {})
    cmdonly        = kwargs.pop('cmdonly',  False)

    # Not called "log", so that the module-
    # level logger is not shadowed in here
    logopts        = kwargs.pop('log',      None)
    args           = prepareArgs(args)

    if logopts is None:
        logopts = {}

    tee       = logopts.get('tee',    False)
    logStdout = logopts.get('stdout', None)
    logStderr = logopts.get('stderr', None)
    logCmd    = logopts.get('cmd',    None)

    # Normalise submit - from here on it is either
    # None (run directly), or a mapping containing
    # options for fslsub.submit (possibly empty).
    if not submit:
        submit = None
    elif submit is True:
        submit = {}
    elif not isinstance(submit, abc.Mapping):
        raise ValueError('submit must be a mapping containing '
                         'options for fsl.utils.fslsub.submit')

    # Nothing can be captured from a submitted job
    if submit is not None:
        returnStdout   = False
        returnStderr   = False
        returnExitcode = False

    if cmdonly:
        return args

    if DRY_RUN:
        # A boolean is passed, rather than the options
        # themselves - submit=True has been normalised
        # to an empty (and therefore false) dict, which
        # must still be treated as a job submission.
        return _dryrun(
            submit is not None,
            returnStdout, returnStderr, returnExitcode, *args)

    # submit - delegate to fslsub
    if submit is not None:
        return fslsub.submit(' '.join(args), **submit, **kwargs)

    # Run directly - delegate to _realrun
    stdout, stderr, exitcode = _realrun(
        tee, logStdout, logStderr, logCmd, *args, **kwargs)

    if not returnExitcode and (exitcode != 0):
        raise RuntimeError('{} returned non-zero exit code: {}'.format(
            args[0], exitcode))

    results = []
    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(exitcode)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
    """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
    active. Produces the value that :func:`run` returns in place of actually
    running or submitting the command.

    :arg submit:         ``None`` or ``False`` if the command would have been
                         run directly. Anything else - including an *empty*
                         dictionary of submission options - means that the
                         command would have been submitted as a cluster job.

    :arg returnStdout:   Whether the fake standard output is to be returned.

    :arg returnStderr:   Whether the fake standard error is to be returned.

    :arg returnExitcode: Whether the fake exit code is to be returned.

    :arg args:           The command and its arguments.

    :returns:            ``('0',)`` for a submitted job. Otherwise the
                         requested items out of the command joined with
                         spaces (as standard output), an empty string (as
                         standard error), and ``0`` (as exit code) - the bare
                         value if exactly one was requested, else a tuple.
    """

    # run() turns submit=True into an empty dict,
    # so a truthiness test would mistake a job
    # submission without options for a direct run.
    if submit is not None and submit is not False:
        return ('0',)

    stdout = ' '.join(args)
    stderr = ''

    requested = ((returnStdout,   stdout),
                 (returnStderr,   stderr),
                 (returnExitcode, 0))
    results   = [value for wanted, value in requested if wanted]

    if len(results) == 1:
        return results[0]
    return tuple(results)


263
def _realrun(tee, logStdout, logStderr, logCmd, *args, **kwargs):
    """Used by :func:`run`. Runs the given command and manages its standard
    output and error streams.

    The command's output is always written to two files in a temporary
    directory, and read back in once the command has finished. It may
    additionally be forwarded to this process' own streams, and/or to
    streams provided by the caller.

    :arg tee:       If ``True``, the command's standard output and error
                    streams are forwarded to this process' standard output/
                    error.

    :arg logStdout: Optional file-like object to which the command's standard
                    output stream can be forwarded.

    :arg logStderr: Optional file-like object to which the command's standard
                    error stream can be forwarded.

    :arg logCmd:    Optional file-like object to which the command itself is
                    logged. The command is written as bytes if the object has
                    a ``mode`` attribute which contains ``'b'``, and as a
                    string otherwise.

    :arg args:      Command to run

    :arg kwargs:    Passed through to the ``subprocess.Popen`` object.

    :returns:       A tuple containing:
                      - the command's standard output as a string.
                      - the command's standard error as a string.
                      - the command's exit code.
    """
    proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, **kwargs)
    with tempdir.tempdir(changeto=False) as td:

        # We always direct the command's stdout/
        # stderr to two temporary files
        stdoutf = op.join(td, 'stdout')
        stderrf = op.join(td, 'stderr')

        with open(stdoutf, 'wb') as outfile, \
             open(stderrf, 'wb') as errfile:  # noqa

            outstreams = [outfile]
            errstreams = [errfile]

            # if tee, we duplicate the command's
            # stdout/stderr to this process'
            # stdout/stderr
            if tee:
                outstreams.append(sys.stdout)
                errstreams.append(sys.stderr)

            # And we also duplicate to caller-
            # provided streams if they're given.
            if logStdout is not None: outstreams.append(logStdout)
            if logStderr is not None: errstreams.append(logStderr)

            # log the command if requested
            if logCmd is not None:
                cmd = ' '.join(args) + '\n'
                if 'b' in getattr(logCmd, 'mode', 'w'):
                    logCmd.write(cmd.encode('utf-8'))
                else:
                    logCmd.write(cmd)

            stdoutt = _forwardStream(proc.stdout, *outstreams)
            stderrt = _forwardStream(proc.stderr, *errstreams)

            # Wait until the forwarding threads
            # have finished cleanly, and the
            # command has terminated.
            stdoutt.join()
            stderrt.join()
            proc.communicate()

        # Read in the command's stdout/stderr - the
        # files must have been closed (and therefore
        # flushed) before this point, but the temporary
        # directory must still exist.
        with open(stdoutf, 'rb') as f: stdout = f.read()
        with open(stderrf, 'rb') as f: stderr = f.read()

    exitcode = proc.returncode
    stdout   = stdout.decode('utf-8')
    stderr   = stderr.decode('utf-8')

    return stdout, stderr, exitcode
342
343


344
def runfsl(*args, **kwargs):
    """Call a FSL command and return its output.

    This function searches for the command in the following
    locations (ordered by priority):

      1. ``FSL_PREFIX``
      2. ``$FSLDEVDIR/bin``
      3. ``$FSLDIR/bin``

    If found, the full path to the command is then passed to :func:`run`.
    When FSL is being used through the Windows Subsystem for Linux, the
    whole command is instead rewritten by :func:`wslcmd`.

    :arg args:   The command and its arguments, in any of the forms accepted
                 by :func:`run`.

    :arg kwargs: Passed through to :func:`run`.

    :returns:    Whatever :func:`run` returns.

    :raises FSLNotPresent:     If none of the above locations is set.

    :raises FileNotFoundError: If the command cannot be found in any of the
                               above locations.
    """
    prefixes = []

    if FSL_PREFIX is not None:
        prefixes.append(FSL_PREFIX)
    if fslplatform.fsldevdir is not None:
        prefixes.append(op.join(fslplatform.fsldevdir, 'bin'))
    if fslplatform.fsldir is not None:
        prefixes.append(op.join(fslplatform.fsldir, 'bin'))

    if not prefixes:
        raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!')

    args = prepareArgs(args)

    # Use the first location which
    # contains the command
    for prefix in prefixes:
        cmdpath = op.join(prefix, args[0])
        if fslplatform.fslwsl:
            # wslcmd returns None if the command
            # is not present at this location
            wslargs = wslcmd(cmdpath, *args)
            if wslargs is not None:
                args = wslargs
                break
        elif op.isfile(cmdpath):
            args[0] = cmdpath
            break

    # error if the command cannot
    # be found in a FSL directory
    else:
        raise FileNotFoundError('FSL tool {} not found (checked {})'.format(
            args[0], ', '.join(prefixes)))

    return run(*args, **kwargs)
387

388

389
390
391
392
def wslcmd(cmdpath, *args):
    """Convert a command + arguments into an equivalent set of arguments
    that will run the command under Windows Subsystem for Linux.

    :param cmdpath: Fully qualified path to the command. This is essentially
                    a WSL path not a Windows one since FSLDIR is specified as
                    a WSL path, however it may have backslashes as path
                    separators due to previous use of ``os.path.join``
    :param args:    Sequence of command arguments (the first of which is the
                    unqualified command name)

    :return: If ``cmdpath`` exists and is executable in WSL, return a
             sequence of command arguments which when executed will run the
             command in WSL. Windows paths in the argument list will be
             converted to WSL paths. If ``cmdpath`` was not executable in
             WSL, returns None
    """
    # Check if command exists in WSL (remembering that the command
    # path may include FSLDIR which is a Windows path)
    cmdpath = fslpath.wslpath(cmdpath)
    retcode = sp.call(["wsl", "test", "-x", cmdpath])

    if retcode != 0:
        # Command was not found in WSL with this path
        return None

    # Form a new argument list and convert any Windows paths in it
    # into WSL paths. The first argument (the unqualified command
    # name) is replaced with the full path to the command.
    wslargs = [cmdpath] + [fslpath.wslpath(arg) for arg in args[1:]]

    # Either directory may be unset - runfsl only requires
    # one of FSL_PREFIX, FSLDEVDIR and FSLDIR to be available.
    fsldir    = fslplatform.fsldir
    fsldevdir = fslplatform.fsldevdir

    local_fsldir    = fslpath.wslpath(fsldir)    if fsldir    else None
    local_fsldevdir = fslpath.wslpath(fsldevdir) if fsldevdir else None

    # Prepend important environment variables - note that it seems
    # we cannot use WSLENV for this due to its insistence on path
    # mapping. FIXME FSLDEVDIR?
    local_path = "$PATH"
    if local_fsldevdir:
        local_path += ":%s/bin" % local_fsldevdir
    if local_fsldir:
        local_path += ":%s/bin" % local_fsldir

    prepargs = ["wsl", "PATH=%s" % local_path]
    if local_fsldir:
        prepargs.append("FSLDIR=%s" % local_fsldir)
    prepargs.append(
        "FSLOUTPUTTYPE=%s" % os.environ.get("FSLOUTPUTTYPE", "NIFTI_GZ"))
    if local_fsldevdir:
        prepargs.append("FSLDEVDIR=%s" % local_fsldevdir)

    return prepargs + wslargs
434
435
436
437
438
439
440
441
442
443
444
445
446


def hold(job_ids, hold_filename=None):
    """
    Waits until all jobs have finished.

    This is a thin wrapper which passes both arguments, unchanged, to
    :func:`.fslsub.hold`.

    :param job_ids: possibly nested sequence of job ids. The job ids themselves should be strings.
    :param hold_filename: filename to use as a hold file.
        The containing directory should exist, but the file itself should not.
        Defaults to a ./.<random characters>.hold in the current directory.
    :return: ``None`` - the call only returns when all the jobs have finished
    """
    fslsub.hold(job_ids, hold_filename)