run.py 11.2 KB
Newer Older
1
2
3
4
5
6
7
8
#!/usr/bin/env python
#
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module provides some functions for running shell commands.

.. note:: The functions in this module are only known to work in Unix-like
          environments.

.. autosummary::
   :nosignatures:

   run
   runfsl
   wait
   dryrun

"""


22
import               sys
23
import               shlex
24
import               logging
25
import               threading
26
import               contextlib
27
import               collections
28
29
30
import subprocess as sp
import os.path    as op

31
32
import               six

33
34
from   fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub                   as fslsub
35
import fsl.utils.tempdir                  as tempdir
36
37
38
39
40


# Logger for this module.
log = logging.getLogger(name=__name__)


DRY_RUN = False
"""When ``True``, :func:`run` does not execute commands; it returns a
simulated result instead. See :func:`dryrun`.
"""


FSL_PREFIX = None
"""Optional directory which :func:`runfsl` searches for FSL executables
before ``$FSLDEVDIR/bin`` and ``$FSLDIR/bin``.
"""


class FSLNotPresent(Exception):
    """Raised by :func:`runfsl` when no FSL installation can be located
    (i.e. ``$FSLDIR`` cannot be found).
    """


@contextlib.contextmanager
def dryrun(*args):
    """Context manager which temporarily switches on the :data:`DRY_RUN`
    flag, so that calls to :func:`run` made inside the ``with`` block are not
    executed.

    While the flag is on, the standard output returned by :func:`run` is
    ``' '.join(args)``, where ``args`` are the arguments given to
    :func:`run`.

    The previous value of :data:`DRY_RUN` is restored on exit, including when
    the ``with`` block raises. Positional arguments given to this function
    are ignored.
    """
    global DRY_RUN

    # Remember the current value and switch the flag on in one step
    saved, DRY_RUN = DRY_RUN, True

    try:
        yield
    finally:
        DRY_RUN = saved


def prepareArgs(args):
    """Normalise the positional arguments given to :func:`run` or
    :func:`runfsl` into a list.

    :arg args: Sequence of arguments. If it contains exactly one item, that
               item is either a command string, which is split with
               :func:`shlex.split`, or a sequence of arguments, which is
               unpacked. Otherwise the items are used as they are.
    :returns:  A new ``list`` containing the command and its arguments.
    """

    # Several items - already an unpacked command
    if len(args) != 1:
        return list(args)

    only = args[0]

    # A single command string
    if isinstance(only, six.string_types):
        return shlex.split(only)

    # A single (packed) sequence of arguments
    return list(only)


real_stdout = sys.stdout


def _forwardStream(in_, *outs):
    """Creates and starts a daemon thread which forwards the given input stream
    to one or more output streams. Used by the :func:`run` function to redirect
    a command's standard output/error streams to more than one destination.

    It is necessary to read the process stdout/ stderr on separate threads to
    avoid deadlocks.

    Output streams whose ``mode`` contains ``'b'`` receive the raw bytes. All
    other output streams, including those without a ``mode`` attribute,
    receive each line decoded as UTF-8. Undecodable bytes are replaced with
    U+FFFD rather than raising.

    :arg in_:  Input stream (binary), read line by line until EOF.
    :arg outs: Output stream(s)
    :returns:  The thread that has been started.
    """

    # Not all file-likes have a mode attribute -
    # if it is missing, assume a text stream.
    binary = ['b' in getattr(o, 'mode', 'w') for o in outs]

    def realForward():
        for line in iter(in_.readline, b''):

            # Decode lazily, at most once per line. Invalid UTF-8 must
            # not raise here: an exception would silently kill this
            # thread and truncate every destination, including the
            # binary ones that never needed decoding.
            text = None
            for isbinary, out in zip(binary, outs):
                if isbinary:
                    out.write(line)
                else:
                    if text is None:
                        text = line.decode('utf-8', 'replace')
                    out.write(text)

    t = threading.Thread(target=realForward)
    t.daemon = True
    t.start()
    return t


def run(*args, **kwargs):
    """Call a command and return its output. You can pass the command and
    arguments as a single string, or as a regular or unpacked sequence.

    The command can be run on a cluster by using the ``submit`` keyword
    argument.

    An exception is raised if the command returns a non-zero exit code, unless
    the ``exitcode`` option is set to ``True``.

    :arg stdout:   Must be passed as a keyword argument. Defaults to ``True``.
                   If ``True``, standard output is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg stderr:   Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, standard error is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, and the command's return code is non-0, an
                   exception is not raised.  Ignored if ``submit`` is
                   specified.

    :arg submit:   Must be passed as a keyword argument. Defaults to ``None``.
                   If ``True``, the command is submitted as a cluster job via
                   the :func:`.fslsub.submit` function.  May also be a
                   dictionary containing arguments to that function.

    :arg log:      Must be passed as a keyword argument.  An optional ``dict``
                   (``None`` is treated as an empty ``dict``) which may be
                   used to redirect the command's standard output and error.
                   The following keys are recognised:

                     - tee:    If ``True``, the command's standard output/error
                               streams are forwarded to this process's
                               streams.

                     - stdout: Optional file-like object to which the command's
                               standard output stream can be forwarded.

                     - stderr: Optional file-like object to which the command's
                               standard error stream can be forwarded.

                     - cmd:    Optional file-like object to which the command
                               itself is logged.

    :returns:      If ``submit`` is provided, the return value of
                   :func:`.fslsub.submit` is returned (or a dummy value when
                   :data:`DRY_RUN` is active). Otherwise returns a single
                   value or a tuple, based on the ``stdout``, ``stderr``, and
                   ``exitcode`` arguments.

    :raises ValueError:   If ``submit`` is truthy but is neither ``True`` nor
                          a mapping.
    :raises RuntimeError: If the command returns a non-zero exit code and
                          ``exitcode`` is not ``True``.
    """

    # collections.Mapping was removed in Python 3.10 - the ABC lives in
    # collections.abc (Python 3.3+). Fall back for Python 2.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    returnStdout   = kwargs.get('stdout',   True)
    returnStderr   = kwargs.get('stderr',   False)
    returnExitcode = kwargs.get('exitcode', False)
    submit         = kwargs.get('submit',   {})

    # log=None is treated the same as not passing log at all. The local
    # is named logopts so it does not shadow the module-level logger.
    logopts        = kwargs.get('log') or {}
    tee            = logopts.get('tee',    False)
    logStdout      = logopts.get('stdout', None)
    logStderr      = logopts.get('stderr', None)
    logCmd         = logopts.get('cmd',    None)
    args           = prepareArgs(args)

    # Any falsy submit value (None, False, {}) means "run locally"
    if not submit:
        submit = None

    if submit is not None:

        # Output capture and exit code checking do not apply to
        # submitted jobs - fslsub.submit's return value is used instead
        returnStdout   = False
        returnStderr   = False
        returnExitcode = False

        # submit=True means "submit with no extra options"
        if submit is True:
            submit = dict()

        if not isinstance(submit, Mapping):
            raise ValueError('submit must be a mapping containing '
                             'options for fsl.utils.fslsub.submit')

    if DRY_RUN:
        return _dryrun(
            submit, returnStdout, returnStderr, returnExitcode, *args)

    # submit - delegate to fslsub
    if submit is not None:
        return fslsub.submit(' '.join(args), **submit)

    # Run directly - delegate to _realrun
    stdout, stderr, exitcode = _realrun(
        tee, logStdout, logStderr, logCmd, *args)

    if not returnExitcode and (exitcode != 0):
        raise RuntimeError('{} returned non-zero exit code: {}'.format(
            args[0], exitcode))

    results = []
    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(exitcode)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
    """Build the simulated result which :func:`run` returns while the
    :data:`DRY_RUN` flag is active. No command is executed.

    :arg submit:         If truthy, a dummy submission result is returned.
    :arg returnStdout:   Include the simulated standard output, which is the
                         command and its arguments joined with spaces.
    :arg returnStderr:   Include an empty standard error string.
    :arg returnExitcode: Include an exit code of ``0``.
    :arg args:           The command and its arguments.
    :returns:            ``('0',)`` if ``submit`` is truthy. Otherwise a
                         single value when exactly one of the ``return*``
                         flags is set, or a tuple of the selected values.
    """
    if submit:
        return ('0',)

    candidates = (
        (returnStdout,   ' '.join(args)),
        (returnStderr,   ''),
        (returnExitcode, 0),
    )
    selected = [value for wanted, value in candidates if wanted]

    return selected[0] if len(selected) == 1 else tuple(selected)


def _realrun(tee, logStdout, logStderr, logCmd, *args):
    """Used by :func:`run`. Runs the given command and manages its standard
    output and error streams.

    :arg tee:       If ``True``, the command's standard output and error
                    streams are forwarded to this process' standard output/
                    error.

    :arg logStdout: Optional file-like object to which the command's standard
                    output stream can be forwarded.

    :arg logStderr: Optional file-like object to which the command's standard
                    error stream can be forwarded.

    :arg logCmd:    Optional file-like object to which the command itself is
                    logged (before the command is started).

    :arg args:      Command to run

    :returns:       A tuple containing:
                      - the command's standard output as a string.
                      - the command's standard error as a string.
                      - the command's exit code.

    :raises: Any exception raised by :class:`subprocess.Popen` (e.g.
             ``OSError`` if the command cannot be started), and
             ``UnicodeDecodeError`` if the captured output is not valid
             UTF-8.
    """

    with tempdir.tempdir(changeto=False) as td:

        # We always direct the command's stdout/
        # stderr to two temporary files, which are
        # read back in once the command has finished
        stdoutf = op.join(td, 'stdout')
        stderrf = op.join(td, 'stderr')

        with open(stdoutf, 'wb') as stdout, \
             open(stderrf, 'wb') as stderr:  # noqa

            outstreams = [stdout]
            errstreams = [stderr]

            # if tee, we duplicate the command's
            # stdout/stderr to this process'
            # stdout/stderr
            if tee:
                outstreams.append(sys.stdout)
                errstreams.append(sys.stderr)

            # And we also duplicate to caller-
            # provided streams if they're given.
            if logStdout is not None: outstreams.append(logStdout)
            if logStderr is not None: errstreams.append(logStderr)

            # log the command if requested
            if logCmd is not None:
                cmd = ' '.join(args) + '\n'
                if 'b' in getattr(logCmd, 'mode', 'w'):
                    logCmd.write(cmd.encode('utf-8'))
                else:
                    logCmd.write(cmd)

            # Start the command only once all of the set-up above has
            # succeeded, so that a failure there cannot leave behind a
            # running child process whose pipes are never read and
            # which is never waited on.
            proc    = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)
            stdoutt = _forwardStream(proc.stdout, *outstreams)
            stderrt = _forwardStream(proc.stderr, *errstreams)

            # Wait until the forwarding threads
            # have finished cleanly, and the
            # command has terminated.
            stdoutt.join()
            stderrt.join()
            proc.communicate()

        # Read in the command's stdout/stderr
        with open(stdoutf, 'rb') as f: stdout = f.read()
        with open(stderrf, 'rb') as f: stderr = f.read()

    exitcode = proc.returncode
    stdout   = stdout.decode('utf-8')
    stderr   = stderr.decode('utf-8')

    return stdout, stderr, exitcode


def runfsl(*args, **kwargs):
    """Call a FSL command and return its output.

    The command is looked up in the following directories, in order of
    priority:

      1. ``FSL_PREFIX``
      2. ``$FSLDEVDIR/bin``
      3. ``$FSLDIR/bin``

    If it is found in one of them, its full path is passed to :func:`run`.
    Otherwise the command name is passed to :func:`run` unchanged. All other
    arguments are passed straight through to :func:`run`.

    :raises FSLNotPresent: If none of the above locations is available.
    """
    devdir = fslplatform.fsldevdir
    devbin = None if devdir is None else op.join(fslplatform.fsldevdir, 'bin')
    fsldir = fslplatform.fsldir
    fslbin = None if fsldir is None else op.join(fslplatform.fsldir, 'bin')

    prefixes = [p for p in (FSL_PREFIX, devbin, fslbin) if p is not None]

    if not prefixes:
        raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!')

    args = prepareArgs(args)

    # Use the first candidate location which contains the executable
    for candidate in (op.join(p, args[0]) for p in prefixes):
        if op.isfile(candidate):
            args[0] = candidate
            break

    return run(*args, **kwargs)


def wait(job_ids):
    """Thin proxy for :func:`.fslsub.wait`: ``job_ids`` is passed through
    unchanged and that function's return value is returned.
    """
    waitForJobs = fslsub.wait
    return waitForJobs(job_ids)