#!/usr/bin/env python # # run.py - Functions for running shell commands # # Author: Paul McCarthy <pauldmccarthy@gmail.com> # Author: Michiel Cottaar <michiel.cottaar@ndcn.ox.ac.uk> # """This module provides some functions for running shell commands. .. note:: The functions in this module are only known to work in Unix-like environments. .. autosummary:: :nosignatures: run runfsl wait dryrun """ import sys import shlex import logging import threading import contextlib import collections.abc as abc import subprocess as sp import os.path as op import os import six from fsl.utils.platform import platform as fslplatform import fsl.utils.fslsub as fslsub import fsl.utils.tempdir as tempdir import fsl.utils.path as fslpath log = logging.getLogger(__name__) DRY_RUN = False """If ``True``, the :func:`run` function will only log commands, but will not execute them. """ FSL_PREFIX = None """Global override for the FSL executable location used by :func:`runfsl`. """ class FSLNotPresent(Exception): """Error raised by the :func:`runfsl` function when ``$FSLDIR`` cannot be found. """ pass @contextlib.contextmanager def dryrun(*args): """Context manager which causes all calls to :func:`run` to be logged but not executed. See the :data:`DRY_RUN` flag. The returned standard output will be equal to ``' '.join(args)``. """ global DRY_RUN oldval = DRY_RUN DRY_RUN = True try: yield finally: DRY_RUN = oldval def prepareArgs(args): """Used by the :func:`run` function. Ensures that the given arguments is a list of strings. """ if len(args) == 1: # Argument was a command string if isinstance(args[0], six.string_types): args = shlex.split(args[0]) # Argument was an unpacked sequence else: args = args[0] return list(args) real_stdout = sys.stdout def _forwardStream(in_, *outs): """Creates and starts a daemon thread which forwards the given input stream to one or more output streams. Used by the :func:`run` function to redirect a command's standard output/error streams to more than one destination. It is necessary to read the process stdout/ stderr on separate threads to avoid deadlocks. :arg in_: Input stream :arg outs: Output stream(s) :returns: The thread that has been started. """ # not all file-likes have a mode attribute - # if not present, assume a string stream omodes = [getattr(o, 'mode', 'w') for o in outs] def realForward(): for line in iter(in_.readline, b''): for i, o in enumerate(outs): if 'b' in omodes[i]: o.write(line) else: o.write(line.decode('utf-8')) t = threading.Thread(target=realForward) t.daemon = True t.start() return t def run(*args, **kwargs): """Call a command and return its output. You can pass the command and arguments as a single string, or as a regular or unpacked sequence. The command can be run on a cluster by using the ``submit`` keyword argument. An exception is raised if the command returns a non-zero exit code, unless the ``exitcode`` option is set to ``True``. :arg stdout: Must be passed as a keyword argument. Defaults to ``True``. If ``True``, standard output is captured and returned. Ignored if ``submit`` is specified. :arg stderr: Must be passed as a keyword argument. Defaults to ``False``. If ``True``, standard error is captured and returned. Ignored if ``submit`` is specified. :arg exitcode: Must be passed as a keyword argument. Defaults to ``False``. If ``True``, and the command's return code is non-0, an exception is not raised. Ignored if ``submit`` is specified. :arg submit: Must be passed as a keyword argument. Defaults to ``None``. If ``True``, the command is submitted as a cluster job via the :func:`.fslsub.submit` function. May also be a dictionary containing arguments to that function. :arg log: Must be passed as a keyword argument. An optional ``dict`` which may be used to redirect the command's standard output and error. The following keys are recognised: - tee: If ``True``, the command's standard output/error streams are forwarded to this processes streams. - stdout: Optional file-like object to which the command's standard output stream can be forwarded. - stderr: Optional file-like object to which the command's standard error stream can be forwarded. - cmd: Optional file-like object to which the command itself is logged. All other keyword arguments are passed through to the ``subprocess.Popen`` object (via :func:`_realrun`), unless ``submit=True``, in which case they are passed through to the :func:`.fslsub.submit` function. :returns: If ``submit`` is provided, the return value of :func:`.fslsub` is returned. Otherwise returns a single value or a tuple, based on the based on the ``stdout``, ``stderr``, and ``exitcode`` arguments. """ returnStdout = kwargs.pop('stdout', True) returnStderr = kwargs.pop('stderr', False) returnExitcode = kwargs.pop('exitcode', False) submit = kwargs.pop('submit', {}) log = kwargs.pop('log', None) args = prepareArgs(args) if log is None: log = {} tee = log.get('tee', False) logStdout = log.get('stdout', None) logStderr = log.get('stderr', None) logCmd = log.get('cmd', None) if not bool(submit): submit = None if submit is not None: returnStdout = False returnStderr = False returnExitcode = False if submit is True: submit = dict() if submit is not None and not isinstance(submit, abc.Mapping): raise ValueError('submit must be a mapping containing ' 'options for fsl.utils.fslsub.submit') if DRY_RUN: return _dryrun( submit, returnStdout, returnStderr, returnExitcode, *args) # submit - delegate to fslsub if submit is not None: return fslsub.submit(' '.join(args), **submit, **kwargs) # Run directly - delegate to _realrun stdout, stderr, exitcode = _realrun( tee, logStdout, logStderr, logCmd, *args, **kwargs) if not returnExitcode and (exitcode != 0): raise RuntimeError('{} returned non-zero exit code: {}'.format( args[0], exitcode)) results = [] if returnStdout: results.append(stdout) if returnStderr: results.append(stderr) if returnExitcode: results.append(exitcode) if len(results) == 1: return results[0] else: return tuple(results) def _dryrun(submit, returnStdout, returnStderr, returnExitcode, *args): """Used by the :func:`run` function when the :attr:`DRY_RUN` flag is active. """ if submit: return ('0',) results = [] stderr = '' stdout = ' '.join(args) if returnStdout: results.append(stdout) if returnStderr: results.append(stderr) if returnExitcode: results.append(0) if len(results) == 1: return results[0] else: return tuple(results) def _realrun(tee, logStdout, logStderr, logCmd, *args, **kwargs): """Used by :func:`run`. Runs the given command and manages its standard output and error streams. :arg tee: If ``True``, the command's standard output and error streams are forwarded to this process' standard output/ error. :arg logStdout: Optional file-like object to which the command's standard output stream can be forwarded. :arg logStderr: Optional file-like object to which the command's standard error stream can be forwarded. :arg logCmd: Optional file-like object to which the command itself is logged. :arg args: Command to run :arg kwargs: Passed through to the ``subprocess.Popen`` object. :returns: A tuple containing: - the command's standard output as a string. - the command's standard error as a string. - the command's exit code. """ proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, **kwargs) with tempdir.tempdir(changeto=False) as td: # We always direct the command's stdout/ # stderr to two temporary files stdoutf = op.join(td, 'stdout') stderrf = op.join(td, 'stderr') with open(stdoutf, 'wb') as stdout, \ open(stderrf, 'wb') as stderr: # noqa outstreams = [stdout] errstreams = [stderr] # if tee, we duplicate the command's # stdout/stderr to this process' # stdout/stderr if tee: outstreams.append(sys.stdout) errstreams.append(sys.stderr) # And we also duplicate to caller- # provided streams if they're given. if logStdout is not None: outstreams.append(logStdout) if logStderr is not None: errstreams.append(logStderr) # log the command if requested if logCmd is not None: cmd = ' '.join(args) + '\n' if 'b' in getattr(logCmd, 'mode', 'w'): logCmd.write(cmd.encode('utf-8')) else: logCmd.write(cmd) stdoutt = _forwardStream(proc.stdout, *outstreams) stderrt = _forwardStream(proc.stderr, *errstreams) # Wait until the forwarding threads # have finished cleanly, and the # command has terminated. stdoutt.join() stderrt.join() proc.communicate() # Read in the command's stdout/stderr with open(stdoutf, 'rb') as f: stdout = f.read() with open(stderrf, 'rb') as f: stderr = f.read() exitcode = proc.returncode stdout = stdout.decode('utf-8') stderr = stderr.decode('utf-8') return stdout, stderr, exitcode def runfsl(*args, **kwargs): """Call a FSL command and return its output. This function searches for the command in the following locations (ordered by priority): 1. ``FSL_PREFIX`` 2. ``$FSLDEVDIR/bin`` 3. ``$FSLDIR/bin`` If found, the full path to the command is then passed to :func:`run`. """ prefixes = [] if FSL_PREFIX is not None: prefixes.append(FSL_PREFIX) if fslplatform.fsldevdir is not None: prefixes.append(op.join(fslplatform.fsldevdir, 'bin')) if fslplatform.fsldir is not None: prefixes.append(op.join(fslplatform.fsldir, 'bin')) if not prefixes: raise FSLNotPresent('$FSLDIR is not set - FSL cannot be found!') args = prepareArgs(args) for prefix in prefixes: cmdpath = op.join(prefix, args[0]) if fslplatform.fslwsl: wslargs = wslcmd(cmdpath, *args) if wslargs is not None: args = wslargs break elif op.isfile(cmdpath): args[0] = cmdpath break # error if the command cannot # be found in a FSL directory else: raise FileNotFoundError('FSL tool {} not found (checked {})'.format( args[0], ', '.join(prefixes))) return run(*args, **kwargs) def wslcmd(cmdpath, *args): """ Convert a command + arguments into an equivalent set of arguments that will run the command under Windows Subsystem for Linux :param cmdpath: Fully qualified path to the command. This is essentially a WSL path not a Windows one since FSLDIR is specified as a WSL path, however it may have backslashes as path separators due to previous use of ``os.path.join`` :param args: Sequence of command arguments (the first of which is the unqualified command name) :return: If ``cmdpath`` exists and is executable in WSL, return a sequence of command arguments which when executed will run the command in WSL. Windows paths in the argument list will be converted to WSL paths. If ``cmdpath`` was not executable in WSL, returns None """ # Check if command exists in WSL (remembering that the command path may include FSLDIR which # is a Windows path) cmdpath = fslpath.wslpath(cmdpath) retcode = sp.call(["wsl", "test", "-x", cmdpath]) if retcode == 0: # Form a new argument list and convert any Windows paths in it into WSL paths wslargs = [fslpath.wslpath(arg) for arg in args] wslargs[0] = cmdpath local_fsldir = fslpath.wslpath(fslplatform.fsldir) if fslplatform.fsldevdir: local_fsldevdir = fslpath.wslpath(fslplatform.fsldevdir) else: local_fsldevdir = None # Prepend important environment variables - note that it seems we cannot # use WSLENV for this due to its insistance on path mapping. FIXME FSLDEVDIR? local_path = "$PATH" if local_fsldevdir: local_path += ":%s/bin" % local_fsldevdir local_path += ":%s/bin" % local_fsldir prepargs = [ "wsl", "PATH=%s" % local_path, "FSLDIR=%s" % local_fsldir, "FSLOUTPUTTYPE=%s" % os.environ.get("FSLOUTPUTTYPE", "NIFTI_GZ") ] if local_fsldevdir: prepargs.append("FSLDEVDIR=%s" % local_fsldevdir) return prepargs + wslargs else: # Command was not found in WSL with this path return None def wait(job_ids): """Proxy for :func:`.fslsub.wait`. """ return fslsub.wait(job_ids)