#!/usr/bin/env python
#
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
#
"""This module provides some functions for running shell commands.

.. note:: The functions in this module are only known to work in Unix-like
          environments.

.. autosummary::
   :nosignatures:

   run
   runfsl
   wait
   dryrun
"""


23
import               sys
24
import               shlex
25
import               logging
26
import               threading
27
import               contextlib
28
import               collections
29
30
import subprocess as sp
import os.path    as op
31
32
import               os
import               re
33

34
35
import               six

36
37
from   fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub                   as fslsub
38
import fsl.utils.tempdir                  as tempdir
39
40
41
42
43


# Module-level logger, named after this module.
log = logging.getLogger(__name__)


44
45
46
47
48
49
# Module-wide switch. The dryrun context manager sets it to True for the
# duration of its block, and restores the previous value afterwards.
DRY_RUN = False
"""If ``True``, the :func:`run` function will only log commands, but will not
execute them.
"""


50
51
52
53
# When not None, this is the first directory in which runfsl looks for a
# command, ahead of the fsldevdir and fsldir "bin" directories.
FSL_PREFIX = None
"""Global override for the FSL executable location used by :func:`runfsl`. """


54
55
56
57
58
59
60
class FSLNotPresent(Exception):
    """Raised by :func:`runfsl` when there is no FSL location to search,
    i.e. ``$FSLDIR`` cannot be found.
    """


61
62
63
64
@contextlib.contextmanager
def dryrun(*args):
    """Context manager inside which calls to :func:`run` are logged rather
    than executed. This is achieved by switching the :data:`DRY_RUN` flag on
    for the duration of the ``with`` block.

    While the flag is on, the standard output returned by :func:`run` is the
    command itself, i.e. ``' '.join(args)`` of the arguments passed to
    :func:`run`.

    :arg args: Accepted, but not used by this function.
    """
    global DRY_RUN

    # Remember whatever the flag was on entry, so that nested
    # uses put back the enclosing state rather than forcing False.
    previous, DRY_RUN = DRY_RUN, True

    try:
        yield
    finally:
        DRY_RUN = previous


79
def prepareArgs(args):
    """Used by the :func:`run` function. Ensures that the given arguments is a
    list of strings.

    :arg args: Sequence of positional arguments, as received by :func:`run`
               via ``*args``. May be one of:

                 - a single command string, which is split with
                   ``shlex.split``
                 - a single sequence, which is used as the command
                 - anything else, which is used as-is (the command was passed
                   as separate arguments)

    :returns:  A new ``list`` containing the command and its arguments.
    """

    if len(args) == 1:

        # Argument was a command string. This module already
        # relies on Python 3-only syntax, so a plain str check
        # is equivalent to the six.string_types test.
        if isinstance(args[0], str):
            args = shlex.split(args[0])

        # Argument was an unpacked sequence
        else:
            args = args[0]

    return list(args)


97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Reference to sys.stdout as it was when this module was imported.
# NOTE(review): not referenced anywhere else in the visible source -
# confirm whether external code still relies on it.
real_stdout = sys.stdout
def _forwardStream(in_, *outs):
    """Creates and starts a daemon thread which forwards the given input stream
    to one or more output streams. Used by the :func:`run` function to redirect
    a command's standard output/error streams to more than one destination.

    It is necessary to read the process stdout/ stderr on separate threads to
    avoid deadlocks.

    :arg in_:  Input stream
    :arg outs: Output stream(s)
    :returns:  The thread that has been started.
    """

    # not all file-likes have a mode attribute -
    # if not present, assume a string stream
    omodes = [getattr(o, 'mode', 'w') for o in outs]

    def realForward():
116
        for line in iter(in_.readline, b''):
117
118
119
120
121
122
123
124
125
126
            for i, o in enumerate(outs):
                if 'b' in omodes[i]: o.write(line)
                else:                o.write(line.decode('utf-8'))

    t = threading.Thread(target=realForward)
    t.daemon = True
    t.start()
    return t


127
def run(*args, **kwargs):
    """Call a command and return its output. You can pass the command and
    arguments as a single string, or as a regular or unpacked sequence.

    The command can be run on a cluster by using the ``submit`` keyword
    argument.

    An exception is raised if the command returns a non-zero exit code, unless
    the ``exitcode`` option is set to ``True``.

    :arg stdout:   Must be passed as a keyword argument. Defaults to ``True``.
                   If ``True``, standard output is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg stderr:   Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, standard error is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, and the command's return code is non-0, an
                   exception is not raised.  Ignored if ``submit`` is
                   specified.

    :arg submit:   Must be passed as a keyword argument. Defaults to ``None``.
                   If ``True``, the command is submitted as a cluster job via
                   the :func:`.fslsub.submit` function.  May also be a
                   dictionary containing arguments to that function.

    :arg log:      Must be passed as a keyword argument.  An optional ``dict``
                   which may be used to redirect the command's standard output
                   and error. The following keys are recognised:

                     - tee:    If ``True``, the command's standard output/error
                               streams are forwarded to this processes streams.

                     - stdout: Optional file-like object to which the command's
                               standard output stream can be forwarded.

                     - stderr: Optional file-like object to which the command's
                               standard error stream can be forwarded.

                     - cmd:    Optional file-like object to which the command
                               itself is logged.

    All other keyword arguments are passed through to the ``subprocess.Popen``
    object (via :func:`_realrun`), unless ``submit=True``, in which case they
    are passed through to the :func:`.fslsub.submit` function.

    :returns:      If ``submit`` is provided, the return value of
                   :func:`.fslsub.submit` is returned. Otherwise returns a
                   single value or a tuple, based on the ``stdout``,
                   ``stderr``, and ``exitcode`` arguments.

    :raises ValueError:   If ``submit`` is truthy, but is neither ``True``
                          nor a mapping.

    :raises RuntimeError: If the command returns a non-zero exit code and
                          ``exitcode`` is not ``True``.
    """

    # The abstract base classes live in collections.abc - the
    # collections.Mapping alias was removed in Python 3.10.
    from collections.abc import Mapping

    returnStdout   = kwargs.pop('stdout',   True)
    returnStderr   = kwargs.pop('stderr',   False)
    returnExitcode = kwargs.pop('exitcode', False)
    submit         = kwargs.pop('submit',   {})
    logOpts        = kwargs.pop('log',      None)
    args           = prepareArgs(args)

    if logOpts is None:
        logOpts = {}

    tee       = logOpts.get('tee',    False)
    logStdout = logOpts.get('stdout', None)
    logStderr = logOpts.get('stderr', None)
    logCmd    = logOpts.get('cmd',    None)

    # Any falsy value (None, False, {}) means "run
    # directly". From here on, submit is either None,
    # or a mapping of options for fslsub.submit.
    if not submit:
        submit = None

    else:
        # Output is not available for submitted jobs
        returnStdout   = False
        returnStderr   = False
        returnExitcode = False

        if submit is True:
            submit = {}

        elif not isinstance(submit, Mapping):
            raise ValueError('submit must be a mapping containing '
                             'options for fsl.utils.fslsub.submit')

    if DRY_RUN:
        # Pass an explicit flag rather than the options mapping -
        # submit=True is normalised to an empty (falsy) dict above,
        # which _dryrun would otherwise mistake for a direct run.
        return _dryrun(
            submit is not None,
            returnStdout, returnStderr, returnExitcode, *args)

    # submit - delegate to fslsub
    if submit is not None:
        return fslsub.submit(' '.join(args), **submit, **kwargs)

    # Run directly - delegate to _realrun
    stdout, stderr, exitcode = _realrun(
        tee, logStdout, logStderr, logCmd, *args, **kwargs)

    if not returnExitcode and (exitcode != 0):
        raise RuntimeError('{} returned non-zero exit code: {}'.format(
            args[0], exitcode))

    results = []
    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(exitcode)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
    """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
    active.
    """

    if submit:
        return ('0',)

    results = []
    stderr  = ''
    stdout  = ' '.join(args)

    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(0)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


256
def _realrun(tee, logStdout, logStderr, logCmd, *args, **kwargs):
    """Used by :func:`run`. Runs the given command and manages its standard
    output and error streams.

    :arg tee:       If ``True``, the command's standard output and error
                    streams are forwarded to this process' standard output/
                    error.

    :arg logStdout: Optional file-like object to which the command's standard
                    output stream can be forwarded.

    :arg logStderr: Optional file-like object to which the command's standard
                    error stream can be forwarded.

    :arg logCmd:    Optional file-like object to which the command itself is
                    logged.

    :arg args:      Command to run

    :arg kwargs:    Passed through to the ``subprocess.Popen`` object.

    :returns:       A tuple containing:
                      - the command's standard output as a string.
                      - the command's standard error as a string.
                      - the command's exit code.
    """
    proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, **kwargs)
    with tempdir.tempdir(changeto=False) as td:

        # We always direct the command's stdout/
        # stderr to two temporary files
        stdoutf = op.join(td, 'stdout')
        stderrf = op.join(td, 'stderr')

        with open(stdoutf, 'wb') as outfile, \
             open(stderrf, 'wb') as errfile:  # noqa

            outstreams = [outfile]
            errstreams = [errfile]

            # if tee, we duplicate the command's
            # stdout/stderr to this process'
            # stdout/stderr
            if tee:
                outstreams.append(sys.stdout)
                errstreams.append(sys.stderr)

            # And we also duplicate to caller-
            # provided streams if they're given.
            if logStdout is not None: outstreams.append(logStdout)
            if logStderr is not None: errstreams.append(logStderr)

            # log the command if requested. As in
            # _forwardStream, a stream without a mode
            # attribute is assumed to be a text stream.
            if logCmd is not None:
                cmd = ' '.join(args) + '\n'
                if 'b' in getattr(logCmd, 'mode', 'w'):
                    logCmd.write(cmd.encode('utf-8'))
                else:
                    logCmd.write(cmd)

            stdoutt = _forwardStream(proc.stdout, *outstreams)
            stderrt = _forwardStream(proc.stderr, *errstreams)

            # Wait until the forwarding threads
            # have finished cleanly, and the
            # command has terminated.
            stdoutt.join()
            stderrt.join()
            proc.communicate()

        # Read in the command's stdout/stderr. The files
        # have been closed (and so flushed) by this point,
        # but the temporary directory still exists.
        with open(stdoutf, 'rb') as f: stdoutb = f.read()
        with open(stderrf, 'rb') as f: stderrb = f.read()

    exitcode = proc.returncode
    stdout   = stdoutb.decode('utf-8')
    stderr   = stderrb.decode('utf-8')

    return stdout, stderr, exitcode
335
336


337
def runfsl(*args, **kwargs):
    """Call a FSL command and return its output.

    This function searches for the command in the following
    locations (ordered by priority):

      1. ``FSL_PREFIX``
      2. ``$FSLDEVDIR/bin``
      3. ``$FSLDIR/bin``

    If found, the full path to the command is then passed to :func:`run`.

    :arg args:   The command and its arguments, in any of the forms
                 accepted by :func:`run`.

    :arg kwargs: Passed through to :func:`run`.

    :returns:    The return value of :func:`run`.

    :raises FSLNotPresent:     If none of the above locations are set.

    :raises FileNotFoundError: If the command cannot be found in any of
                               them.
    """
    prefixes = []

    if FSL_PREFIX is not None:
        prefixes.append(FSL_PREFIX)
    if fslplatform.fsldevdir is not None:
        prefixes.append(op.join(fslplatform.fsldevdir, 'bin'))
    if fslplatform.fsldir is not None:
        prefixes.append(op.join(fslplatform.fsldir, 'bin'))

    if not prefixes:
        raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!')

    args = prepareArgs(args)
    for prefix in prefixes:
        cmdpath = op.join(prefix, args[0])
        if fslplatform.fslwsl:
            # We want to run the command from FSL installed in the
            # Windows Subsystem for Linux. First we check it exists
            # in WSL (translating Windows path separators to Unix).
            # Then we convert any Windows paths in the arguments
            # (e.g. temp files) to the corresponding WSL Unix paths
            # (C:/ -> /mnt/c). Then we prepend important environment
            # variables - note that it seems we cannot use WSLENV
            # for this due to its insistence on path mapping.
            # Finally we append the command and its arguments.
            cmdpath = cmdpath.replace("\\", "/")
            retcode = sp.call(["wsl", "test", "-x", cmdpath])
            if retcode == 0:
                args[0] = cmdpath
                args = [wslpath(arg) for arg in args]
                args = [
                    "wsl",
                    "PATH=$PATH:%s/bin" % fslplatform.fsldir,
                    "FSLDIR=%s" % fslplatform.fsldir,
                    "FSLOUTPUTTYPE=%s" % os.environ.get("FSLOUTPUTTYPE",
                                                        "NIFTI_GZ")
                ] + args
                break
        elif op.isfile(cmdpath):
            args[0] = cmdpath
            break

    # error if the command cannot
    # be found in a FSL directory
    # (the loop ended without a break)
    else:
        raise FileNotFoundError('FSL tool {} not found (checked {})'.format(
            args[0], ', '.join(prefixes)))

    return run(*args, **kwargs)
395

396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def wslpath(patharg):
    """Convert a command line argument containing a Windows path to the
    equivalent WSL path (e.g. ``c:\\Users`` -> ``/mnt/c/Users``).

    :arg patharg: Command line argument which may (or may not) contain a
                  Windows path. It is assumed to be either of the form
                  ``<windows path>`` or ``--arg=<windows path>``.

    :returns:     The argument with its drive letter replaced by a
                  ``/mnt/<lowercase drive>`` prefix and all backslashes
                  replaced by forward slashes, any ``--arg=`` prefix being
                  preserved. Arguments which do not look like a Windows
                  path are returned unchanged.
    """
    # Groups: optional "--arg=" prefix, a single
    # drive letter, and the remainder of the path.
    match = re.match(r"^(--[\w-]+=)?([a-zA-Z]):(.+)$", patharg)

    if match is None:
        return patharg

    arg, drive, path = match.group(1, 2, 3)
    if arg is None:
        arg = ""

    return arg + "/mnt/" + drive.lower() + path.replace("\\", "/")
415

416
def wait(job_ids):
    """Proxy for :func:`.fslsub.wait`.

    :arg job_ids: Passed unchanged to :func:`.fslsub.wait`.
    :returns:     Whatever :func:`.fslsub.wait` returns.
    """
    return fslsub.wait(job_ids)