From 8bd3ea7b9e543e03135b099dd6645b5883fa6ee5 Mon Sep 17 00:00:00 2001
From: Paul McCarthy <pauldmccarthy@gmail.com>
Date: Sun, 27 May 2018 16:44:47 +0100
Subject: [PATCH] ENH: fsl.utils.run.run now has a "tee" option, which captures
 process stdout/stderr, and also redirects it to real stdout/stderr.

---
 fsl/utils/run.py | 95 +++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 77 insertions(+), 18 deletions(-)

diff --git a/fsl/utils/run.py b/fsl/utils/run.py
index 032e07d97..9e916d810 100644
--- a/fsl/utils/run.py
+++ b/fsl/utils/run.py
@@ -16,7 +16,9 @@
 """
 
 
+import               sys
 import               logging
+import               threading
 import               contextlib
 import               collections
 import subprocess as sp
@@ -26,6 +28,7 @@ import               six
 
 from   fsl.utils.platform import platform as fslplatform
 import fsl.utils.fslsub                   as fslsub
+import fsl.utils.tempdir                  as tempdir
 
 
 log = logging.getLogger(__name__)
@@ -100,6 +103,12 @@ def run(*args, **kwargs):
                  ``False``. If ``True``, standard error is captured and
                  returned. Ignored if ``submit`` is specified.
 
+    :arg tee:    Must be passed as a keyword argument. Defaults to ``False``.
+                 If ``True``, the command's standard output and error streams
+                 are forward to the streams for this process, in addition to
+                 being captured and returned. Ignored if ``submit`` is
+                 specified.
+
     :arg ret:    Must be passed as a keyword argument. Defaults to ``False``.
                  If ``True``, and the command's return code is non-0, an
                  exception is not raised.  Ignored if ``submit`` is specified.
@@ -112,8 +121,26 @@ def run(*args, **kwargs):
                  ``err``), and return code (if ``ret``).
     """
 
+    # Creates a thread which forwards the given
+    # input stream to one or more output streams.
+    # Used when tee is True - we have to read
+    # the process stdout/err on separate threads
+    # to avoid deadlocks.
+    def forward(in_, *outs):
+        def realForward():
+            for line in in_:
+                for o in outs:
+                    if 'b' in o.mode: o.write(line)
+                    else:             o.write(line.decode('utf-8'))
+
+        t = threading.Thread(target=realForward)
+        t.daemon = True
+        t.start()
+        return t
+
     err    = kwargs.get('err',    False)
     ret    = kwargs.get('ret',    False)
+    tee    = kwargs.get('tee',    False)
     submit = kwargs.get('submit', None)
     args   = _prepareArgs(args)
 
@@ -123,6 +150,7 @@ def run(*args, **kwargs):
     if submit is not None:
         err = False
         ret = False
+        tee = False
 
         if submit is True:
             submit = dict()
@@ -131,11 +159,10 @@ def run(*args, **kwargs):
         raise ValueError('submit must be a mapping containing '
                          'options for fsl.utils.fslsub.submit')
 
-    if DRY_RUN:
-        log.debug('dryrun: {}'.format(' '.join(args)))
-    else:
-        log.debug('run: {}'.format(' '.join(args)))
+    if DRY_RUN: log.debug('dryrun: {}'.format(' '.join(args)))
+    else:       log.debug('run: {}'   .format(' '.join(args)))
 
+    # dry run - just echo back the command
     if DRY_RUN:
         stderr = ''
         if submit is None:
@@ -143,23 +170,55 @@ def run(*args, **kwargs):
         else:
             stdout = '[submit] ' + ' '.join(args)
 
-    elif submit is not None:
-        return fslsub.submit(' '.join(args), **submit)
+        results = [stdout]
+        if err: results.append(stderr)
+        if ret: results.append(0)
 
-    else:
-        proc           = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)
-        stdout, stderr = proc.communicate()
-        retcode        = proc.returncode
+        if len(results) == 1: return results[0]
+        else:                 return tuple(results)
 
-        stdout = stdout.decode('utf-8').strip()
-        stderr = stderr.decode('utf-8').strip()
-
-        log.debug('stdout: {}'.format(stdout))
-        log.debug('stderr: {}'.format(stderr))
+    # submit - delegate to fslsub
+    if submit is not None:
+        return fslsub.submit(' '.join(args), **submit)
 
-        if not ret and (retcode != 0):
-            raise RuntimeError('{} returned non-zero exit code: {}'.format(
-                args[0], retcode))
+    # Start the command, directing its
+    # stdout/stderr to temporary files
+    # and, if tee is True, to sys.stdout
+    # stderr.
+    proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)
+    with tempdir.tempdir(changeto=False) as td:
+
+        stdoutf = op.join(td, 'stdout')
+        stderrf = op.join(td, 'stderr')
+
+        with open(stdoutf, 'wb') as stdout, \
+             open(stderrf, 'wb') as stderr:  # noqa
+
+            if tee:
+                stdoutt = forward(proc.stdout, stdout, sys.stdout)
+                stderrt = forward(proc.stderr, stderr, sys.stderr)
+            else:
+                stdoutt = forward(proc.stdout, stdout)
+                stderrt = forward(proc.stderr, stderr)
+
+            # Wait until the forwarding threads
+            # have finished cleanly, and the
+            # command has terminated.
+            stdoutt.join()
+            stderrt.join()
+            proc.communicate()
+
+        # Read in the command's stdout/stderr
+        with open(stdoutf, 'rb') as f: stdout = f.read()
+        with open(stderrf, 'rb') as f: stderr = f.read()
+
+    retcode = proc.returncode
+    stdout  = stdout.decode('utf-8')
+    stderr  = stderr.decode('utf-8')
+
+    if not ret and (retcode != 0):
+        raise RuntimeError('{} returned non-zero exit code: {}'.format(
+            args[0], retcode))
 
     results = [stdout]
 
-- 
GitLab