Commit 8bd3ea7b authored by Paul McCarthy's avatar Paul McCarthy 🚵
Browse files

ENH: fsl.utils.run.run now has a "tee" option, which captures process

stdout/stderr, and also redirects it to real stdout/stderr.
parent e20eaf04
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
""" """
import sys
import logging import logging
import threading
import contextlib import contextlib
import collections import collections
import subprocess as sp import subprocess as sp
...@@ -26,6 +28,7 @@ import six ...@@ -26,6 +28,7 @@ import six
from fsl.utils.platform import platform as fslplatform from fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub as fslsub import fsl.utils.fslsub as fslsub
import fsl.utils.tempdir as tempdir
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -100,6 +103,12 @@ def run(*args, **kwargs): ...@@ -100,6 +103,12 @@ def run(*args, **kwargs):
``False``. If ``True``, standard error is captured and ``False``. If ``True``, standard error is captured and
returned. Ignored if ``submit`` is specified. returned. Ignored if ``submit`` is specified.
:arg tee: Must be passed as a keyword argument. Defaults to ``False``.
If ``True``, the command's standard output and error streams
are forward to the streams for this process, in addition to
being captured and returned. Ignored if ``submit`` is
specified.
:arg ret: Must be passed as a keyword argument. Defaults to ``False``. :arg ret: Must be passed as a keyword argument. Defaults to ``False``.
If ``True``, and the command's return code is non-0, an If ``True``, and the command's return code is non-0, an
exception is not raised. Ignored if ``submit`` is specified. exception is not raised. Ignored if ``submit`` is specified.
...@@ -112,8 +121,26 @@ def run(*args, **kwargs): ...@@ -112,8 +121,26 @@ def run(*args, **kwargs):
``err``), and return code (if ``ret``). ``err``), and return code (if ``ret``).
""" """
# Creates a thread which forwards the given
# input stream to one or more output streams.
# Used when tee is True - we have to read
# the process stdout/err on separate threads
# to avoid deadlocks.
def forward(in_, *outs):
def realForward():
for line in in_:
for o in outs:
if 'b' in o.mode: o.write(line)
else: o.write(line.decode('utf-8'))
t = threading.Thread(target=realForward)
t.daemon = True
t.start()
return t
err = kwargs.get('err', False) err = kwargs.get('err', False)
ret = kwargs.get('ret', False) ret = kwargs.get('ret', False)
tee = kwargs.get('tee', False)
submit = kwargs.get('submit', None) submit = kwargs.get('submit', None)
args = _prepareArgs(args) args = _prepareArgs(args)
...@@ -123,6 +150,7 @@ def run(*args, **kwargs): ...@@ -123,6 +150,7 @@ def run(*args, **kwargs):
if submit is not None: if submit is not None:
err = False err = False
ret = False ret = False
tee = False
if submit is True: if submit is True:
submit = dict() submit = dict()
...@@ -131,11 +159,10 @@ def run(*args, **kwargs): ...@@ -131,11 +159,10 @@ def run(*args, **kwargs):
raise ValueError('submit must be a mapping containing ' raise ValueError('submit must be a mapping containing '
'options for fsl.utils.fslsub.submit') 'options for fsl.utils.fslsub.submit')
if DRY_RUN: if DRY_RUN: log.debug('dryrun: {}'.format(' '.join(args)))
log.debug('dryrun: {}'.format(' '.join(args))) else: log.debug('run: {}' .format(' '.join(args)))
else:
log.debug('run: {}'.format(' '.join(args)))
# dry run - just echo back the command
if DRY_RUN: if DRY_RUN:
stderr = '' stderr = ''
if submit is None: if submit is None:
...@@ -143,23 +170,55 @@ def run(*args, **kwargs): ...@@ -143,23 +170,55 @@ def run(*args, **kwargs):
else: else:
stdout = '[submit] ' + ' '.join(args) stdout = '[submit] ' + ' '.join(args)
elif submit is not None: results = [stdout]
return fslsub.submit(' '.join(args), **submit) if err: results.append(stderr)
if ret: results.append(0)
else: if len(results) == 1: return results[0]
proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE) else: return tuple(results)
stdout, stderr = proc.communicate()
retcode = proc.returncode
stdout = stdout.decode('utf-8').strip() # submit - delegate to fslsub
stderr = stderr.decode('utf-8').strip() if submit is not None:
return fslsub.submit(' '.join(args), **submit)
log.debug('stdout: {}'.format(stdout))
log.debug('stderr: {}'.format(stderr))
if not ret and (retcode != 0): # Start the command, directing its
raise RuntimeError('{} returned non-zero exit code: {}'.format( # stdout/stderr to temporary files
args[0], retcode)) # and, if tee is True, to sys.stdout
# stderr.
proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)
with tempdir.tempdir(changeto=False) as td:
stdoutf = op.join(td, 'stdout')
stderrf = op.join(td, 'stderr')
with open(stdoutf, 'wb') as stdout, \
open(stderrf, 'wb') as stderr: # noqa
if tee:
stdoutt = forward(proc.stdout, stdout, sys.stdout)
stderrt = forward(proc.stderr, stderr, sys.stderr)
else:
stdoutt = forward(proc.stdout, stdout)
stderrt = forward(proc.stderr, stderr)
# Wait until the forwarding threads
# have finished cleanly, and the
# command has terminated.
stdoutt.join()
stderrt.join()
proc.communicate()
# Read in the command's stdout/stderr
with open(stdoutf, 'rb') as f: stdout = f.read()
with open(stderrf, 'rb') as f: stderr = f.read()
retcode = proc.returncode
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
if not ret and (retcode != 0):
raise RuntimeError('{} returned non-zero exit code: {}'.format(
args[0], retcode))
results = [stdout] results = [stdout]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment