Skip to content
Snippets Groups Projects
Forked from FSL / fslpy
821 commits behind the upstream repository.
run.py 14.18 KiB
#!/usr/bin/env python
#
# run.py - Functions for running shell commands
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
# Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk>
#
"""This module provides some functions for running shell commands.

.. note:: The functions in this module are only known to work in Unix-like
          environments.

.. autosummary::
   :nosignatures:

   run
   runfsl
   wait
   dryrun
"""


import                    sys
import                    shlex
import                    logging
import                    threading
import                    contextlib
import collections.abc as abc
import subprocess      as sp
import os.path         as op
import                    os

import                    six

from   fsl.utils.platform import platform as fslplatform
import fsl.utils.fslsub                   as fslsub
import fsl.utils.tempdir                  as tempdir
import fsl.utils.path                     as fslpath

# Logger for this module, named after the module itself.
log = logging.getLogger(__name__)


# Switched on temporarily by the dryrun context manager, and
# checked by run before a command is executed or submitted.
DRY_RUN = False
"""If ``True``, the :func:`run` function will only log commands, but will not
execute them.
"""


# When not None, this is the first location in which runfsl looks
# for a command, ahead of $FSLDEVDIR/bin and $FSLDIR/bin.
FSL_PREFIX = None
"""Global override for the FSL executable location used by :func:`runfsl`. """


class FSLNotPresent(Exception):
    """Raised by :func:`runfsl` when no FSL installation can be located,
    i.e. when ``$FSLDIR`` cannot be found.
    """


@contextlib.contextmanager
def dryrun(*args):
    """Context manager which temporarily switches this module into dry-run
    mode.

    While the ``with`` block is active the :data:`DRY_RUN` flag is ``True``,
    so calls to :func:`run` do not execute anything. The flag is put back to
    the value it had beforehand when the block exits, including when the
    block exits with an exception.

    During a dry run, the standard output returned by :func:`run` is the
    command and its arguments joined with single spaces.

    :arg args: Accepted, but not used.
    """
    global DRY_RUN

    # Remember the current setting rather than assuming it was
    # False, so that nested dryrun blocks unwind correctly.
    previous, DRY_RUN = DRY_RUN, True

    try:
        yield
    finally:
        DRY_RUN = previous


def prepareArgs(args):
    """Used by the :func:`run` function. Normalises the positional arguments
    describing a command into a list.

    Three calling conventions are supported:

      - a single command string (``('ls -l',)``), which is split into tokens
        using shell-like rules (``shlex.split``);

      - a single sequence (``(['ls', '-l'],)``), which is unpacked;

      - several separate arguments (``('ls', '-l')``), which are used as
        they are.

    :arg args: Tuple of positional arguments, as received by :func:`run`.

    :returns:  A new list containing the command and its arguments. The
               individual elements are not converted or validated.
    """

    if len(args) == 1:
        command = args[0]

        # A lone string is a complete command line. This module
        # is Python 3 only, so the built-in str type covers
        # everything that six.string_types used to.
        if isinstance(command, str):
            return shlex.split(command)

        # Otherwise the single argument is
        # itself the sequence of arguments
        return list(command)

    return list(args)


real_stdout = sys.stdout
def _forwardStream(in_, *outs):
    """Creates and starts a daemon thread which forwards the given input stream
    to one or more output streams. Used by the :func:`run` function to redirect
    a command's standard output/error streams to more than one destination.

    It is necessary to read the process stdout/ stderr on separate threads to
    avoid deadlocks.

    :arg in_:  Input stream
    :arg outs: Output stream(s)
    :returns:  The thread that has been started.
    """

    # not all file-likes have a mode attribute -
    # if not present, assume a string stream
    omodes = [getattr(o, 'mode', 'w') for o in outs]

    def realForward():
        for line in iter(in_.readline, b''):
            for i, o in enumerate(outs):
                if 'b' in omodes[i]: o.write(line)
                else:                o.write(line.decode('utf-8'))

    t = threading.Thread(target=realForward)
    t.daemon = True
    t.start()
    return t


def run(*args, **kwargs):
    """Run a command and return its output. The command and its arguments
    may be given as a single string, as a single sequence, or as separate
    (unpacked) arguments.

    The command is submitted as a cluster job, instead of being run
    directly, when the ``submit`` keyword argument is used.

    A ``RuntimeError`` is raised if the command finishes with a non-zero
    exit code, unless ``exitcode`` is set to ``True``.

    All of the following must be passed as keyword arguments:

    :arg stdout:   Defaults to ``True``. If ``True``, standard output is
                   captured and returned. Ignored if ``submit`` is
                   specified.

    :arg stderr:   Defaults to ``False``. If ``True``, standard error is
                   captured and returned. Ignored if ``submit`` is
                   specified.

    :arg exitcode: Defaults to ``False``. If ``True``, the exit code is
                   returned, and a non-0 exit code does not cause an
                   exception to be raised. Ignored if ``submit`` is
                   specified.

    :arg submit:   Any value which evaluates to ``False`` (the default)
                   means that the command is run directly. If ``True``, the
                   command is submitted as a cluster job via the
                   :func:`.fslsub.submit` function. May also be a non-empty
                   mapping containing arguments to that function. Any other
                   value results in a ``ValueError``.

    :arg log:      An optional ``dict`` which may be used to redirect the
                   command's standard output and error. The following keys
                   are recognised:

                     - tee:    If ``True``, the command's standard
                               output/error streams are also forwarded to
                               the streams of this process.

                     - stdout: Optional file-like object to which the
                               command's standard output is forwarded.

                     - stderr: Optional file-like object to which the
                               command's standard error is forwarded.

                     - cmd:    Optional file-like object to which the
                               command itself is written.

    All other keyword arguments are passed through to the
    ``subprocess.Popen`` object (via :func:`_realrun`), unless the command
    is being submitted, in which case they are passed through to the
    :func:`.fslsub.submit` function.

    :returns:      If the command is submitted, the return value of
                   :func:`.fslsub.submit`. Otherwise the requested values,
                   in the order standard output, standard error, exit code.
                   A single requested value is returned on its own;
                   otherwise a tuple is returned.
    """

    wantStdout   = kwargs.pop('stdout',   True)
    wantStderr   = kwargs.pop('stderr',   False)
    wantExitcode = kwargs.pop('exitcode', False)
    submit       = kwargs.pop('submit',   {})
    logopts      = kwargs.pop('log',      None)
    args         = prepareArgs(args)

    if logopts is None:
        logopts = {}

    tee       = logopts.get('tee',    False)
    logStdout = logopts.get('stdout', None)
    logStderr = logopts.get('stderr', None)
    logCmd    = logopts.get('cmd',    None)

    # Any false value means "do not submit". Otherwise the
    # command is being submitted, so there is no output or
    # exit code to return, and the options must be a mapping.
    if not submit:
        submit = None
    else:
        wantStdout = wantStderr = wantExitcode = False

        if submit is True:
            submit = {}
        elif not isinstance(submit, abc.Mapping):
            raise ValueError('submit must be a mapping containing '
                             'options for fsl.utils.fslsub.submit')

    if DRY_RUN:
        return _dryrun(submit, wantStdout, wantStderr, wantExitcode, *args)

    # submit - delegate to fslsub
    if submit is not None:
        return fslsub.submit(' '.join(args), **submit, **kwargs)

    # Run directly - delegate to _realrun
    stdout, stderr, exitcode = _realrun(
        tee, logStdout, logStderr, logCmd, *args, **kwargs)

    # A failure is only an error if the
    # caller did not ask for the exit code
    if not (wantExitcode or exitcode == 0):
        raise RuntimeError('{} returned non-zero exit code: {}'.format(
            args[0], exitcode))

    requested = ((wantStdout,   stdout),
                 (wantStderr,   stderr),
                 (wantExitcode, exitcode))
    results   = [value for wanted, value in requested if wanted]

    if len(results) == 1:
        return results[0]
    return tuple(results)


def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args):
    """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is
    active.
    """

    if submit:
        return ('0',)

    results = []
    stderr  = ''
    stdout  = ' '.join(args)

    if returnStdout:   results.append(stdout)
    if returnStderr:   results.append(stderr)
    if returnExitcode: results.append(0)

    if len(results) == 1: return results[0]
    else:                 return tuple(results)


def _realrun(tee, logStdout, logStderr, logCmd, *args, **kwargs):
    """Used by :func:`run`. Starts the given command, forwards its standard
    output and error streams to the requested destinations while it runs,
    and collects them for the caller.

    :arg tee:       If ``True``, the command's standard output and error
                    streams are also forwarded to the standard output/error
                    of this process.

    :arg logStdout: Optional file-like object to which the command's standard
                    output stream is forwarded.

    :arg logStderr: Optional file-like object to which the command's standard
                    error stream is forwarded.

    :arg logCmd:    Optional file-like object to which the command itself
                    (arguments joined with spaces, plus a newline) is
                    written. It is written as UTF-8 encoded bytes if the
                    object has a ``'b'`` in its ``mode`` attribute.

    :arg args:      Command to run

    :arg kwargs:    Passed through to the ``subprocess.Popen`` object.

    :returns:       A tuple containing:
                      - the command's standard output as a string.
                      - the command's standard error as a string.
                      - the command's exit code.
    """
    proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, **kwargs)

    with tempdir.tempdir(changeto=False) as workdir:

        # Whatever else is requested, both streams
        # are always captured in two temporary files
        outpath = op.join(workdir, 'stdout')
        errpath = op.join(workdir, 'stderr')

        with open(outpath, 'wb') as outfile, \
             open(errpath, 'wb') as errfile:

            outsinks = [outfile]
            errsinks = [errfile]

            # tee - echo the command's streams
            # on the streams of this process
            if tee:
                outsinks.append(sys.stdout)
                errsinks.append(sys.stderr)

            # caller-provided destinations
            if logStdout is not None:
                outsinks.append(logStdout)
            if logStderr is not None:
                errsinks.append(logStderr)

            # write the command itself, if requested,
            # as bytes or text depending on the stream
            if logCmd is not None:
                cmdline = ' '.join(args) + '\n'
                if 'b' in getattr(logCmd, 'mode', 'w'):
                    cmdline = cmdline.encode('utf-8')
                logCmd.write(cmdline)

            forwarders = [_forwardStream(proc.stdout, *outsinks),
                          _forwardStream(proc.stderr, *errsinks)]

            # Let both forwarding threads run to
            # completion, then wait for the command
            # itself to terminate.
            for forwarder in forwarders:
                forwarder.join()
            proc.communicate()

        # The files are closed at this point, so
        # everything which was captured can be read back
        with open(outpath, 'rb') as f:
            rawout = f.read()
        with open(errpath, 'rb') as f:
            rawerr = f.read()

    return rawout.decode('utf-8'), rawerr.decode('utf-8'), proc.returncode


def runfsl(*args, **kwargs):
    """Run a FSL command and return its output.

      The command is looked for in the following locations, in this
      order of priority:

      1. ``FSL_PREFIX``
      2. ``$FSLDEVDIR/bin``
      3. ``$FSLDIR/bin``

      The first location in which the command is found is used, and the
      command, with its full path, is passed to :func:`run` together with
      all keyword arguments.

      :raises FSLNotPresent:     if none of the above locations is defined.
      :raises FileNotFoundError: if the command is not found in any of them.
    """
    prefixes = []

    if FSL_PREFIX is not None:
        prefixes.append(FSL_PREFIX)

    for basedir in (fslplatform.fsldevdir, fslplatform.fsldir):
        if basedir is not None:
            prefixes.append(op.join(basedir, 'bin'))

    if len(prefixes) == 0:
        raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!')

    args  = prepareArgs(args)
    found = False

    for prefix in prefixes:
        cmdpath = op.join(prefix, args[0])

        # Under WSL, wslcmd performs the check and builds
        # the complete replacement argument list
        if fslplatform.fslwsl:
            wslargs = wslcmd(cmdpath, *args)
            if wslargs is None:
                continue
            args  = wslargs
            found = True

        elif op.isfile(cmdpath):
            args[0] = cmdpath
            found   = True

        if found:
            break

    # error if the command cannot
    # be found in a FSL directory
    if not found:
        raise FileNotFoundError('FSL tool {} not found (checked {})'.format(
            args[0], ', '.join(prefixes)))

    return run(*args, **kwargs)


def wslcmd(cmdpath, *args):
    """Build the argument list needed to run a command under Windows
    Subsystem for Linux (WSL).

    :arg cmdpath: Fully qualified path to the command. This is essentially a
                  WSL path rather than a Windows one, since FSLDIR is
                  specified as a WSL path, but it may contain backslashes as
                  path separators due to previous use of ``os.path.join``.

    :arg args:    Sequence of command arguments, the first of which is the
                  unqualified command name.

    :returns:     ``None`` if ``wsl test -x`` does not succeed for
                  ``cmdpath``. Otherwise a list consisting of ``wsl``,
                  assignments for the ``PATH``, ``FSLDIR``, ``FSLOUTPUTTYPE``
                  and (when set) ``FSLDEVDIR`` environment variables, the
                  converted command path, and the remaining arguments, each
                  passed through :func:`fsl.utils.path.wslpath`.
    """
    # The command path may include FSLDIR, so
    # it is converted before it is checked
    cmdpath = fslpath.wslpath(cmdpath)

    if sp.call(["wsl", "test", "-x", cmdpath]) != 0:
        # Command was not found in WSL with this path
        return None

    # Every argument is converted - the first one is then
    # replaced by the fully qualified command path
    wslargs    = [fslpath.wslpath(arg) for arg in args]
    wslargs[0] = cmdpath

    fsldir    = fslpath.wslpath(fslplatform.fsldir)
    fsldevdir = None
    if fslplatform.fsldevdir:
        fsldevdir = fslpath.wslpath(fslplatform.fsldevdir)

    # The FSL bin directories are added after the existing PATH,
    # with FSLDEVDIR (if there is one) ahead of FSLDIR
    pathparts = ["$PATH"]
    if fsldevdir:
        pathparts.append(":%s/bin" % fsldevdir)
    pathparts.append(":%s/bin" % fsldir)

    # Important environment variables are passed on the command
    # line - it seems that WSLENV cannot be used for this, due to
    # its insistence on path mapping. FIXME FSLDEVDIR?
    envargs = [
        "PATH=%s" % "".join(pathparts),
        "FSLDIR=%s" % fsldir,
        "FSLOUTPUTTYPE=%s" % os.environ.get("FSLOUTPUTTYPE", "NIFTI_GZ"),
    ]
    if fsldevdir:
        envargs.append("FSLDEVDIR=%s" % fsldevdir)

    return ["wsl"] + envargs + wslargs


def wait(job_ids):
    """Proxy for :func:`.fslsub.wait`.

    :arg job_ids: Passed through unchanged to :func:`.fslsub.wait`.
    :returns:     Whatever :func:`.fslsub.wait` returns.
    """
    return fslsub.wait(job_ids)