#!/usr/bin/env python
#
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module provides some functions for running shell commands.

.. autosummary::
   :nosignatures:

   run
   runfsl
14
15
   wait
   dryrun
16
17
18
"""


19
import               sys
import               shlex
import               logging
import               warnings
import               threading
import               contextlib
import               collections

import subprocess as sp
import os.path    as op

try:
    from collections.abc import Mapping
except ImportError:  # Python 2
    from collections     import Mapping

import               six

from   fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub                   as fslsub
import fsl.utils.tempdir                  as tempdir
33
34
35
36
37


log = logging.getLogger(__name__)


DRY_RUN = False
"""Module-wide dry-run switch. While this is ``True``, the :func:`run`
function does not execute the commands it is given, and returns a placeholder
result instead. The :func:`dryrun` context manager toggles it temporarily.
"""


FSL_PREFIX = None
"""Module-wide override for the directory in which :func:`runfsl` looks for
FSL executables. When not ``None``, it is searched before any other location.
"""


48
49
50
51
52
53
54
class FSLNotPresent(Exception):
    """Raised by :func:`runfsl` when no FSL installation can be located,
    i.e. when ``$FSLDIR`` cannot be found.
    """


55
56
57
58
@contextlib.contextmanager
def dryrun(*args):
    """Context manager inside which calls to :func:`run` are not executed.

    The module-level :data:`DRY_RUN` flag is set to ``True`` for the duration
    of the ``with`` block, and its previous value is put back on exit, even
    if the block raises.

    While active, the standard output returned by :func:`run` is the command
    that was passed to it, joined with spaces (``' '.join(args)``).

    :arg args: Accepted, but not used by this function.
    """
    global DRY_RUN

    # Remember the current setting so that nested
    # uses restore the right value on the way out.
    previous, DRY_RUN = DRY_RUN, True

    try:
        yield
    finally:
        DRY_RUN = previous


def _prepareArgs(args):
    """Used by the :func:`run` function. Ensures that the given arguments is a
    list of strings.
76
77
78
79
    """

    if len(args) == 1:

80
81
82
        # Argument was a command string
        if isinstance(args[0], six.string_types):
            args = args[0].split()
83

84
85
86
        # Argument was an unpacked sequence
        else:
            args = args[0]
87

88
89
90
    return list(args)


91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
real_stdout = sys.stdout
def _forwardStream(in_, *outs):
    """Creates and starts a daemon thread which forwards the given input stream
    to one or more output streams. Used by the :func:`run` function to redirect
    a command's standard output/error streams to more than one destination.

    It is necessary to read the process stdout/ stderr on separate threads to
    avoid deadlocks.

    :arg in_:  Input stream
    :arg outs: Output stream(s)
    :returns:  The thread that has been started.
    """

    # not all file-likes have a mode attribute -
    # if not present, assume a string stream
    omodes = [getattr(o, 'mode', 'w') for o in outs]

    def realForward():
110
        for line in iter(in_.readline, b''):
111
112
113
114
115
116
117
118
119
120
            for i, o in enumerate(outs):
                if 'b' in omodes[i]: o.write(line)
                else:                o.write(line.decode('utf-8'))

    t = threading.Thread(target=realForward)
    t.daemon = True
    t.start()
    return t


121
def run(*args, **kwargs):
    """Call a command and return its output. You can pass the command and
    arguments as a single string, or as a regular or unpacked sequence.

    The command can be run on a cluster by using the ``submit`` keyword
    argument.

    An exception is raised if the command returns a non-zero exit code, unless
    the ``exitcode`` option is set to ``True``.

    If the :data:`DRY_RUN` flag is active, the command is not executed (nor
    submitted), and a placeholder result is returned instead.

    :arg args:     The command and its arguments.

    :arg stdout:   Must be passed as a keyword argument. Defaults to ``True``.
                   If ``True``, standard output is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg stderr:   Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, standard error is captured and returned.
                   Ignored if ``submit`` is specified.

    :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``.
                   If ``True``, and the command's return code is non-0, an
                   exception is not raised.  Ignored if ``submit`` is
                   specified.

    :arg err:      Deprecated - use ``stderr`` instead.

    :arg ret:      Deprecated - use ``exitcode`` instead.

    :arg submit:   Must be passed as a keyword argument. Defaults to ``None``.
                   If ``True``, the command is submitted as a cluster job via
                   the :func:`.fslsub.submit` function.  May also be a
                   dictionary containing arguments to that function.

    :arg log:      Must be passed as a keyword argument.  An optional ``dict``
                   which may be used to redirect the command's standard output
                   and error. The following keys are recognised:

                     - tee:    If ``True``, the command's standard output/error
                               streams are forwarded to this process' streams.

                     - stdout: Optional file-like object to which the command's
                               standard output stream can be forwarded.

                     - stderr: Optional file-like object to which the command's
                               standard error stream can be forwarded.

                     - cmd:    If ``True``, the command itself is logged to the
                               standard output stream(s).

    :returns:      If ``submit`` is provided, the return value of
                   :func:`.fslsub.submit` is returned. Otherwise returns a
                   single value or a tuple, based on the ``stdout``,
                   ``stderr``, and ``exitcode`` arguments.

    :raises RuntimeError: If the command returns a non-zero exit code and
                          ``exitcode`` is not ``True``.

    :raises ValueError:   If ``submit`` is neither ``True`` nor a mapping.
    """

    # stacklevel=2 makes the warning point at the
    # code which called run, rather than at run itself.
    if 'err' in kwargs:
        warnings.warn('err is deprecated and will be removed '
                      'in fslpy 2.0.0 - use stderr instead',
                      DeprecationWarning,
                      stacklevel=2)
        kwargs['stderr'] = kwargs.get('stderr', kwargs['err'])
    if 'ret' in kwargs:
        warnings.warn('ret is deprecated and will be removed '
                      'in fslpy 2.0.0 - use exitcode instead',
                      DeprecationWarning,
                      stacklevel=2)
        kwargs['exitcode'] = kwargs.get('exitcode', kwargs['ret'])

    # log=None is treated the same as no log options. A
    # distinct local name avoids hiding the module logger.
    returnStdout   = kwargs.get('stdout',   True)
    returnStderr   = kwargs.get('stderr',   False)
    returnExitcode = kwargs.get('exitcode', False)
    submit         = kwargs.get('submit',   {})
    logopts        = kwargs.get('log',      None) or {}
    tee            = logopts.get('tee',      False)
    logStdout      = logopts.get('stdout',   None)
    logStderr      = logopts.get('stderr',   None)
    logCmd         = logopts.get('cmd',      False)
    args           = _prepareArgs(args)

    # Any false value (None, False, {}) means "do not submit"
    if not bool(submit):
        submit = None

    if submit is not None:

        # Output capturing is not
        # applicable to submitted jobs
        returnStdout   = False
        returnStderr   = False
        returnExitcode = False

        if submit is True:
            submit = dict()

    # collections.abc.Mapping is used here, as the
    # collections.Mapping alias was removed in Python 3.10
    if submit is not None and not isinstance(submit, Mapping):
        raise ValueError('submit must be a mapping containing '
                         'options for fsl.utils.fslsub.submit')

    if DRY_RUN:
        return _dryrun(
            submit, returnStdout, returnStderr, returnExitcode, *args)

    # submit - delegate to fslsub
    if submit is not None:
        return fslsub.submit(' '.join(args), **submit)

    # Run directly - delegate to _realrun
    stdout, stderr, exitcode = _realrun(
        tee, logStdout, logStderr, logCmd, *args)

    if not returnExitcode and (exitcode != 0):
        raise RuntimeError('{} returned non-zero exit code: {}'.format(
            args[0], exitcode))

    results = []
    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(exitcode)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)



def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
    """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
    active.
    """

    if submit:
        return ('0',)

    results = []
    stderr  = ''
    stdout  = ' '.join(args)

    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(0)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _realrun(tee, logStdout, logStderr, logCmd, *args):
    """Used by :func:`run`. Runs the given command and manages its standard
    output and error streams.

    The command's output is always captured into two temporary files, which
    are read back once the command has finished. The output may additionally
    be copied to other streams as the command produces it.

    :arg tee:       If ``True``, the command's standard output and error
                    streams are forwarded to this process' standard output/
                    error.

    :arg logStdout: Optional file-like object to which the command's standard
                    output stream can be forwarded.

    :arg logStderr: Optional file-like object to which the command's standard
                    error stream can be forwarded.

    :arg logCmd:    If ``True``, the command itself is logged to the standard
                    output stream(s).

    :arg args:      Command to run

    :returns:       A tuple containing:
                      - the command's standard output as a string.
                      - the command's standard error as a string.
                      - the command's exit code.
    """
    proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)

    # NOTE(review): tempdir.tempdir presumably creates a temporary
    # directory which is removed on exit, without changing into it
    # - confirm against fsl.utils.tempdir.
    with tempdir.tempdir(changeto=False) as td:

        # We always direct the command's stdout/
        # stderr to two temporary files
        stdoutf = op.join(td, 'stdout')
        stderrf = op.join(td, 'stderr')

        with open(stdoutf, 'wb') as stdout, \
             open(stderrf, 'wb') as stderr:  # noqa

            outstreams = [stdout]
            errstreams = [stderr]

            # if tee, we duplicate the command's
            # stdout/stderr to this process'
            # stdout/stderr
            if tee:
                outstreams.append(sys.stdout)
                errstreams.append(sys.stderr)

            # And we also duplicate to caller-
            # provided streams if they're given.
            if logStdout is not None: outstreams.append(logStdout)
            if logStderr is not None: errstreams.append(logStderr)

            # log the command to
            # stdout if requested
            if logCmd:
                cmd = ' '.join(args) + '\n'
                for o in outstreams:
                    if 'b' in getattr(o, 'mode', 'w'):
                        o.write(cmd.encode('utf-8'))
                    else:
                        o.write(cmd)

            stdoutt = _forwardStream(proc.stdout, *outstreams)
            stderrt = _forwardStream(proc.stderr, *errstreams)

            # Wait until the forwarding threads
            # have finished cleanly, and the
            # command has terminated.
            stdoutt.join()
            stderrt.join()
            proc.communicate()

        # Read in the command's stdout/stderr
        with open(stdoutf, 'rb') as f: stdout = f.read()
        with open(stderrf, 'rb') as f: stderr = f.read()

    exitcode = proc.returncode
    stdout   = stdout.decode('utf-8')
    stderr   = stderr.decode('utf-8')

    return stdout, stderr, exitcode
337
338


339
def runfsl(*args, **kwargs):
    """Call a FSL command and return its output.

    This function searches for the command in the following
    locations (ordered by priority):

      1. ``FSL_PREFIX``
      2. ``$FSLDEVDIR/bin``
      3. ``$FSLDIR/bin``

    If found, the full path to the command is then passed to :func:`run`.
    If the command is not found in any of these locations, it is passed to
    :func:`run` unchanged.

    :arg args:   The FSL command and its arguments, in any form accepted
                 by :func:`run`.
    :arg kwargs: Passed through to :func:`run`.
    :returns:    The return value of :func:`run`.
    :raises FSLNotPresent: If none of the above locations is available.
    """
    prefixes = []

    if FSL_PREFIX is not None:
        prefixes.append(FSL_PREFIX)
    if fslplatform.fsldevdir is not None:
        prefixes.append(op.join(fslplatform.fsldevdir, 'bin'))
    if fslplatform.fsldir is not None:
        prefixes.append(op.join(fslplatform.fsldir, 'bin'))

    if not prefixes:
        raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!')

    # Use the first location which
    # contains the requested command
    args = _prepareArgs(args)
    for prefix in prefixes:
        cmdpath = op.join(prefix, args[0])
        if op.isfile(cmdpath):
            args[0] = cmdpath
            break

    return run(*args, **kwargs)
371

372
def wait(job_ids):
    """Proxy for :func:`.fslsub.wait`.

    :arg job_ids: Passed straight through to :func:`.fslsub.wait`.
    :returns:     Whatever :func:`.fslsub.wait` returns.
    """
    return fslsub.wait(job_ids)