run.py 14.1 KB
Newer Older
1
2
3
4
5
#!/usr/bin/env python
#
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
6
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
7
8
9
#
"""This module provides some functions for running shell commands.

Paul McCarthy's avatar
Paul McCarthy committed
10
11
12
.. note:: The functions in this module are only known to work in Unix-like
          environments.

13
14
15
16
17
.. autosummary::
   :nosignatures:

   run
   runfsl
18
   dryrun
19
20
21
"""


22
23
24
25
26
27
28
29
30
31
32
import                    sys
import                    shlex
import                    logging
import                    threading
import                    contextlib
import collections.abc as abc
import subprocess      as sp
import os.path         as op
import                    os

import                    six
33

34
35
from   fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub                   as fslsub
36
import fsl.utils.tempdir                  as tempdir
37
import fsl.utils.path                     as fslpath
38
39
40
41

log = logging.getLogger(__name__)


42
43
44
45
46
47
DRY_RUN = False
"""If ``True``, the :func:`run` function will only log commands, but will not
execute them.
"""


48
49
50
51
FSL_PREFIX = None
"""Global override for the FSL executable location used by :func:`runfsl`. """


52
53
54
55
56
57
58
class FSLNotPresent(Exception):
    """Error raised by the :func:`runfsl` function when ``$FSLDIR`` cannot
    be found.
    """
    pass


59
60
61
62
@contextlib.contextmanager
def dryrun(*args):
    """Context manager which causes all calls to :func:`run` to be logged but
    not executed. See the :data:`DRY_RUN` flag.
63
64

    The returned standard output will be equal to ``' '.join(args)``.
65
66
67
68
69
70
71
72
73
74
75
76
    """
    global DRY_RUN

    oldval  = DRY_RUN
    DRY_RUN = True

    try:
        yield
    finally:
        DRY_RUN = oldval


77
def prepareArgs(args):
78
79
    """Used by the :func:`run` function. Ensures that the given arguments is a
    list of strings.
80
81
82
83
    """

    if len(args) == 1:

84
85
        # Argument was a command string
        if isinstance(args[0], six.string_types):
86
            args = shlex.split(args[0])
87

88
89
90
        # Argument was an unpacked sequence
        else:
            args = args[0]
91

92
93
94
    return list(args)


95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
real_stdout = sys.stdout
def _forwardStream(in_, *outs):
    """Creates and starts a daemon thread which forwards the given input stream
    to one or more output streams. Used by the :func:`run` function to redirect
    a command's standard output/error streams to more than one destination.

    It is necessary to read the process stdout/ stderr on separate threads to
    avoid deadlocks.

    :arg in_:  Input stream
    :arg outs: Output stream(s)
    :returns:  The thread that has been started.
    """

    # not all file-likes have a mode attribute -
    # if not present, assume a string stream
    omodes = [getattr(o, 'mode', 'w') for o in outs]

    def realForward():
114
        for line in iter(in_.readline, b''):
115
116
117
118
119
120
121
122
123
124
            for i, o in enumerate(outs):
                if 'b' in omodes[i]: o.write(line)
                else:                o.write(line.decode('utf-8'))

    t = threading.Thread(target=realForward)
    t.daemon = True
    t.start()
    return t


125
def run(*args, **kwargs):
126
127
    """Call a command and return its output. You can pass the command and
    arguments as a single string, or as a regular or unpacked sequence.
128

129
130
131
    The command can be run on a cluster by using the ``submit`` keyword
    argument.

132
    An exception is raised if the command returns a non-zero exit code, unless
133
    the ``exitcode`` option is set to ``True``.
134

135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
    :arg stdout:   Must be passed as a keyword argument. Defaults to ``True``.
                   If ``True``, standard output is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg stderr:   Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, standard error is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, and the command's return code is non-0, an
                   exception is not raised.  Ignored if ``submit`` is
                   specified.

    :arg submit:   Must be passed as a keyword argument. Defaults to ``None``.
                   If ``True``, the command is submitted as a cluster job via
                   the :func:`.fslsub.submit` function.  May also be a
                   dictionary containing arguments to that function.

    :arg log:      Must be passed as a keyword argument.  An optional ``dict``
                   which may be used to redirect the command's standard output
                   and error. The following keys are recognised:

                     - tee:    If ``True``, the command's standard output/error
                               streams are forwarded to this processes streams.

                     - stdout: Optional file-like object to which the command's
                               standard output stream can be forwarded.

                     - stderr: Optional file-like object to which the command's
                               standard error stream can be forwarded.

166
167
                     - cmd:    Optional file-like object to which the command
                               itself is logged.
168

169
170
    All other keyword arguments are passed through to the ``subprocess.Popen``
    object (via :func:`_realrun`), unless ``submit=True``, in which case they
171
    are passed through to the :func:`.fslsub.submit` function.
172

173
174
175
176
    :returns:      If ``submit`` is provided, the return value of
                   :func:`.fslsub` is returned. Otherwise returns a single
                   value or a tuple, based on the based on the ``stdout``,
                   ``stderr``, and ``exitcode`` arguments.
177
    """
178

179
180
181
182
    returnStdout   = kwargs.pop('stdout',   True)
    returnStderr   = kwargs.pop('stderr',   False)
    returnExitcode = kwargs.pop('exitcode', False)
    submit         = kwargs.pop('submit',   {})
Paul McCarthy's avatar
Paul McCarthy committed
183
    log            = kwargs.pop('log',      None)
184
    args           = prepareArgs(args)
185

Paul McCarthy's avatar
Paul McCarthy committed
186
187
188
189
190
191
192
193
    if log is None:
        log = {}

    tee       = log.get('tee',    False)
    logStdout = log.get('stdout', None)
    logStderr = log.get('stderr', None)
    logCmd    = log.get('cmd',    None)

194
195
    if not bool(submit):
        submit = None
196

197
    if submit is not None:
198
199
200
        returnStdout   = False
        returnStderr   = False
        returnExitcode = False
201
202
203

        if submit is True:
            submit = dict()
204

205
    if submit is not None and not isinstance(submit, abc.Mapping):
206
207
        raise ValueError('submit must be a mapping containing '
                         'options for fsl.utils.fslsub.submit')
208
209

    if DRY_RUN:
210
211
        return _dryrun(
            submit, returnStdout, returnStderr, returnExitcode, *args)
212

213
214
    # submit - delegate to fslsub
    if submit is not None:
215
        return fslsub.submit(' '.join(args), **submit, **kwargs)
216

217
218
    # Run directly - delegate to _realrun
    stdout, stderr, exitcode = _realrun(
219
        tee, logStdout, logStderr, logCmd, *args, **kwargs)
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253

    if not returnExitcode and (exitcode != 0):
        raise RuntimeError('{} returned non-zero exit code: {}'.format(
            args[0], exitcode))

    results = []
    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(exitcode)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
    """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
    active.
    """

    if submit:
        return ('0',)

    results = []
    stderr  = ''
    stdout  = ' '.join(args)

    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(0)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


254
def _realrun(tee, logStdout, logStderr, logCmd, *args, **kwargs):
255
256
257
258
259
260
261
262
263
264
265
266
267
    """Used by :func:`run`. Runs the given command and manages its standard
    output and error streams.

    :arg tee:       If ``True``, the command's standard output and error
                    streams are forwarded to this process' standard output/
                    error.

    :arg logStdout: Optional file-like object to which the command's standard
                    output stream can be forwarded.

    :arg logStderr: Optional file-like object to which the command's standard
                    error stream can be forwarded.

268
269
    :arg logCmd:    Optional file-like object to which the command itself is
                    logged.
270
271
272

    :arg args:      Command to run

273
274
    :arg kwargs:    Passed through to the ``subprocess.Popen`` object.

275
276
277
278
279
    :returns:       A tuple containing:
                      - the command's standard output as a string.
                      - the command's standard error as a string.
                      - the command's exit code.
    """
280
    proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, **kwargs)
281
282
    with tempdir.tempdir(changeto=False) as td:

283
284
        # We always direct the command's stdout/
        # stderr to two temporary files
285
286
287
288
289
290
        stdoutf = op.join(td, 'stdout')
        stderrf = op.join(td, 'stderr')

        with open(stdoutf, 'wb') as stdout, \
             open(stderrf, 'wb') as stderr:  # noqa

291
292
293
294
295
296
            outstreams = [stdout]
            errstreams = [stderr]

            # if tee, we duplicate the command's
            # stdout/stderr to this process'
            # stdout/stderr
297
            if tee:
298
299
300
301
302
303
304
305
                outstreams.append(sys.stdout)
                errstreams.append(sys.stderr)

            # And we also duplicate to caller-
            # provided streams if they're given.
            if logStdout is not None: outstreams.append(logStdout)
            if logStderr is not None: errstreams.append(logStderr)

306
307
            # log the command if requested
            if logCmd is not None:
308
                cmd = ' '.join(args) + '\n'
309
310
311
312
                if 'b' in getattr(logCmd, 'mode', 'w'):
                    logCmd.write(cmd.encode('utf-8'))
                else:
                    logCmd.write(cmd)
313
314
315

            stdoutt = _forwardStream(proc.stdout, *outstreams)
            stderrt = _forwardStream(proc.stderr, *errstreams)
316
317
318
319
320
321
322
323
324
325
326
327

            # Wait until the forwarding threads
            # have finished cleanly, and the
            # command has terminated.
            stdoutt.join()
            stderrt.join()
            proc.communicate()

        # Read in the command's stdout/stderr
        with open(stdoutf, 'rb') as f: stdout = f.read()
        with open(stderrf, 'rb') as f: stderr = f.read()

328
329
330
    exitcode = proc.returncode
    stdout   = stdout.decode('utf-8')
    stderr   = stderr.decode('utf-8')
331

332
    return stdout, stderr, exitcode
333
334


335
def runfsl(*args, **kwargs):
336
337
    """Call a FSL command and return its output.

338
339
      This function searches for the command in the following
      locations (ordered by priority):
340

341
342
343
344
345
      1. ``FSL_PREFIX``
      2. ``$FSLDEVDIR/bin``
      3. ``$FSLDIR/bin``

      If found, the full path to the command is then passed to :func:`run`.
346
    """
347
    prefixes = []
348
349

    if FSL_PREFIX is not None:
350
351
352
353
354
        prefixes.append(FSL_PREFIX)
    if fslplatform.fsldevdir is not None:
        prefixes.append(op.join(fslplatform.fsldevdir, 'bin'))
    if fslplatform.fsldir is not None:
        prefixes.append(op.join(fslplatform.fsldir, 'bin'))
355

356
    if not prefixes:
357
        raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!')
358

359
    args = prepareArgs(args)
360
361
    for prefix in prefixes:
        cmdpath = op.join(prefix, args[0])
362
        if fslplatform.fslwsl:
363
364
365
            wslargs = wslcmd(cmdpath, *args)
            if wslargs is not None:
                args = wslargs
366
367
                break
        elif op.isfile(cmdpath):
368
369
            args[0] = cmdpath
            break
370

371
372
373
374
375
376
    # error if the command cannot
    # be found in a FSL directory
    else:
        raise FileNotFoundError('FSL tool {} not found (checked {})'.format(
            args[0], ', '.join(prefixes)))

377
    return run(*args, **kwargs)
378

379

380
381
382
383
def wslcmd(cmdpath, *args):
    """
    Convert a command + arguments into an equivalent set of arguments that will run the command
    under Windows Subsystem for Linux
384

385
386
387
388
    :param cmdpath: Fully qualified path to the command. This is essentially a WSL path not a Windows
                    one since FSLDIR is specified as a WSL path, however it may have backslashes
                    as path separators due to previous use of ``os.path.join``
    :param args: Sequence of command arguments (the first of which is the unqualified command name)
389

390
391
392
393
    :return: If ``cmdpath`` exists and is executable in WSL, return a sequence of command arguments
             which when executed will run the command in WSL. Windows paths in the argument list will
             be converted to WSL paths. If ``cmdpath`` was not executable in WSL, returns None
    """
394
395
    # Check if command exists in WSL (remembering that the command path may include FSLDIR which
    # is a Windows path)
396
    cmdpath = fslpath.wslpath(cmdpath)
397
398
399
    retcode = sp.call(["wsl", "test", "-x", cmdpath])
    if retcode == 0:
        # Form a new argument list and convert any Windows paths in it into WSL paths
400
        wslargs = [fslpath.wslpath(arg) for arg in args]
401
        wslargs[0] = cmdpath
402
        local_fsldir = fslpath.wslpath(fslplatform.fsldir)
403
        if fslplatform.fsldevdir:
404
            local_fsldevdir = fslpath.wslpath(fslplatform.fsldevdir)
405
406
        else:
            local_fsldevdir = None
407
        # Prepend important environment variables - note that it seems we cannot
408
409
410
411
412
413
        # use WSLENV for this due to its insistance on path mapping. FIXME FSLDEVDIR?
        local_path = "$PATH"
        if local_fsldevdir:
            local_path += ":%s/bin" % local_fsldevdir
        local_path += ":%s/bin" % local_fsldir
        prepargs = [
414
            "wsl",
415
416
            "PATH=%s" % local_path,
            "FSLDIR=%s" % local_fsldir,
417
            "FSLOUTPUTTYPE=%s" % os.environ.get("FSLOUTPUTTYPE", "NIFTI_GZ")
418
419
420
421
        ]
        if local_fsldevdir:
            prepargs.append("FSLDEVDIR=%s" % local_fsldevdir)
        return prepargs + wslargs
422
423
424
    else:
        # Command was not found in WSL with this path
        return None