Skip to content
Snippets Groups Projects
Forked from FSL / fslpy
813 commits behind the upstream repository.
test_run.py 11.46 KiB
#!/usr/bin/env python
#
# test_run.py -
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#


import os.path as op
import            os
import            shutil
import            textwrap

# python 3
try:  from unittest import mock
# python 2
except ImportError: import mock

import six
import pytest

import fsl.utils.tempdir                  as tempdir
from   fsl.utils.platform import platform as fslplatform
import fsl.utils.run                      as run
import fsl.utils.fslsub                   as fslsub

from . import make_random_image, mockFSLDIR, CaptureStdout


pytestmark = pytest.mark.unixtest


def mkexec(path, contents):
    with open(path, 'wt') as f:
        f.write(contents)
    os.chmod(path, 0o755)


def test_prepareArgs():
    tests = [
        ('a b c',              ['a', 'b', 'c']),
        (['a', 'b', 'c'],      ['a', 'b', 'c']),
        ('abc "woop woop"',    ['abc', 'woop woop']),
        (['abc', 'woop woop'], ['abc', 'woop woop']),
    ]

    for args, expected in tests:
        assert run.prepareArgs((args, )) == expected

def test_run():

    test_script = textwrap.dedent("""
    #!/bin/bash

    echo "standard output - arguments: $@"
    echo "standard error" >&2
    exit {}
    """).strip()

    with tempdir.tempdir():

        # return code == 0
        mkexec('script.sh', test_script.format(0))

        expstdout = "standard output - arguments: 1 2 3\n"
        expstderr = "standard error\n"

        # test:
        #   - single string
        #   - packed sequence
        #   - unpacked sequence
        assert run.run('./script.sh 1 2 3')             == expstdout
        assert run.run(('./script.sh', '1', '2', '3'))  == expstdout
        assert run.run(*('./script.sh', '1', '2', '3')) == expstdout

        # test stdout/stderr
        stdout, stderr = run.run('./script.sh 1 2 3', stderr=True)
        assert stdout == expstdout
        assert stderr == expstderr

        # test return code
        res = run.run('./script.sh 1 2 3', exitcode=True)
        stdout, ret = res
        assert stdout == expstdout
        assert ret == 0
        stdout, stderr, ret = run.run('./script.sh 1 2 3', stderr=True,
                                      exitcode=True)
        assert stdout == expstdout
        assert stderr == expstderr
        assert ret == 0

        # stdout=False
        res = run.run('./script.sh 1 2 3', stdout=False)
        assert res == ()
        stderr = run.run('./script.sh 1 2 3', stdout=False, stderr=True)
        assert stderr == expstderr

        # return code != 0
        mkexec('./script.sh', test_script.format(255))

        with pytest.raises(RuntimeError):
            run.run('./script.sh 1 2 3')

        stdout, ret = run.run('./script.sh 1 2 3', exitcode=True)
        assert stdout == expstdout
        assert ret == 255


def test_run_tee():
    test_script = textwrap.dedent("""
    #!/bin/bash

    echo "standard output - arguments: $@"
    echo "standard error" >&2
    exit 0
    """).strip()

    with tempdir.tempdir():
        mkexec('script.sh', test_script)

        expstdout = "standard output - arguments: 1 2 3\n"
        expstderr = "standard error\n"

        capture = CaptureStdout()

        with capture:
            stdout = run.run('./script.sh 1 2 3', log={'tee' : True})

        assert stdout         == expstdout
        assert capture.stdout == expstdout

        with capture.reset():
            stdout, stderr = run.run('./script.sh 1 2 3', stderr=True,
                                     log={'tee' : True})

        assert stdout         == expstdout
        assert stderr         == expstderr
        assert capture.stdout == expstdout
        assert capture.stderr == expstderr

        with capture.reset():
            stdout, stderr, ret = run.run('./script.sh 1 2 3',
                                          stderr=True,
                                          exitcode=True,
                                          log={'tee' : True})

        assert ret            == 0
        assert stdout         == expstdout
        assert stderr         == expstderr
        assert capture.stdout == expstdout
        assert capture.stderr == expstderr

        with capture.reset():
            stdout, ret = run.run('./script.sh 1 2 3',
                                  exitcode=True,
                                  log={'tee' : True})

        assert ret            == 0
        assert stdout         == expstdout
        assert capture.stdout == expstdout


def test_run_passthrough():

    test_script = textwrap.dedent("""
    #!/bin/bash

    echo "env: $RUN_TEST_ENV_VAR"
    """).strip()

    with tempdir.tempdir():

        # return code == 0
        mkexec('script.sh', test_script.format(0))

        env       = {'RUN_TEST_ENV_VAR' : 'howdy ho'}
        expstdout = "env: howdy ho\n"

        assert run.run('./script.sh', env=env) == expstdout


def test_dryrun():

    test_script = textwrap.dedent("""
    #!/bin/bash
    touch foo
    """).strip()

    with tempdir.tempdir():
        mkexec('./script.sh', test_script)

        run.run('./script.sh')
        assert op.exists('foo')

        os.remove('foo')

        with run.dryrun():
            run.run('./script.sh')

        assert not op.exists('foo')


# test runfsl with/without $FSLDIR
def test_runfsl():

    test_script = textwrap.dedent("""
    #!/bin/bash
    echo {}
    exit 0
    """).strip()

    old_fsldir    = fslplatform.fsldir
    old_fsldevdir = fslplatform.fsldevdir

    try:
        with tempdir.tempdir():

            make_random_image('image.nii.gz')

            # no FSLDIR - should error
            fslplatform.fsldir    = None
            fslplatform.fsldevdir = None
            with pytest.raises(run.FSLNotPresent):
                run.runfsl('fslhd')

            # FSLDIR/bin exists - should be good
            fsldir = op.abspath('./fsl')
            fslhd  = op.join(fsldir, 'bin', 'fslhd')
            os.makedirs(op.join(fsldir, 'bin'))

            mkexec(fslhd, test_script.format('fsldir'))

            fslplatform.fsldir = fsldir
            assert run.runfsl('fslhd').strip() == 'fsldir'

            # non-FSL command - should error
            with pytest.raises(FileNotFoundError):
                run.runfsl('ls')

            # FSLDEVDIR should take precedence
            fsldevdir = './fsldev'
            fslhd  = op.join(fsldevdir, 'bin', 'fslhd')
            shutil.copytree(fsldir, fsldevdir)

            mkexec(fslhd, test_script.format('fsldevdir'))

            fslplatform.fsldevdir = fsldevdir
            fslplatform.fsldir    = None
            assert run.runfsl('fslhd').strip() == 'fsldevdir'

            # FSL_PREFIX should override all
            override = './override'
            fslhd    = op.join(override, 'fslhd')
            os.makedirs(override)
            mkexec(fslhd, test_script.format('override'))

            fslplatform.fsldir    = None
            fslplatform.fsldevdir = None
            run.FSL_PREFIX = override
            assert run.runfsl('fslhd').strip() == 'override'

    finally:
        fslplatform.fsldir    = old_fsldir
        fslplatform.fsldevdir = old_fsldevdir
        run.FSL_PREFIX        = None


def mock_submit(cmd, **kwargs):
    if isinstance(cmd, six.string_types):
        name = cmd.split()[0]
    else:
        name = cmd[0]

    name = op.basename(name)

    jid = '12345'
    output = run.run(cmd)

    with open('{}.o{}'.format(name, jid), 'wt') as f:
        f.write(output)

    with open('{}.e{}'.format(name, jid), 'wt') as f:
        for k in sorted(kwargs.keys()):
            f.write('{}: {}\n'.format(k, kwargs[k]))

    return jid


def test_run_submit():

    def mkexec(path, contents):
        with open(path, 'wt') as f:
            f.write(contents)
        os.chmod(path, 0o755)

    test_script = textwrap.dedent("""
    #!/usr/bin/env bash
    echo test_script running
    exit 0
    """).strip()

    with tempdir.tempdir(), \
         mockFSLDIR(), \
         mock.patch('fsl.utils.fslsub.submit', mock_submit):

        mkexec(op.expandvars('$FSLDIR/bin/fsltest'), test_script)

        jid = run.run('fsltest', submit=True)
        assert jid == '12345'
        stdout, stderr = fslsub.output(jid)
        assert stdout == 'test_script running\n'
        assert stderr == ''

        # or can pass submit opts as a dict
        kwargs = {'name' : 'abcde', 'ram' : '4GB'}
        jid = run.run('fsltest', submit=kwargs)
        assert jid == '12345'
        stdout, stderr = fslsub.output(jid)
        experr = '\n'.join(['{}: {}'.format(k, kwargs[k])
                            for k in sorted(kwargs.keys())]) + '\n'
        assert stdout == 'test_script running\n'
        assert stderr == experr

        # or can pass submit opts as kwargs
        kwargs = {'name' : 'abcde', 'ram' : '4GB'}
        jid = run.run('fsltest', submit=True, **kwargs)
        assert jid == '12345'
        stdout, stderr = fslsub.output(jid)
        experr = '\n'.join(['{}: {}'.format(k, kwargs[k])
                            for k in sorted(kwargs.keys())]) + '\n'
        assert stdout == 'test_script running\n'
        assert stderr == experr



def test_run_streams():
    """
    """

    test_script = textwrap.dedent("""
    #!/usr/bin/env bash
    echo standard output
    echo standard error >&2
    exit 0
    """).strip()

    expstdout = 'standard output\n'
    expstderr = 'standard error\n'

    with tempdir.tempdir():
        mkexec('./script.sh', test_script)

        with open('my_stdout', 'wt') as stdout, \
             open('my_stderr', 'wt') as stderr:

            stdout, stderr = run.run('./script.sh',
                                     stderr=True,
                                     log={'stdout' : stdout,
                                          'stderr' : stderr})

        assert stdout                         == expstdout
        assert stderr                         == expstderr
        assert open('my_stdout', 'rt').read() == expstdout
        assert open('my_stderr', 'rt').read() == expstderr


        capture = CaptureStdout()

        with open('my_stdout', 'wt') as stdout, \
             open('my_stderr', 'wt') as stderr, \
             capture.reset():

            stdout, stderr = run.run('./script.sh',
                                     stderr=True,
                                     log={'tee'    : True,
                                          'stdout' : stdout,
                                          'stderr' : stderr})

        assert stdout                         == expstdout
        assert stderr                         == expstderr
        assert capture.stdout                 == expstdout
        assert capture.stderr                 == expstderr
        assert open('my_stdout', 'rt').read() == expstdout
        assert open('my_stderr', 'rt').read() == expstderr


def test_run_logcmd():
    test_script = textwrap.dedent("""
    #!/usr/bin/env bash
    echo output $@
    exit 0
    """).strip()

    expcmd = './script.sh 1 2 3\n'
    expstdout = 'output 1 2 3\n'

    with tempdir.tempdir():
        mkexec('script.sh', test_script)

        with open('my_stdout', 'wt') as stdoutf:
            stdout = run.run('./script.sh 1 2 3',
                             log={'cmd' : stdoutf})

        assert stdout                         == expstdout
        assert open('my_stdout', 'rt').read() == expcmd

        with open('my_stdout', 'wt') as stdoutf:
            stdout = run.run('./script.sh 1 2 3',
                             log={'cmd' : stdoutf, "stdout" : stdoutf})

        assert stdout                         == expstdout
        assert open('my_stdout', 'rt').read() == expcmd + expstdout