From a34afa7cb7005763efedf60e6691cb99532d4240 Mon Sep 17 00:00:00 2001 From: Paul McCarthy <pauldmccarthy@gmail.com> Date: Mon, 28 May 2018 16:57:25 +0100 Subject: [PATCH] RF,ENH: run now accepts a "log" argument, which allows control over where stdout/stderr streams go. --- fsl/utils/run.py | 288 +++++++++++++++++++++++++++++++---------------- 1 file changed, 193 insertions(+), 95 deletions(-) diff --git a/fsl/utils/run.py b/fsl/utils/run.py index ee4de6621..a8adb80b8 100644 --- a/fsl/utils/run.py +++ b/fsl/utils/run.py @@ -18,6 +18,7 @@ import sys import logging +import warnings import threading import contextlib import collections @@ -87,6 +88,36 @@ def _prepareArgs(args): return list(args) +real_stdout = sys.stdout +def _forwardStream(in_, *outs): + """Creates and starts a daemon thread which forwards the given input stream + to one or more output streams. Used by the :func:`run` function to redirect + a command's standard output/error streams to more than one destination. + + It is necessary to read the process stdout/ stderr on separate threads to + avoid deadlocks. + + :arg in_: Input stream + :arg outs: Output stream(s) + :returns: The thread that has been started. + """ + + # not all file-likes have a mode attribute - + # if not present, assume a string stream + omodes = [getattr(o, 'mode', 'w') for o in outs] + + def realForward(): + for line in in_: + for i, o in enumerate(outs): + if 'b' in omodes[i]: o.write(line) + else: o.write(line.decode('utf-8')) + + t = threading.Thread(target=realForward) + t.daemon = True + t.start() + return t + + def run(*args, **kwargs): """Call a command and return its output. You can pass the command and arguments as a single string, or as a regular or unpacked sequence. @@ -97,69 +128,79 @@ def run(*args, **kwargs): An exception is raised if the command returns a non-zero exit code, unless the ``ret`` option is set to ``True``. - :arg submit: Must be passed as a keyword argument. Defaults to ``None``. 
- Accepted values are ``True`` or a - If ``True``, the command is submitted as a cluster job via - the :func:`.fslsub.submit` function. May also be a - dictionary containing arguments to that function. - - :arg err: Must be passed as a keyword argument. Defaults to - ``False``. If ``True``, standard error is captured and - returned. Ignored if ``submit`` is specified. - - :arg tee: Must be passed as a keyword argument. Defaults to ``False``. - If ``True``, the command's standard output and error streams - are forward to the streams for this process, in addition to - being captured and returned. Ignored if ``submit`` is - specified. - - :arg ret: Must be passed as a keyword argument. Defaults to ``False``. - If ``True``, and the command's return code is non-0, an - exception is not raised. Ignored if ``submit`` is specified. - - :returns: If ``submit`` is provided, the cluster job ID is returned. - Otherwise if ``err is False and ret is False`` (the default) - a string containing the command's standard output. is - returned. Or, if ``err is True`` and/or ``ret is True``, a - tuple containing the standard output, standard error (if - ``err``), and return code (if ``ret``). + :arg stdout: Must be passed as a keyword argument. Defaults to ``True``. + If ``True``, standard output is captured and returned. + Ignored if ``submit`` is specified. + + :arg stderr: Must be passed as a keyword argument. Defaults to ``False``. + If ``True``, standard error is captured and returned. + Ignored if ``submit`` is specified. + + :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``. + If ``True``, and the command's return code is non-0, an + exception is not raised. Ignored if ``submit`` is + specified. + + :arg err: Deprecated - use ``stderr`` instead. + + :arg ret: Deprecated - use ``exitcode`` instead. + + :arg submit: Must be passed as a keyword argument. Defaults to ``None``. 
+ If ``True``, the command is submitted as a cluster job via + the :func:`.fslsub.submit` function. May also be a + dictionary containing arguments to that function. + + :arg log: Must be passed as a keyword argument. An optional ``dict`` + which may be used to redirect the command's standard output + and error. The following keys are recognised: + + - tee: If ``True``, the command's standard output/error + streams are forwarded to this process' streams. + + - stdout: Optional file-like object to which the command's + standard output stream can be forwarded. + + - stderr: Optional file-like object to which the command's + standard error stream can be forwarded. + + - cmd: If ``True``, the command itself is logged to the + standard output stream(s). + + :returns: If ``submit`` is provided, the return value of + :func:`.fslsub.submit` is returned. Otherwise returns a single + value or a tuple, based on the ``stdout``, + ``stderr``, and ``exitcode`` arguments. """ - # Creates a thread which forwards the given - # input stream to one or more output streams. - # Used when tee is True - we have to read - # the process stdout/err on separate threads - # to avoid deadlocks. 
- def forward(in_, *outs): - - # not all file-likes have a mode attribute - - # if not present, assume a string stream - omodes = [getattr(o, 'mode', 'w') for o in outs] - - def realForward(): - for line in in_: - for i, o in enumerate(outs): - if 'b' in omodes[i]: o.write(line) - else: o.write(line.decode('utf-8')) - - t = threading.Thread(target=realForward) - t.daemon = True - t.start() - return t - - err = kwargs.get('err', False) - ret = kwargs.get('ret', False) - tee = kwargs.get('tee', False) - submit = kwargs.get('submit', None) - args = _prepareArgs(args) + if 'err' in kwargs: + warnings.warn('err is deprecated and will be removed ' + 'in fslpy 2.0.0 - use stderr instead', + DeprecationWarning) + kwargs['stderr'] = kwargs.get('stderr', kwargs['err']) + if 'ret' in kwargs: + warnings.warn('ret is deprecated and will be removed ' + 'in fslpy 2.0.0 - use exitcode instead', + DeprecationWarning) + kwargs['exitcode'] = kwargs.get('exitcode', kwargs['ret']) + + returnStdout = kwargs.get('stdout', True) + returnStderr = kwargs.get('stderr', False) + returnExitcode = kwargs.get('exitcode', False) + submit = kwargs.get('submit', {}) + log = kwargs.get('log', {}) + tee = log .get('tee', False) + logStdout = log .get('stdout', None) + logStderr = log .get('stderr', None) + logCmd = log .get('cmd', False) + args = _prepareArgs(args) if not bool(submit): submit = None if submit is not None: - err = False - ret = False - tee = False + returnStdout = False + returnStderr = False + returnExitcode = False if submit is True: submit = dict() @@ -168,47 +209,114 @@ def run(*args, **kwargs): raise ValueError('submit must be a mapping containing ' 'options for fsl.utils.fslsub.submit') - if DRY_RUN: log.debug('dryrun: {}'.format(' '.join(args))) - else: log.debug('run: {}' .format(' '.join(args))) - - # dry run - just echo back the command if DRY_RUN: - stderr = '' - if submit is None: - stdout = ' '.join(args) - else: - stdout = '[submit] ' + ' '.join(args) - - results = 
[stdout] - if err: results.append(stderr) - if ret: results.append(0) - - if len(results) == 1: return results[0] - else: return tuple(results) + return _dryrun( + submit, returnStdout, returnStderr, returnExitcode, *args) # submit - delegate to fslsub if submit is not None: return fslsub.submit(' '.join(args), **submit) - # Start the command, directing its - # stdout/stderr to temporary files - # and, if tee is True, to sys.stdout - # stderr. + # Run directly - delegate to _realrun + stdout, stderr, exitcode = _realrun( + tee, logStdout, logStderr, logCmd, *args) + + if not returnExitcode and (exitcode != 0): + raise RuntimeError('{} returned non-zero exit code: {}'.format( + args[0], exitcode)) + + results = [] + if returnStdout: results.append(stdout) + if returnStderr: results.append(stderr) + if returnExitcode: results.append(exitcode) + + if len(results) == 1: return results[0] + else: return tuple(results) + + + +def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args): + """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is + active. + """ + + if submit: + return ('0',) + + results = [] + stderr = '' + stdout = ' '.join(args) + + if returnStdout: results.append(stdout) + if returnStderr: results.append(stderr) + if returnExitcode: results.append(0) + + if len(results) == 1: return results[0] + else: return tuple(results) + + +def _realrun(tee, logStdout, logStderr, logCmd, *args): + """Used by :func:`run`. Runs the given command and manages its standard + output and error streams. + + :arg tee: If ``True``, the command's standard output and error + streams are forwarded to this process' standard output/ + error. + + :arg logStdout: Optional file-like object to which the command's standard + output stream can be forwarded. + + :arg logStderr: Optional file-like object to which the command's standard + error stream can be forwarded. + + :arg logCmd: If ``True``, the command itself is logged to the standard + output stream(s). 
+ + :arg args: Command to run + + :returns: A tuple containing: + - the command's standard output as a string. + - the command's standard error as a string. + - the command's exit code. + """ proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE) with tempdir.tempdir(changeto=False) as td: + # We always direct the command's stdout/ + # stderr to two temporary files stdoutf = op.join(td, 'stdout') stderrf = op.join(td, 'stderr') with open(stdoutf, 'wb') as stdout, \ open(stderrf, 'wb') as stderr: # noqa + outstreams = [stdout] + errstreams = [stderr] + + # if tee, we duplicate the command's + # stdout/stderr to this process' + # stdout/stderr if tee: - stdoutt = forward(proc.stdout, stdout, sys.stdout) - stderrt = forward(proc.stderr, stderr, sys.stderr) - else: - stdoutt = forward(proc.stdout, stdout) - stderrt = forward(proc.stderr, stderr) + outstreams.append(sys.stdout) + errstreams.append(sys.stderr) + + # And we also duplicate to caller- + # provided streams if they're given. + if logStdout is not None: outstreams.append(logStdout) + if logStderr is not None: errstreams.append(logStderr) + + # log the command to + # stdout if requested + if logCmd: + cmd = ' '.join(args) + '\n' + for o in outstreams: + if 'b' in getattr(o, 'mode', 'w'): + o.write(cmd.encode('utf-8')) + else: + o.write(cmd) + + stdoutt = _forwardStream(proc.stdout, *outstreams) + stderrt = _forwardStream(proc.stderr, *errstreams) # Wait until the forwarding threads # have finished cleanly, and the @@ -221,21 +329,11 @@ def run(*args, **kwargs): with open(stdoutf, 'rb') as f: stdout = f.read() with open(stderrf, 'rb') as f: stderr = f.read() - retcode = proc.returncode - stdout = stdout.decode('utf-8') - stderr = stderr.decode('utf-8') + exitcode = proc.returncode + stdout = stdout.decode('utf-8') + stderr = stderr.decode('utf-8') - if not ret and (retcode != 0): - raise RuntimeError('{} returned non-zero exit code: {}'.format( - args[0], retcode)) - - results = [stdout] - - if err: 
results.append(stderr) - if ret: results.append(retcode) - - if len(results) == 1: return results[0] - else: return tuple(results) + return stdout, stderr, exitcode def runfsl(*args, **kwargs): -- GitLab