diff --git a/fsl/utils/run.py b/fsl/utils/run.py index 032e07d97a91951e61026b5e49f757675b799bd9..9e916d8102ec2979b18777db89ffcab9e46b8306 100644 --- a/fsl/utils/run.py +++ b/fsl/utils/run.py @@ -16,7 +16,9 @@ """ +import sys import logging +import threading import contextlib import collections import subprocess as sp @@ -26,6 +28,7 @@ import six from fsl.utils.platform import platform as fslplatform import fsl.utils.fslsub as fslsub +import fsl.utils.tempdir as tempdir log = logging.getLogger(__name__) @@ -100,6 +103,12 @@ def run(*args, **kwargs): ``False``. If ``True``, standard error is captured and returned. Ignored if ``submit`` is specified. + :arg tee: Must be passed as a keyword argument. Defaults to ``False``. + If ``True``, the command's standard output and error streams + are forward to the streams for this process, in addition to + being captured and returned. Ignored if ``submit`` is + specified. + :arg ret: Must be passed as a keyword argument. Defaults to ``False``. If ``True``, and the command's return code is non-0, an exception is not raised. Ignored if ``submit`` is specified. @@ -112,8 +121,26 @@ def run(*args, **kwargs): ``err``), and return code (if ``ret``). """ + # Creates a thread which forwards the given + # input stream to one or more output streams. + # Used when tee is True - we have to read + # the process stdout/err on separate threads + # to avoid deadlocks. + def forward(in_, *outs): + def realForward(): + for line in in_: + for o in outs: + if 'b' in o.mode: o.write(line) + else: o.write(line.decode('utf-8')) + + t = threading.Thread(target=realForward) + t.daemon = True + t.start() + return t + err = kwargs.get('err', False) ret = kwargs.get('ret', False) + tee = kwargs.get('tee', False) submit = kwargs.get('submit', None) args = _prepareArgs(args) @@ -123,6 +150,7 @@ def run(*args, **kwargs): if submit is not None: err = False ret = False + tee = False if submit is True: submit = dict() @@ -131,11 +159,10 @@ def run(*args, **kwargs): raise ValueError('submit must be a mapping containing ' 'options for fsl.utils.fslsub.submit') - if DRY_RUN: - log.debug('dryrun: {}'.format(' '.join(args))) - else: - log.debug('run: {}'.format(' '.join(args))) + if DRY_RUN: log.debug('dryrun: {}'.format(' '.join(args))) + else: log.debug('run: {}' .format(' '.join(args))) + # dry run - just echo back the command if DRY_RUN: stderr = '' if submit is None: @@ -143,23 +170,55 @@ def run(*args, **kwargs): else: stdout = '[submit] ' + ' '.join(args) - elif submit is not None: - return fslsub.submit(' '.join(args), **submit) + results = [stdout] + if err: results.append(stderr) + if ret: results.append(0) - else: - proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE) - stdout, stderr = proc.communicate() - retcode = proc.returncode + if len(results) == 1: return results[0] + else: return tuple(results) - stdout = stdout.decode('utf-8').strip() - stderr = stderr.decode('utf-8').strip() - - log.debug('stdout: {}'.format(stdout)) - log.debug('stderr: {}'.format(stderr)) + # submit - delegate to fslsub + if submit is not None: + return fslsub.submit(' '.join(args), **submit) - if not ret and (retcode != 0): - raise RuntimeError('{} returned non-zero exit code: {}'.format( - args[0], retcode)) + # Start the command, directing its + # stdout/stderr to temporary files + # and, if tee is True, to sys.stdout + # stderr. + proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE) + with tempdir.tempdir(changeto=False) as td: + + stdoutf = op.join(td, 'stdout') + stderrf = op.join(td, 'stderr') + + with open(stdoutf, 'wb') as stdout, \ + open(stderrf, 'wb') as stderr: # noqa + + if tee: + stdoutt = forward(proc.stdout, stdout, sys.stdout) + stderrt = forward(proc.stderr, stderr, sys.stderr) + else: + stdoutt = forward(proc.stdout, stdout) + stderrt = forward(proc.stderr, stderr) + + # Wait until the forwarding threads + # have finished cleanly, and the + # command has terminated. + stdoutt.join() + stderrt.join() + proc.communicate() + + # Read in the command's stdout/stderr + with open(stdoutf, 'rb') as f: stdout = f.read() + with open(stderrf, 'rb') as f: stderr = f.read() + + retcode = proc.returncode + stdout = stdout.decode('utf-8') + stderr = stderr.decode('utf-8') + + if not ret and (retcode != 0): + raise RuntimeError('{} returned non-zero exit code: {}'.format( + args[0], retcode)) results = [stdout]