Skip to content
Snippets Groups Projects
Commit a34afa7c authored by Paul McCarthy's avatar Paul McCarthy :mountain_bicyclist:
Browse files

RF,ENH: run now accepts a "log" argument, which allows control over where

stdout/stderr streams go.
parent 3738b75f
No related branches found
No related tags found
No related merge requests found
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
import sys import sys
import logging import logging
import warnings
import threading import threading
import contextlib import contextlib
import collections import collections
...@@ -87,6 +88,36 @@ def _prepareArgs(args): ...@@ -87,6 +88,36 @@ def _prepareArgs(args):
return list(args) return list(args)
real_stdout = sys.stdout
def _forwardStream(in_, *outs):
"""Creates and starts a daemon thread which forwards the given input stream
to one or more output streams. Used by the :func:`run` function to redirect
a command's standard output/error streams to more than one destination.
It is necessary to read the process stdout/ stderr on separate threads to
avoid deadlocks.
:arg in_: Input stream
:arg outs: Output stream(s)
:returns: The thread that has been started.
"""
# not all file-likes have a mode attribute -
# if not present, assume a string stream
omodes = [getattr(o, 'mode', 'w') for o in outs]
def realForward():
for line in in_:
for i, o in enumerate(outs):
if 'b' in omodes[i]: o.write(line)
else: o.write(line.decode('utf-8'))
t = threading.Thread(target=realForward)
t.daemon = True
t.start()
return t
def run(*args, **kwargs): def run(*args, **kwargs):
"""Call a command and return its output. You can pass the command and """Call a command and return its output. You can pass the command and
arguments as a single string, or as a regular or unpacked sequence. arguments as a single string, or as a regular or unpacked sequence.
...@@ -97,69 +128,79 @@ def run(*args, **kwargs): ...@@ -97,69 +128,79 @@ def run(*args, **kwargs):
An exception is raised if the command returns a non-zero exit code, unless An exception is raised if the command returns a non-zero exit code, unless
the ``ret`` option is set to ``True``. the ``ret`` option is set to ``True``.
:arg submit: Must be passed as a keyword argument. Defaults to ``None``. :arg stdout: Must be passed as a keyword argument. Defaults to ``True``.
Accepted values are ``True`` or a If ``True``, standard output is captured and returned.
If ``True``, the command is submitted as a cluster job via Ignored if ``submit`` is specified.
the :func:`.fslsub.submit` function. May also be a
dictionary containing arguments to that function. :arg stderr: Must be passed as a keyword argument. Defaults to ``False``.
If ``True``, standard error is captured and returned.
:arg err: Must be passed as a keyword argument. Defaults to Ignored if ``submit`` is specified.
``False``. If ``True``, standard error is captured and
returned. Ignored if ``submit`` is specified. :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``.
If ``True``, and the command's return code is non-0, an
:arg tee: Must be passed as a keyword argument. Defaults to ``False``. exception is not raised. Ignored if ``submit`` is
If ``True``, the command's standard output and error streams specified.
are forward to the streams for this process, in addition to
being captured and returned. Ignored if ``submit`` is :arg err: Deprecated - use ``stderr`` instead.
specified.
:arg ret: Deprecated - use ``exitcode`` instead.
:arg ret: Must be passed as a keyword argument. Defaults to ``False``.
If ``True``, and the command's return code is non-0, an :arg submit: Must be passed as a keyword argument. Defaults to ``None``.
exception is not raised. Ignored if ``submit`` is specified. If ``True``, the command is submitted as a cluster job via
the :func:`.fslsub.submit` function. May also be a
:returns: If ``submit`` is provided, the cluster job ID is returned. dictionary containing arguments to that function.
Otherwise if ``err is False and ret is False`` (the default)
a string containing the command's standard output. is :arg log: Must be passed as a keyword argument. An optional ``dict``
returned. Or, if ``err is True`` and/or ``ret is True``, a which may be used to redirect the command's standard output
tuple containing the standard output, standard error (if and error. The following keys are recognised:
``err``), and return code (if ``ret``).
- tee: If ``True``, the command's standard output/error
streams are forwarded to this processes streams.
- stdout: Optional file-like object to which the command's
standard output stream can be forwarded.
- stderr: Optional file-like object to which the command's
standard error stream can be forwarded.
- cmd: If ``True``, the command itself is logged to the
standard output stream(s).
:returns: If ``submit`` is provided, the return value of
:func:`.fslsub` is returned. Otherwise returns a single
value or a tuple, based on the based on the ``stdout``,
``stderr``, and ``exitcode`` arguments.
""" """
# Creates a thread which forwards the given if 'err' in kwargs:
# input stream to one or more output streams. warnings.warn('err is deprecated and will be removed '
# Used when tee is True - we have to read 'in fslpy 2.0.0 - use stderr instead',
# the process stdout/err on separate threads DeprecationWarning)
# to avoid deadlocks. kwargs['stderr'] = kwargs.get('stderr', kwargs['err'])
def forward(in_, *outs): if 'ret' in kwargs:
warnings.warn('ret is deprecated and will be removed '
# not all file-likes have a mode attribute - 'in fslpy 2.0.0 - use exitcode instead',
# if not present, assume a string stream DeprecationWarning)
omodes = [getattr(o, 'mode', 'w') for o in outs] kwargs['exitcode'] = kwargs.get('exitcode', kwargs['ret'])
def realForward(): returnStdout = kwargs.get('stdout', True)
for line in in_: returnStderr = kwargs.get('stderr', False)
for i, o in enumerate(outs): returnExitcode = kwargs.get('exitcode', False)
if 'b' in omodes[i]: o.write(line) submit = kwargs.get('submit', {})
else: o.write(line.decode('utf-8')) log = kwargs.get('log', {})
tee = log .get('tee', False)
t = threading.Thread(target=realForward) logStdout = log .get('stdout', None)
t.daemon = True logStderr = log .get('stderr', None)
t.start() logCmd = log .get('cmd', False)
return t args = _prepareArgs(args)
err = kwargs.get('err', False)
ret = kwargs.get('ret', False)
tee = kwargs.get('tee', False)
submit = kwargs.get('submit', None)
args = _prepareArgs(args)
if not bool(submit): if not bool(submit):
submit = None submit = None
if submit is not None: if submit is not None:
err = False returnStdout = False
ret = False returnStderr = False
tee = False returnExitcode = False
if submit is True: if submit is True:
submit = dict() submit = dict()
...@@ -168,47 +209,114 @@ def run(*args, **kwargs): ...@@ -168,47 +209,114 @@ def run(*args, **kwargs):
raise ValueError('submit must be a mapping containing ' raise ValueError('submit must be a mapping containing '
'options for fsl.utils.fslsub.submit') 'options for fsl.utils.fslsub.submit')
if DRY_RUN: log.debug('dryrun: {}'.format(' '.join(args)))
else: log.debug('run: {}' .format(' '.join(args)))
# dry run - just echo back the command
if DRY_RUN: if DRY_RUN:
stderr = '' return _dryrun(
if submit is None: submit, returnStdout, returnStderr, returnExitcode, *args)
stdout = ' '.join(args)
else:
stdout = '[submit] ' + ' '.join(args)
results = [stdout]
if err: results.append(stderr)
if ret: results.append(0)
if len(results) == 1: return results[0]
else: return tuple(results)
# submit - delegate to fslsub # submit - delegate to fslsub
if submit is not None: if submit is not None:
return fslsub.submit(' '.join(args), **submit) return fslsub.submit(' '.join(args), **submit)
# Start the command, directing its # Run directly - delegate to _realrun
# stdout/stderr to temporary files stdout, stderr, exitcode = _realrun(
# and, if tee is True, to sys.stdout tee, logStdout, logStderr, logCmd, *args)
# stderr.
if not returnExitcode and (exitcode != 0):
raise RuntimeError('{} returned non-zero exit code: {}'.format(
args[0], exitcode))
results = []
if returnStdout: results.append(stdout)
if returnStderr: results.append(stderr)
if returnExitcode: results.append(exitcode)
if len(results) == 1: return results[0]
else: return tuple(results)
def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
"""Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
active.
"""
if submit:
return ('0',)
results = []
stderr = ''
stdout = ' '.join(args)
if returnStdout: results.append(stdout)
if returnStderr: results.append(stderr)
if returnExitcode: results.append(0)
if len(results) == 1: return results[0]
else: return tuple(results)
def _realrun(tee, logStdout, logStderr, logCmd, *args):
"""Used by :func:`run`. Runs the given command and manages its standard
output and error streams.
:arg tee: If ``True``, the command's standard output and error
streams are forwarded to this process' standard output/
error.
:arg logStdout: Optional file-like object to which the command's standard
output stream can be forwarded.
:arg logStderr: Optional file-like object to which the command's standard
error stream can be forwarded.
:arg logCmd: If ``True``, the command itself is logged to the standard
output stream(s).
:arg args: Command to run
:returns: A tuple containing:
- the command's standard output as a string.
- the command's standard error as a string.
- the command's exit code.
"""
proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE) proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)
with tempdir.tempdir(changeto=False) as td: with tempdir.tempdir(changeto=False) as td:
# We always direct the command's stdout/
# stderr to two temporary files
stdoutf = op.join(td, 'stdout') stdoutf = op.join(td, 'stdout')
stderrf = op.join(td, 'stderr') stderrf = op.join(td, 'stderr')
with open(stdoutf, 'wb') as stdout, \ with open(stdoutf, 'wb') as stdout, \
open(stderrf, 'wb') as stderr: # noqa open(stderrf, 'wb') as stderr: # noqa
outstreams = [stdout]
errstreams = [stderr]
# if tee, we duplicate the command's
# stdout/stderr to this process'
# stdout/stderr
if tee: if tee:
stdoutt = forward(proc.stdout, stdout, sys.stdout) outstreams.append(sys.stdout)
stderrt = forward(proc.stderr, stderr, sys.stderr) errstreams.append(sys.stderr)
else:
stdoutt = forward(proc.stdout, stdout) # And we also duplicate to caller-
stderrt = forward(proc.stderr, stderr) # provided streams if they're given.
if logStdout is not None: outstreams.append(logStdout)
if logStderr is not None: errstreams.append(logStderr)
# log the command to
# stdout if requested
if logCmd:
cmd = ' '.join(args) + '\n'
for o in outstreams:
if 'b' in getattr(o, 'mode', 'w'):
o.write(cmd.encode('utf-8'))
else:
o.write(cmd)
stdoutt = _forwardStream(proc.stdout, *outstreams)
stderrt = _forwardStream(proc.stderr, *errstreams)
# Wait until the forwarding threads # Wait until the forwarding threads
# have finished cleanly, and the # have finished cleanly, and the
...@@ -221,21 +329,11 @@ def run(*args, **kwargs): ...@@ -221,21 +329,11 @@ def run(*args, **kwargs):
with open(stdoutf, 'rb') as f: stdout = f.read() with open(stdoutf, 'rb') as f: stdout = f.read()
with open(stderrf, 'rb') as f: stderr = f.read() with open(stderrf, 'rb') as f: stderr = f.read()
retcode = proc.returncode exitcode = proc.returncode
stdout = stdout.decode('utf-8') stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8') stderr = stderr.decode('utf-8')
if not ret and (retcode != 0): return stdout, stderr, exitcode
raise RuntimeError('{} returned non-zero exit code: {}'.format(
args[0], retcode))
results = [stdout]
if err: results.append(stderr)
if ret: results.append(retcode)
if len(results) == 1: return results[0]
else: return tuple(results)
def runfsl(*args, **kwargs): def runfsl(*args, **kwargs):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment