run.py 15.1 KB
Newer Older
1
2
3
4
5
#!/usr/bin/env python
#
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
6
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
7
8
9
#
"""This module provides some functions for running shell commands.

Paul McCarthy's avatar
Paul McCarthy committed
10
11
12
.. note:: The functions in this module are only known to work in Unix-like
          environments.

13
14
15
16
17
.. autosummary::
   :nosignatures:

   run
   runfsl
18
   dryrun
19
   hold
20
21
22
"""


23
24
25
26
27
28
29
30
31
32
import                    sys
import                    shlex
import                    logging
import                    threading
import                    contextlib
import collections.abc as abc
import subprocess      as sp
import os.path         as op
import                    os

33
34
from   fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub                   as fslsub
35
import fsl.utils.tempdir                  as tempdir
36
import fsl.utils.path                     as fslpath
37
38
39
40

log = logging.getLogger(__name__)


41
42
43
44
45
46
DRY_RUN = False
"""If ``True``, the :func:`run` function will only log commands, but will not
execute them.
"""


47
48
49
50
FSL_PREFIX = None
"""Global override for the FSL executable location used by :func:`runfsl`. """


51
52
53
54
55
56
57
class FSLNotPresent(Exception):
    """Error raised by the :func:`runfsl` function when ``$FSLDIR`` cannot
    be found.
    """
    pass


58
59
60
61
@contextlib.contextmanager
def dryrun(*args):
    """Context manager which causes all calls to :func:`run` to be logged but
    not executed. See the :data:`DRY_RUN` flag.
62
63

    The returned standard output will be equal to ``' '.join(args)``.
64
65
66
67
68
69
70
71
72
73
74
75
    """
    global DRY_RUN

    oldval  = DRY_RUN
    DRY_RUN = True

    try:
        yield
    finally:
        DRY_RUN = oldval


76
def prepareArgs(args):
77
78
    """Used by the :func:`run` function. Ensures that the given arguments is a
    list of strings.
79
80
81
82
    """

    if len(args) == 1:

83
        # Argument was a command string
Paul McCarthy's avatar
Paul McCarthy committed
84
        if isinstance(args[0], str):
85
            args = shlex.split(args[0])
86

87
88
89
        # Argument was an unpacked sequence
        else:
            args = args[0]
90

91
92
93
    return list(args)


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
real_stdout = sys.stdout
def _forwardStream(in_, *outs):
    """Creates and starts a daemon thread which forwards the given input stream
    to one or more output streams. Used by the :func:`run` function to redirect
    a command's standard output/error streams to more than one destination.

    It is necessary to read the process stdout/ stderr on separate threads to
    avoid deadlocks.

    :arg in_:  Input stream
    :arg outs: Output stream(s)
    :returns:  The thread that has been started.
    """

    # not all file-likes have a mode attribute -
    # if not present, assume a string stream
    omodes = [getattr(o, 'mode', 'w') for o in outs]

    def realForward():
113
        for line in iter(in_.readline, b''):
114
115
116
117
118
119
120
121
122
123
            for i, o in enumerate(outs):
                if 'b' in omodes[i]: o.write(line)
                else:                o.write(line.decode('utf-8'))

    t = threading.Thread(target=realForward)
    t.daemon = True
    t.start()
    return t


124
def run(*args, **kwargs):
125
126
    """Call a command and return its output. You can pass the command and
    arguments as a single string, or as a regular or unpacked sequence.
127

128
129
130
    The command can be run on a cluster by using the ``submit`` keyword
    argument.

131
    An exception is raised if the command returns a non-zero exit code, unless
132
    the ``exitcode`` option is set to ``True``.
133

134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
    :arg stdout:   Must be passed as a keyword argument. Defaults to ``True``.
                   If ``True``, standard output is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg stderr:   Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, standard error is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, and the command's return code is non-0, an
                   exception is not raised.  Ignored if ``submit`` is
                   specified.

    :arg submit:   Must be passed as a keyword argument. Defaults to ``None``.
                   If ``True``, the command is submitted as a cluster job via
                   the :func:`.fslsub.submit` function.  May also be a
                   dictionary containing arguments to that function.

152
153
154
155
    :arg cmdonly:  Defaults to ``False``. If ``True``, the command is not
                   executed, but rather is returned directly, as a list of
                   arguments.

156
157
158
159
160
161
162
163
164
165
166
167
168
    :arg log:      Must be passed as a keyword argument.  An optional ``dict``
                   which may be used to redirect the command's standard output
                   and error. The following keys are recognised:

                     - tee:    If ``True``, the command's standard output/error
                               streams are forwarded to this processes streams.

                     - stdout: Optional file-like object to which the command's
                               standard output stream can be forwarded.

                     - stderr: Optional file-like object to which the command's
                               standard error stream can be forwarded.

169
170
                     - cmd:    Optional file-like object to which the command
                               itself is logged.
171

172
173
    All other keyword arguments are passed through to the ``subprocess.Popen``
    object (via :func:`_realrun`), unless ``submit=True``, in which case they
174
    are passed through to the :func:`.fslsub.submit` function.
175

176
177
178
179
    :returns:      If ``submit`` is provided, the return value of
                   :func:`.fslsub` is returned. Otherwise returns a single
                   value or a tuple, based on the based on the ``stdout``,
                   ``stderr``, and ``exitcode`` arguments.
180
    """
181

182
183
184
185
    returnStdout   = kwargs.pop('stdout',   True)
    returnStderr   = kwargs.pop('stderr',   False)
    returnExitcode = kwargs.pop('exitcode', False)
    submit         = kwargs.pop('submit',   {})
186
    cmdonly        = kwargs.pop('cmdonly',  False)
Paul McCarthy's avatar
Paul McCarthy committed
187
    log            = kwargs.pop('log',      None)
188
    args           = prepareArgs(args)
189

Paul McCarthy's avatar
Paul McCarthy committed
190
191
192
193
194
195
196
197
    if log is None:
        log = {}

    tee       = log.get('tee',    False)
    logStdout = log.get('stdout', None)
    logStderr = log.get('stderr', None)
    logCmd    = log.get('cmd',    None)

198
199
    if not bool(submit):
        submit = None
200

201
    if submit is not None:
202
203
204
        returnStdout   = False
        returnStderr   = False
        returnExitcode = False
205
206
207

        if submit is True:
            submit = dict()
208

209
    if submit is not None and not isinstance(submit, abc.Mapping):
210
211
        raise ValueError('submit must be a mapping containing '
                         'options for fsl.utils.fslsub.submit')
212

213
214
215
    if cmdonly:
        return args

216
    if DRY_RUN:
217
218
        return _dryrun(
            submit, returnStdout, returnStderr, returnExitcode, *args)
219

220
221
    # submit - delegate to fslsub
    if submit is not None:
222
        return fslsub.submit(' '.join(args), **submit, **kwargs)
223

224
225
    # Run directly - delegate to _realrun
    stdout, stderr, exitcode = _realrun(
226
        tee, logStdout, logStderr, logCmd, *args, **kwargs)
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260

    if not returnExitcode and (exitcode != 0):
        raise RuntimeError('{} returned non-zero exit code: {}'.format(
            args[0], exitcode))

    results = []
    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(exitcode)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
    """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
    active.
    """

    if submit:
        return ('0',)

    results = []
    stderr  = ''
    stdout  = ' '.join(args)

    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(0)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


261
def _realrun(tee, logStdout, logStderr, logCmd, *args, **kwargs):
262
263
264
265
266
267
268
269
270
271
272
273
274
    """Used by :func:`run`. Runs the given command and manages its standard
    output and error streams.

    :arg tee:       If ``True``, the command's standard output and error
                    streams are forwarded to this process' standard output/
                    error.

    :arg logStdout: Optional file-like object to which the command's standard
                    output stream can be forwarded.

    :arg logStderr: Optional file-like object to which the command's standard
                    error stream can be forwarded.

275
276
    :arg logCmd:    Optional file-like object to which the command itself is
                    logged.
277
278
279

    :arg args:      Command to run

280
281
    :arg kwargs:    Passed through to the ``subprocess.Popen`` object.

282
283
284
285
286
    :returns:       A tuple containing:
                      - the command's standard output as a string.
                      - the command's standard error as a string.
                      - the command's exit code.
    """
287
288
289
290
291
292
    if fslplatform.fslwsl:
        # On Windows this prevents opening of a popup window
        startupinfo = sp.STARTUPINFO()
        startupinfo.dwFlags |= sp.STARTF_USESHOWWINDOW
        kwargs["startupinfo"] = startupinfo

293
    proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, **kwargs)
294
295
    with tempdir.tempdir(changeto=False) as td:

296
297
        # We always direct the command's stdout/
        # stderr to two temporary files
298
299
300
301
302
303
        stdoutf = op.join(td, 'stdout')
        stderrf = op.join(td, 'stderr')

        with open(stdoutf, 'wb') as stdout, \
             open(stderrf, 'wb') as stderr:  # noqa

304
305
306
307
308
309
            outstreams = [stdout]
            errstreams = [stderr]

            # if tee, we duplicate the command's
            # stdout/stderr to this process'
            # stdout/stderr
310
            if tee:
311
312
313
314
315
316
317
318
                outstreams.append(sys.stdout)
                errstreams.append(sys.stderr)

            # And we also duplicate to caller-
            # provided streams if they're given.
            if logStdout is not None: outstreams.append(logStdout)
            if logStderr is not None: errstreams.append(logStderr)

319
320
            # log the command if requested
            if logCmd is not None:
321
                cmd = ' '.join(args) + '\n'
322
323
324
325
                if 'b' in getattr(logCmd, 'mode', 'w'):
                    logCmd.write(cmd.encode('utf-8'))
                else:
                    logCmd.write(cmd)
326
327
328

            stdoutt = _forwardStream(proc.stdout, *outstreams)
            stderrt = _forwardStream(proc.stderr, *errstreams)
329
330
331
332
333
334
335
336
337
338
339
340

            # Wait until the forwarding threads
            # have finished cleanly, and the
            # command has terminated.
            stdoutt.join()
            stderrt.join()
            proc.communicate()

        # Read in the command's stdout/stderr
        with open(stdoutf, 'rb') as f: stdout = f.read()
        with open(stderrf, 'rb') as f: stderr = f.read()

341
342
343
    exitcode = proc.returncode
    stdout   = stdout.decode('utf-8')
    stderr   = stderr.decode('utf-8')
344

345
    return stdout, stderr, exitcode
346
347


348
def runfsl(*args, **kwargs):
349
350
    """Call a FSL command and return its output.

351
352
      This function searches for the command in the following
      locations (ordered by priority):
353

354
355
356
357
358
      1. ``FSL_PREFIX``
      2. ``$FSLDEVDIR/bin``
      3. ``$FSLDIR/bin``

      If found, the full path to the command is then passed to :func:`run`.
359
    """
360
    prefixes = []
361
362

    if FSL_PREFIX is not None:
363
364
365
366
367
        prefixes.append(FSL_PREFIX)
    if fslplatform.fsldevdir is not None:
        prefixes.append(op.join(fslplatform.fsldevdir, 'bin'))
    if fslplatform.fsldir is not None:
        prefixes.append(op.join(fslplatform.fsldir, 'bin'))
368

369
    if not prefixes:
370
        raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!')
371

372
    args = prepareArgs(args)
373
374
    for prefix in prefixes:
        cmdpath = op.join(prefix, args[0])
375
        if fslplatform.fslwsl:
376
377
378
            wslargs = wslcmd(cmdpath, *args)
            if wslargs is not None:
                args = wslargs
379
380
                break
        elif op.isfile(cmdpath):
381
382
            args[0] = cmdpath
            break
383

384
385
386
387
388
389
    # error if the command cannot
    # be found in a FSL directory
    else:
        raise FileNotFoundError('FSL tool {} not found (checked {})'.format(
            args[0], ', '.join(prefixes)))

390
    return run(*args, **kwargs)
391

392

393
394
395
396
def wslcmd(cmdpath, *args):
    """
    Convert a command + arguments into an equivalent set of arguments that will run the command
    under Windows Subsystem for Linux
397

398
399
400
401
    :param cmdpath: Fully qualified path to the command. This is essentially a WSL path not a Windows
                    one since FSLDIR is specified as a WSL path, however it may have backslashes
                    as path separators due to previous use of ``os.path.join``
    :param args: Sequence of command arguments (the first of which is the unqualified command name)
402

403
404
405
406
    :return: If ``cmdpath`` exists and is executable in WSL, return a sequence of command arguments
             which when executed will run the command in WSL. Windows paths in the argument list will
             be converted to WSL paths. If ``cmdpath`` was not executable in WSL, returns None
    """
407
408
    # Check if command exists in WSL (remembering that the command path may include FSLDIR which
    # is a Windows path)
409
    cmdpath = fslpath.wslpath(cmdpath)
410
    _stdout, _stderr, retcode = _realrun(False, None, None, None, "wsl", "test", "-x", cmdpath)
411
412
    if retcode == 0:
        # Form a new argument list and convert any Windows paths in it into WSL paths
413
        wslargs = [fslpath.wslpath(arg) for arg in args]
414
        wslargs[0] = cmdpath
415
        local_fsldir = fslpath.wslpath(fslplatform.fsldir)
416
        if fslplatform.fsldevdir:
417
            local_fsldevdir = fslpath.wslpath(fslplatform.fsldevdir)
418
419
        else:
            local_fsldevdir = None
420
        # Prepend important environment variables - note that it seems we cannot
421
422
423
424
425
426
        # use WSLENV for this due to its insistance on path mapping. FIXME FSLDEVDIR?
        local_path = "$PATH"
        if local_fsldevdir:
            local_path += ":%s/bin" % local_fsldevdir
        local_path += ":%s/bin" % local_fsldir
        prepargs = [
427
            "wsl",
428
429
            "PATH=%s" % local_path,
            "FSLDIR=%s" % local_fsldir,
430
            "FSLOUTPUTTYPE=%s" % os.environ.get("FSLOUTPUTTYPE", "NIFTI_GZ")
431
432
433
434
        ]
        if local_fsldevdir:
            prepargs.append("FSLDEVDIR=%s" % local_fsldevdir)
        return prepargs + wslargs
435
436
437
    else:
        # Command was not found in WSL with this path
        return None
438
439
440
441
442
443
444
445
446
447
448
449
450


def hold(job_ids, hold_filename=None):
    """
    Waits until all jobs have finished

    :param job_ids: possibly nested sequence of job ids. The job ids themselves should be strings.
    :param hold_filename: filename to use as a hold file.
        The containing directory should exist, but the file itself should not.
        Defaults to a ./.<random characters>.hold in the current directory.
    :return: only returns when all the jobs have finished
    """
    fslsub.hold(job_ids, hold_filename)