#!/usr/bin/env python
#
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module provides some functions for running shell commands.

.. note:: The functions in this module are only known to work in Unix-like
          environments.

.. autosummary::
   :nosignatures:

   run
   runfsl
   wait
   dryrun
"""


22
import               sys
23
import               shlex
24
import               logging
25
import               threading
26
import               contextlib
27
import               collections
28
29
30
import subprocess as sp
import os.path    as op

31
32
import               six

33
34
from   fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub                   as fslsub
35
import fsl.utils.tempdir                  as tempdir
36
37
38
39
40


# Module-level logger.
log = logging.getLogger(__name__)


# Module-wide dry-run switch, read by run(). Normally toggled temporarily
# through the dryrun() context manager rather than assigned directly.
DRY_RUN = False
"""If ``True``, the :func:`run` function will not execute commands; it
returns placeholder results instead (see :func:`dryrun`).
"""


# Optional directory which runfsl() searches first, before
# $FSLDEVDIR/bin and $FSLDIR/bin.
FSL_PREFIX = None
"""Global override for the FSL executable location used by :func:`runfsl`. """


class FSLNotPresent(Exception):
    """Error raised by :func:`runfsl` when no FSL installation can be
    located - i.e. when ``FSL_PREFIX`` is unset and neither an FSL
    development directory nor ``$FSLDIR`` is known.
    """


@contextlib.contextmanager
def dryrun(*args):
    """Context manager which switches :func:`run` into dry-run mode.

    While the context is active the module-level :data:`DRY_RUN` flag is
    ``True``, so commands passed to :func:`run` are not executed. The standard
    output returned by :func:`run` is then the command line itself, i.e.
    ``' '.join(args)`` of the arguments given to :func:`run`.

    The previous value of the flag is restored when the context exits, even
    if the body raises an exception. Positional arguments are accepted but
    ignored.
    """
    global DRY_RUN

    saved, DRY_RUN = DRY_RUN, True
    try:
        yield
    finally:
        DRY_RUN = saved


def prepareArgs(args):
    """Normalise the positional arguments given to :func:`run` or
    :func:`runfsl` into a new list of command-line tokens.

    Three calling conventions are supported:

      - a single command string, tokenised with :func:`shlex.split`,
        e.g. ``('ls -l',)``;
      - a single sequence of tokens, which is copied,
        e.g. ``(['ls', '-l'],)``;
      - any other number of items, used as the tokens directly,
        e.g. ``('ls', '-l')``.

    :arg args: Tuple of positional arguments.
    :returns:  A ``list`` of arguments.
    """
    if len(args) != 1:
        return list(args)

    only = args[0]

    if isinstance(only, six.string_types):
        # One command string - split it shell-style
        return list(shlex.split(only))

    # One sequence of tokens
    return list(only)


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
real_stdout = sys.stdout
def _forwardStream(in_, *outs):
    """Creates and starts a daemon thread which forwards the given input stream
    to one or more output streams. Used by the :func:`run` function to redirect
    a command's standard output/error streams to more than one destination.

    It is necessary to read the process stdout/ stderr on separate threads to
    avoid deadlocks.

    :arg in_:  Input stream
    :arg outs: Output stream(s)
    :returns:  The thread that has been started.
    """

    # not all file-likes have a mode attribute -
    # if not present, assume a string stream
    omodes = [getattr(o, 'mode', 'w') for o in outs]

    def realForward():
113
        for line in iter(in_.readline, b''):
114
115
116
117
118
119
120
121
122
123
            for i, o in enumerate(outs):
                if 'b' in omodes[i]: o.write(line)
                else:                o.write(line.decode('utf-8'))

    t = threading.Thread(target=realForward)
    t.daemon = True
    t.start()
    return t


def run(*args, **kwargs):
    """Call a command and return its output. You can pass the command and
    arguments as a single string, or as a regular or unpacked sequence.

    The command can be run on a cluster by using the ``submit`` keyword
    argument.

    An exception is raised if the command returns a non-zero exit code, unless
    the ``exitcode`` option is set to ``True``.

    If the module-level :data:`DRY_RUN` flag is set (see :func:`dryrun`), the
    command is not executed, and placeholder results are returned instead.

    :arg stdout:   Must be passed as a keyword argument. Defaults to ``True``.
                   If ``True``, standard output is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg stderr:   Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, standard error is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, and the command's return code is non-0, an
                   exception is not raised.  Ignored if ``submit`` is
                   specified.

    :arg submit:   Must be passed as a keyword argument. Defaults to ``None``.
                   If ``True``, the command is submitted as a cluster job via
                   the :func:`.fslsub.submit` function.  May also be a
                   dictionary containing arguments to that function. Any
                   false-y value (e.g. an empty dict) runs the command
                   locally.

    :arg log:      Must be passed as a keyword argument.  An optional ``dict``
                   (or ``None``) which may be used to redirect the command's
                   standard output and error. The following keys are
                   recognised:

                     - tee:    If ``True``, the command's standard output/error
                               streams are forwarded to this process' streams.

                     - stdout: Optional file-like object to which the command's
                               standard output stream can be forwarded.

                     - stderr: Optional file-like object to which the command's
                               standard error stream can be forwarded.

                     - cmd:    Optional file-like object to which the command
                               itself is logged.

    All other keyword arguments are passed through to the ``subprocess.Popen``
    object (via :func:`_realrun`), unless ``submit=True``, in which case they
    are ignored.

    :returns:      If ``submit`` is provided, the return value of
                   :func:`.fslsub.submit` is returned. Otherwise returns a
                   single value or a tuple, based on the ``stdout``,
                   ``stderr``, and ``exitcode`` arguments (an empty tuple if
                   all three are ``False``).

    :raises ValueError:   If ``submit`` is truthy but is neither ``True`` nor
                          a mapping.
    :raises RuntimeError: If the command exits with a non-zero code and
                          ``exitcode`` is ``False``.
    """

    # collections.Mapping was removed in Python 3.10 - the ABCs
    # live in collections.abc (Python >= 3.3). Fall back to the
    # old location for older interpreters.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    returnStdout   = kwargs.pop('stdout',   True)
    returnStderr   = kwargs.pop('stderr',   False)
    returnExitcode = kwargs.pop('exitcode', False)
    submit         = kwargs.pop('submit',   {})

    # Named logopts rather than log, so that the module-level
    # logger is not shadowed. "or {}" tolerates log=None.
    logopts        = kwargs.pop('log',      None) or {}

    tee            = logopts.get('tee',     False)
    logStdout      = logopts.get('stdout',  None)
    logStderr      = logopts.get('stderr',  None)
    logCmd         = logopts.get('cmd',     None)
    args           = prepareArgs(args)

    # Any false-y submit value (None, False, {}) means "run locally".
    if not submit:
        submit = None

    if submit is not None:
        # Output capture / exit code handling do not
        # apply to jobs submitted to the cluster.
        returnStdout   = False
        returnStderr   = False
        returnExitcode = False

        # submit=True -> submit with default options
        if submit is True:
            submit = dict()

    if submit is not None and not isinstance(submit, Mapping):
        raise ValueError('submit must be a mapping containing '
                         'options for fsl.utils.fslsub.submit')

    if DRY_RUN:
        return _dryrun(
            submit, returnStdout, returnStderr, returnExitcode, *args)

    # submit - delegate to fslsub
    if submit is not None:
        return fslsub.submit(' '.join(args), **submit)

    # Run directly - delegate to _realrun
    stdout, stderr, exitcode = _realrun(
        tee, logStdout, logStderr, logCmd, *args, **kwargs)

    if not returnExitcode and (exitcode != 0):
        raise RuntimeError('{} returned non-zero exit code: {}'.format(
            args[0], exitcode))

    results = []
    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(exitcode)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
    """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
    active.
    """

    if submit:
        return ('0',)

    results = []
    stderr  = ''
    stdout  = ' '.join(args)

    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(0)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _realrun(tee, logStdout, logStderr, logCmd, *args, **kwargs):
    """Used by :func:`run`. Runs the given command and manages its standard
    output and error streams.

    :arg tee:       If ``True``, the command's standard output and error
                    streams are forwarded to this process' standard output/
                    error.

    :arg logStdout: Optional file-like object to which the command's standard
                    output stream can be forwarded.

    :arg logStderr: Optional file-like object to which the command's standard
                    error stream can be forwarded.

    :arg logCmd:    Optional file-like object to which the command itself is
                    logged (before the command is started).

    :arg args:      Command to run

    :arg kwargs:    Passed through to the ``subprocess.Popen`` object.

    :returns:       A tuple containing:
                      - the command's standard output as a string.
                      - the command's standard error as a string.
                      - the command's exit code.
    """

    with tempdir.tempdir(changeto=False) as td:

        # We always direct the command's stdout/
        # stderr to two temporary files, which are
        # read back in once the command has finished
        stdoutf = op.join(td, 'stdout')
        stderrf = op.join(td, 'stderr')

        with open(stdoutf, 'wb') as outf, \
             open(stderrf, 'wb') as errf:  # noqa

            outstreams = [outf]
            errstreams = [errf]

            # if tee, we duplicate the command's
            # stdout/stderr to this process'
            # stdout/stderr
            if tee:
                outstreams.append(sys.stdout)
                errstreams.append(sys.stderr)

            # And we also duplicate to caller-
            # provided streams if they're given.
            if logStdout is not None: outstreams.append(logStdout)
            if logStderr is not None: errstreams.append(logStderr)

            # log the command if requested
            if logCmd is not None:
                cmd = ' '.join(args) + '\n'
                if 'b' in getattr(logCmd, 'mode', 'w'):
                    logCmd.write(cmd.encode('utf-8'))
                else:
                    logCmd.write(cmd)

            # Start the process only once all of the set-up above has
            # succeeded. If set-up fails (e.g. the temporary files cannot
            # be created, or writing to logCmd raises), no child process
            # has been started, so none is left running with undrained
            # pipes and never waited on.
            proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, **kwargs)

            stdoutt = _forwardStream(proc.stdout, *outstreams)
            stderrt = _forwardStream(proc.stderr, *errstreams)

            # Wait until the forwarding threads
            # have finished cleanly, and the
            # command has terminated.
            stdoutt.join()
            stderrt.join()
            proc.communicate()

        # Read in the command's stdout/stderr
        # (the files were closed above, so all
        # forwarded data has been flushed)
        with open(stdoutf, 'rb') as f: stdout = f.read()
        with open(stderrf, 'rb') as f: stderr = f.read()

    exitcode = proc.returncode
    stdout   = stdout.decode('utf-8')
    stderr   = stderr.decode('utf-8')

    return stdout, stderr, exitcode


def runfsl(*args, **kwargs):
    """Call a FSL command and return its output.

    The command name (the first argument, after normalisation by
    :func:`prepareArgs`) is looked up in these directories, in order of
    priority:

      1. ``FSL_PREFIX``
      2. ``$FSLDEVDIR/bin``
      3. ``$FSLDIR/bin``

    The first directory which contains a file of that name wins, and the full
    path to that file replaces the command name. If no directory contains it,
    the command name is left unchanged. Everything is then passed to
    :func:`run`.

    :raises FSLNotPresent: If none of the three locations is configured.
    """
    searchdirs = [FSL_PREFIX] if FSL_PREFIX is not None else []
    searchdirs.extend(op.join(root, 'bin')
                      for root in (fslplatform.fsldevdir, fslplatform.fsldir)
                      if root is not None)

    if not searchdirs:
        raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!')

    args       = prepareArgs(args)
    candidates = (op.join(d, args[0]) for d in searchdirs)
    fullpath   = next((c for c in candidates if op.isfile(c)), None)

    if fullpath is not None:
        args[0] = fullpath

    return run(*args, **kwargs)


def wait(job_ids):
    """Proxy for :func:`.fslsub.wait`, presumably waiting for previously
    submitted cluster jobs to finish.

    ``job_ids`` is passed through unchanged, and the result of
    :func:`.fslsub.wait` is returned as-is.
    """
    result = fslsub.wait(job_ids)
    return result