#!/usr/bin/env python
#
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module provides some functions for running shell commands.

Paul McCarthy's avatar
Paul McCarthy committed
9
10
11
.. note:: The functions in this module are only known to work in Unix-like
          environments.

12
13
14
15
16
.. autosummary::
   :nosignatures:

   run
   runfsl
17
18
   wait
   dryrun
19
20
21
"""


22
import               sys
import               shlex
import               logging
import               warnings
import               threading
import               contextlib
import               collections
import subprocess as sp
import os.path    as op

import               six

from   fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub                   as fslsub
import fsl.utils.tempdir                  as tempdir
36
37
38
39
40


# Module-level logger, named after this module.
log = logging.getLogger(__name__)


41
42
43
44
45
46
# Global dry-run switch. It is read by run(), and is temporarily set to
# True by the dryrun() context manager.
DRY_RUN = False
"""If ``True``, the :func:`run` function will only log commands, but will not
execute them.
"""


47
48
49
50
# Checked first by runfsl(), ahead of the fslplatform.fsldevdir and
# fslplatform.fsldir attributes. When set it is used as-is as the directory
# containing the executable (no 'bin' component is appended to it).
FSL_PREFIX = None
"""Global override for the FSL executable location used by :func:`runfsl`. """


51
52
53
54
55
56
57
class FSLNotPresent(Exception):
    """Exception raised by :func:`runfsl` when it is unable to work out
    where FSL is installed (i.e. ``$FSLDIR`` cannot be found).
    """


58
59
60
61
@contextlib.contextmanager
def dryrun(*args):
    """Context manager which switches the :data:`DRY_RUN` flag on for the
    duration of the ``with`` block, so that calls to :func:`run` made inside
    it are not executed.

    While the flag is on, the standard output returned by :func:`run` is the
    command and its arguments joined with single spaces.

    Any positional arguments passed to this function are accepted, but are
    not used. The value that :data:`DRY_RUN` had on entry is restored on
    exit, even if the ``with`` block raises an exception.
    """
    global DRY_RUN

    previous, DRY_RUN = DRY_RUN, True

    try:
        yield
    finally:
        DRY_RUN = previous


def _prepareArgs(args):
    """Used by the :func:`run` and :func:`runfsl` functions. Normalises the
    positional arguments that they were given into a list.

    :arg args: Sequence of positional arguments, as collected by ``*args``.
               It may contain:

                 - a single command string, which is split into tokens
                   using shell-like rules (:func:`shlex.split`), so that a
                   quoted argument containing whitespace stays in one piece

                 - a single sequence containing the command and its
                   arguments, which is unpacked

                 - the command and its arguments as separate items, which
                   are used as they are

    :returns:  A new ``list`` containing the command and its arguments.

    :raises ValueError: If a command string cannot be tokenised (e.g. it
                        contains an unterminated quote).
    """

    if len(args) == 1:

        # Argument was a command string. shlex is
        # used rather than str.split, so that quoted
        # arguments (e.g. file names which contain
        # spaces) are not broken apart.
        if isinstance(args[0], six.string_types):
            args = shlex.split(args[0])

        # Argument was an unpacked sequence
        else:
            args = args[0]

    return list(args)


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# Reference to the ``sys.stdout`` object that was
# in place at the time this module was imported.
real_stdout = sys.stdout


def _forwardStream(in_, *outs):
    """Creates and starts a daemon thread which forwards the given input stream
    to one or more output streams. Used by the :func:`run` function to redirect
    a command's standard output/error streams to more than one destination.

    It is necessary to read the process stdout/ stderr on separate threads to
    avoid deadlocks.

    The input stream is read one line at a time, as bytes, until
    ``readline`` returns ``b''``. Each line is written unchanged to output
    streams which have a ``'b'`` in their ``mode`` attribute, and is decoded
    as UTF-8 before being written to all other output streams.

    :arg in_:  Input stream - its ``readline`` method must return ``bytes``.
    :arg outs: Output stream(s)
    :returns:  The thread that has been started.
    """

    # not all file-likes have a mode attribute -
    # if not present, assume a string stream
    binary = ['b' in getattr(o, 'mode', 'w') for o in outs]

    def realForward():
        # readline returns b'' at end-of-file,
        # which terminates the iteration
        for line in iter(in_.readline, b''):
            for o, isbinary in zip(outs, binary):
                if isbinary: o.write(line)
                else:        o.write(line.decode('utf-8'))

    t = threading.Thread(target=realForward)
    t.daemon = True
    t.start()
    return t


124
def run(*args, **kwargs):
    """Call a command and return its output. You can pass the command and
    arguments as a single string, or as a regular or unpacked sequence.

    The command can be run on a cluster by using the ``submit`` keyword
    argument.

    An exception is raised if the command returns a non-zero exit code, unless
    the ``exitcode`` option is set to ``True``.

    If the :data:`DRY_RUN` flag is active, the command is neither executed
    nor submitted - see :func:`dryrun`.

    :arg stdout:   Must be passed as a keyword argument. Defaults to ``True``.
                   If ``True``, standard output is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg stderr:   Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, standard error is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, and the command's return code is non-0, an
                   exception is not raised.  Ignored if ``submit`` is
                   specified.

    :arg submit:   Must be passed as a keyword argument. Defaults to an empty
                   ``dict``, which (like any other false value) means that
                   the command is run directly. If ``True``, the command is
                   submitted as a cluster job via the :func:`.fslsub.submit`
                   function.  May also be a dictionary containing arguments
                   to that function.

    :arg log:      Must be passed as a keyword argument.  An optional ``dict``
                   which may be used to redirect the command's standard output
                   and error. The following keys are recognised:

                     - tee:    If ``True``, the command's standard output/error
                               streams are forwarded to this processes streams.

                     - stdout: Optional file-like object to which the command's
                               standard output stream can be forwarded.

                     - stderr: Optional file-like object to which the command's
                               standard error stream can be forwarded.

                     - cmd:    If ``True``, the command itself is logged to the
                               standard output stream(s).

    :returns:      If ``submit`` is provided, the return value of
                   :func:`.fslsub.submit` is returned. Otherwise returns a
                   single value or a tuple, based on the ``stdout``,
                   ``stderr``, and ``exitcode`` arguments.

    :raises ValueError:   If ``submit`` is a true value which is neither
                          ``True`` nor a mapping.

    :raises RuntimeError: If the command is run directly, returns a non-0
                          exit code, and ``exitcode`` is not ``True``.
    """

    # The ABC aliases in the collections module were
    # removed in Python 3.10, and collections.abc
    # does not exist in Python 2 - support both.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    # n.b. the log options are not stored in a variable called
    # "log", so that the module-level logger is not shadowed
    returnStdout   = kwargs.get('stdout',   True)
    returnStderr   = kwargs.get('stderr',   False)
    returnExitcode = kwargs.get('exitcode', False)
    submit         = kwargs.get('submit',   {})
    logopts        = kwargs.get('log',      {})

    # Treat log=None the same as log not being passed
    if logopts is None:
        logopts = {}

    tee            = logopts.get('tee',      False)
    logStdout      = logopts.get('stdout',   None)
    logStderr      = logopts.get('stderr',   None)
    logCmd         = logopts.get('cmd',      False)
    args           = _prepareArgs(args)

    # Any false value (None, False, {}, ...)
    # means that the command is run directly
    if not bool(submit):
        submit = None

    if submit is not None:

        # Nothing is captured from a submitted job
        returnStdout   = False
        returnStderr   = False
        returnExitcode = False

        # submit=True means "submit with default options"
        if submit is True:
            submit = dict()

    if submit is not None and not isinstance(submit, Mapping):
        raise ValueError('submit must be a mapping containing '
                         'options for fsl.utils.fslsub.submit')

    if DRY_RUN:
        return _dryrun(
            submit, returnStdout, returnStderr, returnExitcode, *args)

    # submit - delegate to fslsub
    if submit is not None:
        return fslsub.submit(' '.join(args), **submit)

    # Run directly - delegate to _realrun
    stdout, stderr, exitcode = _realrun(
        tee, logStdout, logStderr, logCmd, *args)

    if not returnExitcode and (exitcode != 0):
        raise RuntimeError('{} returned non-zero exit code: {}'.format(
            args[0], exitcode))

    results = []
    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(exitcode)

    # A single requested value is returned bare, anything
    # else (including nothing at all) as a tuple
    if len(results) == 1: return results[0]
    else:                 return tuple(results)



def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
    """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
    active. Nothing is executed - a stand-in result is returned instead.

    :arg submit:         ``None`` (or ``False``) if the command would have
                         been run directly. Any other value, including an
                         empty mapping, denotes a cluster submission.

    :arg returnStdout:   If true, the stand-in standard output (the command
                         and its arguments joined with spaces) is included
                         in the result.

    :arg returnStderr:   If true, the stand-in standard error (an empty
                         string) is included in the result.

    :arg returnExitcode: If true, the stand-in exit code (``0``) is included
                         in the result.

    :arg args:           The command and its arguments.

    :returns:            ``('0',)`` for a submission. Otherwise a single
                         value if exactly one of the ``return*`` flags is
                         set, or a tuple (possibly empty) of the requested
                         values in stdout, stderr, exit code order.
    """

    # run() normalises "do not submit" to None, and submit=True to an
    # empty dict. So an empty mapping still denotes a submission, and
    # a plain truthiness test would wrongly treat it as a direct run.
    if submit is not None and submit is not False:
        return ('0',)

    stdout  = ' '.join(args)
    stderr  = ''
    results = []

    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(0)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _realrun(tee, logStdout, logStderr, logCmd, *args):
    """Used by :func:`run`. Runs the given command and manages its standard
    output and error streams.

    The command's standard output and error are always forwarded to two
    temporary files, and optionally to further streams. The contents of the
    temporary files are read back in, decoded as UTF-8, and returned once the
    command has finished.

    :arg tee:       If ``True``, the command's standard output and error
                    streams are forwarded to this process' standard output/
                    error.

    :arg logStdout: Optional file-like object to which the command's standard
                    output stream can be forwarded.

    :arg logStderr: Optional file-like object to which the command's standard
                    error stream can be forwarded.

    :arg logCmd:    If ``True``, the command itself is logged to the standard
                    output stream(s).

    :arg args:      Command to run

    :returns:       A tuple containing:
                      - the command's standard output as a string.
                      - the command's standard error as a string.
                      - the command's exit code.
    """

    proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)

    # NOTE(review): tempdir.tempdir is presumably a context manager
    # which yields the path to a temporary directory, without changing
    # into it - confirm against fsl.utils.tempdir.
    with tempdir.tempdir(changeto=False) as td:

        # We always direct the command's stdout/
        # stderr to two temporary files
        stdoutf = op.join(td, 'stdout')
        stderrf = op.join(td, 'stderr')

        with open(stdoutf, 'wb') as outfile, \
             open(stderrf, 'wb') as errfile:  # noqa

            outstreams = [outfile]
            errstreams = [errfile]

            # if tee, we duplicate the command's
            # stdout/stderr to this process'
            # stdout/stderr
            if tee:
                outstreams.append(sys.stdout)
                errstreams.append(sys.stderr)

            # And we also duplicate to caller-
            # provided streams if they're given.
            if logStdout is not None: outstreams.append(logStdout)
            if logStderr is not None: errstreams.append(logStderr)

            # log the command to stdout if requested.
            # Streams without a mode attribute are
            # assumed to be text streams.
            if logCmd:
                cmd = ' '.join(args) + '\n'
                for o in outstreams:
                    if 'b' in getattr(o, 'mode', 'w'):
                        o.write(cmd.encode('utf-8'))
                    else:
                        o.write(cmd)

            stdoutt = _forwardStream(proc.stdout, *outstreams)
            stderrt = _forwardStream(proc.stderr, *errstreams)

            # Wait until the forwarding threads
            # have finished cleanly, and the
            # command has terminated.
            stdoutt.join()
            stderrt.join()
            proc.communicate()

        # Read in the command's stdout/stderr - the
        # temporary files have been closed by now,
        # so everything written to them is on disk.
        with open(stdoutf, 'rb') as f: stdout = f.read()
        with open(stderrf, 'rb') as f: stderr = f.read()

    exitcode = proc.returncode
    stdout   = stdout.decode('utf-8')
    stderr   = stderr.decode('utf-8')

    return stdout, stderr, exitcode
325
326


327
def runfsl(*args, **kwargs):
    """Call a FSL command and return its output. This function simply prepends
    the directory containing the FSL executables to the command before
    passing it to :func:`run`.

    The directory is the first of the following which is not ``None``:

      1. :data:`FSL_PREFIX`, used as it is
      2. ``fslplatform.fsldevdir``, with ``bin`` appended
      3. ``fslplatform.fsldir``, with ``bin`` appended

    All arguments are interpreted in the same way as for :func:`run`.

    :returns: The return value of :func:`run`.

    :raises FSLNotPresent: If none of the above locations are set.
    """

    if FSL_PREFIX is not None:
        prefix = FSL_PREFIX
    elif fslplatform.fsldevdir is not None:
        prefix = op.join(fslplatform.fsldevdir, 'bin')
    elif fslplatform.fsldir is not None:
        prefix = op.join(fslplatform.fsldir, 'bin')
    else:
        raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!')

    args    = _prepareArgs(args)
    args[0] = op.join(prefix, args[0])

    # The arguments are passed on as one sequence rather than being
    # unpacked. Otherwise a command with no arguments would reach run
    # as a single string and be tokenised a second time, which would
    # break a prefix that contains whitespace.
    return run(args, **kwargs)
347
348


349
def wait(job_ids):
    """Proxy for :func:`.fslsub.wait`.

    :arg job_ids: Passed straight through to :func:`.fslsub.wait`.
    :returns:     The return value of :func:`.fslsub.wait`.
    """
    return fslsub.wait(job_ids)