#!/usr/bin/env python
#
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
#
"""This module provides some functions for running shell commands.

.. note:: The functions in this module are only known to work in Unix-like
          environments.

.. autosummary::
   :nosignatures:

   run
   runfsl
   dryrun
   hold
"""


23
24
25
26
27
28
29
30
31
32
import                    sys
import                    shlex
import                    logging
import                    threading
import                    contextlib
import collections.abc as abc
import subprocess      as sp
import os.path         as op
import                    os

33
from   fsl.utils.platform import platform as fslplatform
34
import fsl.utils.tempdir                  as tempdir
35
import fsl.utils.path                     as fslpath
36

37

38
39
40
# Module-level logger, named after this module.
log = logging.getLogger(__name__)


41
42
43
44
45
46
# Checked by run() before anything is executed; temporarily set to True
# by the dryrun() context manager, which restores the old value on exit.
DRY_RUN = False
"""If ``True``, the :func:`run` function will only log commands, but will not
execute them.
"""


47
48
49
50
# When not None, runfsl() searches this directory first, ahead of
# $FSLDEVDIR/bin and $FSLDIR/bin.
FSL_PREFIX = None
"""Global override for the FSL executable location used by :func:`runfsl`. """


51
52
53
54
55
56
class FSLNotPresent(Exception):
    """Error raised by the :func:`runfsl` function when no FSL location is
    available, i.e. when :data:`FSL_PREFIX`, ``fslplatform.fsldevdir`` and
    ``fslplatform.fsldir`` are all ``None``.
    """


57
@contextlib.contextmanager
def dryrun(*_):
    """Context manager which switches the module into "dry run" mode.

    While the context is active the :data:`DRY_RUN` flag is ``True``, so
    calls to :func:`run` do not execute anything. In that mode the standard
    output returned by :func:`run` is equal to ``' '.join(args)``.

    Any positional arguments are accepted and ignored. The previous value of
    :data:`DRY_RUN` is restored when the context exits, even if an exception
    was raised inside it, so contexts may be nested.
    """
    global DRY_RUN  # pylint: disable=global-statement

    # Remember the current setting and enable dry-run mode in one step
    previous, DRY_RUN = DRY_RUN, True

    try:
        yield
    finally:
        DRY_RUN = previous


75
def prepareArgs(args):
    """Used by the :func:`run` function. Normalises the positional arguments
    that were passed to it into a single flat ``list``.

    Three calling conventions are supported:

      - a single command string, which is split with ``shlex.split``
      - a single sequence containing the command and its arguments
      - the command and its arguments as separate (unpacked) items

    :arg args: Sequence of the positional arguments that were received.
    :returns:  A new ``list`` containing the command and its arguments.
    """

    # Anything other than exactly one item is
    # taken to be an already-unpacked command
    if len(args) != 1:
        return list(args)

    only = args[0]

    # A lone string is a whole command line
    if isinstance(only, str):
        return list(shlex.split(only))

    # Otherwise it is a sequence of arguments
    return list(only)


93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
real_stdout = sys.stdout
def _forwardStream(in_, *outs):
    """Creates and starts a daemon thread which forwards the given input stream
    to one or more output streams. Used by the :func:`run` function to redirect
    a command's standard output/error streams to more than one destination.

    It is necessary to read the process stdout/ stderr on separate threads to
    avoid deadlocks.

    :arg in_:  Input stream
    :arg outs: Output stream(s)
    :returns:  The thread that has been started.
    """

    # not all file-likes have a mode attribute -
    # if not present, assume a string stream
    omodes = [getattr(o, 'mode', 'w') for o in outs]

    def realForward():
112
        for line in iter(in_.readline, b''):
113
114
115
116
117
118
119
120
121
122
            for i, o in enumerate(outs):
                if 'b' in omodes[i]: o.write(line)
                else:                o.write(line.decode('utf-8'))

    t = threading.Thread(target=realForward)
    t.daemon = True
    t.start()
    return t


123
def run(*args, **kwargs):
    """Call a command and return its output. You can pass the command and
    arguments as a single string, or as a regular or unpacked sequence.

    The command can be run on a cluster by using the ``submit`` keyword
    argument.

    An exception is raised if the command returns a non-zero exit code, unless
    the ``exitcode`` option is set to ``True``.

    :arg stdout:   Must be passed as a keyword argument. Defaults to ``True``.
                   If ``True``, standard output is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg stderr:   Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, standard error is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, and the command's return code is non-0, an
                   exception is not raised.  Ignored if ``submit`` is
                   specified.

    :arg submit:   Must be passed as a keyword argument. If omitted (or any
                   false value, e.g. ``None`` or an empty ``dict``), the
                   command is run directly. If ``True``, the command is
                   submitted as a cluster job via the
                   :mod:`fsl.wrappers.fsl_sub` function.  May also be a
                   non-empty mapping containing arguments to that function.

    :arg cmdonly:  Defaults to ``False``. If ``True``, the command is not
                   executed, but rather is returned directly, as a list of
                   arguments.

    :arg log:      Must be passed as a keyword argument.  Defaults to
                   ``{'tee' : True}``. An optional ``dict`` which may be used
                   to redirect the command's standard output and error. The
                   following keys are recognised:

                     - tee:    If ``True`` (the default), the command's
                               standard output/error streams are forwarded to
                               this processes streams.

                     - stdout: Optional file-like object to which the command's
                               standard output stream can be forwarded.

                     - stderr: Optional file-like object to which the command's
                               standard error stream can be forwarded.

                     - cmd:    Optional file-like object to which the command
                               itself is logged.

    All other keyword arguments are passed through to the ``subprocess.Popen``
    object (via :func:`_realrun`), unless the command is being submitted, in
    which case they are passed through to the ``fsl.wrappers.fsl_sub``
    function.

    :returns: If ``cmdonly`` is ``True``, the command as a list of arguments.
              If ``submit`` is provided, the ID of the submitted job is
              returned as a string. Otherwise returns a single value or a
              tuple, based on the ``stdout``, ``stderr``, and ``exitcode``
              arguments. When the :data:`DRY_RUN` flag is active, the value
              produced by :func:`_dryrun` is returned instead.

    :raises ValueError:   If ``submit`` is neither ``True``, a false value,
                          nor a mapping.
    :raises RuntimeError: If the command is run directly, exits with a
                          non-zero code, and ``exitcode`` is not ``True``.
    """

    wantStdout   = kwargs.pop('stdout',   True)
    wantStderr   = kwargs.pop('stderr',   False)
    wantExitcode = kwargs.pop('exitcode', False)
    submit       = kwargs.pop('submit',   {})
    cmdonly      = kwargs.pop('cmdonly',  False)
    logopts      = kwargs.pop('log',      None)
    args         = prepareArgs(args)

    if logopts is None:
        logopts = {}

    tee       = logopts.get('tee',    True)
    logStdout = logopts.get('stdout', None)
    logStderr = logopts.get('stderr', None)
    logCmd    = logopts.get('cmd',    None)

    # Normalise submit to either None (run
    # directly) or a mapping of fsl_sub options.
    if not submit:
        submit = None
    else:
        # Nothing is captured from a submitted job
        wantStdout = wantStderr = wantExitcode = False

        if submit is True:
            submit = {}
        elif not isinstance(submit, abc.Mapping):
            raise ValueError('submit must be a mapping containing '
                             'options for fsl.utils.fslsub.submit')

    if cmdonly:
        return args

    if DRY_RUN:
        return _dryrun(
            submit, wantStdout, wantStderr, wantExitcode, *args)

    # submit - delegate to fsl_sub. This will induce a nested
    # call back to this run function, which is a bit confusing,
    # but harmless, as we've popped the "submit" arg above.
    if submit is not None:
        from fsl.wrappers import fsl_sub  # pylint: disable=import-outside-toplevel  # noqa: E501
        return fsl_sub(*args, **submit, **kwargs)[0].strip()

    # Run directly - delegate to _realrun
    stdout, stderr, exitcode = _realrun(
        tee, logStdout, logStderr, logCmd, *args, **kwargs)

    if (exitcode != 0) and not wantExitcode:
        raise RuntimeError('{} returned non-zero exit code: {}'.format(
            args[0], exitcode))

    # Collect only the values which were asked for, in
    # the fixed order (stdout, stderr, exitcode)
    requested = ((wantStdout,   stdout),
                 (wantStderr,   stderr),
                 (wantExitcode, exitcode))
    results   = [value for wanted, value in requested if wanted]

    # A lone result is returned bare, not in a tuple
    if len(results) == 1:
        return results[0]
    return tuple(results)


def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
    """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
    active.
    """

    if submit:
        return ('0',)

    results = []
    stderr  = ''
    stdout  = ' '.join(args)

    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(0)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


265
def _realrun(tee, logStdout, logStderr, logCmd, *args, **kwargs):
    """Used by :func:`run`. Runs the given command and manages its standard
    output and error streams.

    The command is started with both streams piped. Each pipe is drained on
    its own thread (see :func:`_forwardStream`) into a temporary file, and
    optionally into further streams. Once both threads and the command have
    finished, the temporary files are read back and decoded as UTF-8.

    :arg tee:       If ``True``, the command's standard output and error
                    streams are forwarded to this process' standard output/
                    error.

    :arg logStdout: Optional file-like object to which the command's standard
                    output stream can be forwarded.

    :arg logStderr: Optional file-like object to which the command's standard
                    error stream can be forwarded.

    :arg logCmd:    Optional file-like object to which the command itself is
                    logged. The command is written as the space-joined
                    arguments followed by a newline, encoded as UTF-8 if the
                    object's ``mode`` attribute contains ``'b'``.

    :arg args:      Command to run

    :arg kwargs:    Passed through to the ``subprocess.Popen`` object.

    :returns:       A tuple containing:
                      - the command's standard output as a string.
                      - the command's standard error as a string.
                      - the command's exit code.
    """
    if fslplatform.fslwsl:
        # On Windows this prevents opening of a popup window
        startupinfo = sp.STARTUPINFO()
        startupinfo.dwFlags |= sp.STARTF_USESHOWWINDOW
        kwargs["startupinfo"] = startupinfo

    # stdout/stderr are always piped, so that they
    # can be drained by the forwarding threads below
    proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, **kwargs)

    # NOTE(review): presumably creates a temporary directory without
    # changing into it, and removes it on exit - confirm against
    # fsl.utils.tempdir
    with tempdir.tempdir(changeto=False) as td:

        # We always direct the command's stdout/
        # stderr to two temporary files
        stdoutf = op.join(td, 'stdout')
        stderrf = op.join(td, 'stderr')

        with open(stdoutf, 'wb') as stdout, \
             open(stderrf, 'wb') as stderr:  # noqa

            # The temporary files come first, so
            # they are written before any other stream
            outstreams = [stdout]
            errstreams = [stderr]

            # if tee, we duplicate the command's
            # stdout/stderr to this process'
            # stdout/stderr
            if tee:
                outstreams.append(sys.stdout)
                errstreams.append(sys.stderr)

            # And we also duplicate to caller-
            # provided streams if they're given.
            if logStdout is not None: outstreams.append(logStdout)
            if logStderr is not None: errstreams.append(logStderr)

            # log the command if requested
            if logCmd is not None:
                cmd = ' '.join(args) + '\n'
                # streams without a mode attribute
                # are assumed to be text streams
                if 'b' in getattr(logCmd, 'mode', 'w'):
                    logCmd.write(cmd.encode('utf-8'))
                else:
                    logCmd.write(cmd)

            stdoutt = _forwardStream(proc.stdout, *outstreams)
            stderrt = _forwardStream(proc.stderr, *errstreams)

            # Wait until the forwarding threads
            # have finished cleanly, and the
            # command has terminated.
            stdoutt.join()
            stderrt.join()
            proc.communicate()

        # Read in the command's stdout/stderr - this is
        # done after the write handles have been closed,
        # but while the temporary directory still exists
        with open(stdoutf, 'rb') as f: stdout = f.read()
        with open(stderrf, 'rb') as f: stderr = f.read()

    exitcode = proc.returncode

    # Strict decoding - a UnicodeDecodeError is raised
    # if the command's output is not valid UTF-8
    stdout   = stdout.decode('utf-8')
    stderr   = stderr.decode('utf-8')

    return stdout, stderr, exitcode
350
351


352
def runfsl(*args, **kwargs):
    """Call a FSL command and return its output.

    This function searches for the command in the following locations
    (ordered by priority):

      1. ``FSL_PREFIX``
      2. ``$FSLDEVDIR/bin``
      3. ``$FSLDIR/bin``

    If found, the full path to the command is then passed to :func:`run`,
    along with all keyword arguments. When ``fslplatform.fslwsl`` is set,
    the lookup and the final argument list are instead produced by
    :func:`wslcmd`.

    :raises FSLNotPresent:     If none of the above locations are set.
    :raises FileNotFoundError: If the command is not found in any of them.
    """

    # Candidate directories, highest priority
    # first - unset ones are left as None
    devdir     = fslplatform.fsldevdir
    candidates = [
        FSL_PREFIX,
        None if devdir is None else op.join(devdir, 'bin'),
        None if fslplatform.fsldir is None
        else op.join(fslplatform.fsldir, 'bin'),
    ]
    prefixes = [c for c in candidates if c is not None]

    if not prefixes:
        raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!')

    args = prepareArgs(args)

    for prefix in prefixes:
        cmdpath = op.join(prefix, args[0])

        if fslplatform.fslwsl:
            translated = wslcmd(cmdpath, *args)
            if translated is None:
                continue
            args = translated
            break

        if op.isfile(cmdpath):
            args[0] = cmdpath
            break

    # error if the command cannot
    # be found in a FSL directory
    else:
        raise FileNotFoundError('FSL tool {} not found (checked {})'.format(
            args[0], ', '.join(prefixes)))

    return run(*args, **kwargs)
395

396

397
def wslcmd(cmdpath, *args):
    """Convert a command + arguments into an equivalent set of arguments that
    will run the command under Windows Subsystem for Linux

    :param cmdpath: Fully qualified path to the command. This is essentially
                    a WSL path not a Windows one since FSLDIR is specified
                    as a WSL path, however it may have backslashes as path
                    separators due to previous use of ``os.path.join``

    :param args:    Sequence of command arguments (the first of which is the
                    unqualified command name)

    :return: If ``wsl test -x <cmdpath>`` succeeds, a list of arguments
             which, when executed, will run the command in WSL: ``wsl``,
             then ``PATH``, ``FSLDIR``, ``FSLOUTPUTTYPE`` (and ``FSLDEVDIR``
             if set) assignments, then the command path and its arguments.
             Every argument is passed through ``fslpath.wslpath``. If the
             test fails, returns ``None``.
    """

    # Check whether the command exists and is
    # executable in WSL, using its translated path
    cmdpath = fslpath.wslpath(cmdpath)
    retcode = _realrun(
        False, None, None, None, "wsl", "test", "-x", cmdpath)[2]

    if retcode != 0:
        # Command was not found in WSL with this path
        return None

    # Form a new argument list, translating every
    # argument, then substitute the full command path
    wslargs    = list(map(fslpath.wslpath, args))
    wslargs[0] = cmdpath

    fsldir    = fslpath.wslpath(fslplatform.fsldir)
    fsldevdir = (fslpath.wslpath(fslplatform.fsldevdir)
                 if fslplatform.fsldevdir else None)

    # Prepend important environment variables -
    # note that it seems we cannot use WSLENV
    # for this due to its insistence on path
    # mapping.
    searchpath = "$PATH"
    if fsldevdir:
        searchpath += ":%s/bin" % fsldevdir
    searchpath += ":%s/bin" % fsldir

    env = [
        "PATH=%s" % searchpath,
        "FSLDIR=%s" % fsldir,
        "FSLOUTPUTTYPE=%s" % os.environ.get("FSLOUTPUTTYPE", "NIFTI_GZ"),
    ]
    if fsldevdir:
        env.append("FSLDEVDIR=%s" % fsldevdir)

    return ["wsl"] + env + wslargs
451
452
453


def hold(job_ids, hold_filename=None):
    """Waits until all jobs have finished.

    This is a thin wrapper which passes both arguments, unchanged, to
    ``fsl.utils.fslsub.hold``.

    :param job_ids:       possibly nested sequence of job ids. The job ids
                          themselves should be strings.

    :param hold_filename: filename to use as a hold file.  The containing
                          directory should exist, but the file itself should
                          not.  Defaults to a ``./.<random characters>.hold``
                          in the current directory.

    :return: only returns when all the jobs have finished
    """
    # Imported here, rather than at the top of the module
    from fsl.utils import fslsub  # pylint: disable=import-outside-toplevel  # noqa: E501
    fslsub.hold(job_ids, hold_filename)