Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • paulmc/fslpy
  • ndcn0236/fslpy
  • seanf/fslpy
3 results
Show changes
Showing
with 525 additions and 157 deletions
......@@ -12,7 +12,7 @@ import itertools as it
import numpy as np
import pytest
import tests
import fsl.tests as tests
import fsl.utils.path as fslpath
import fsl.data.melodicanalysis as mela
......@@ -55,10 +55,10 @@ def test_isMelodicDir():
meldir = op.join(testdir, 'analysis.ica')
assert mela.isMelodicDir(meldir)
# Directory must end in .ica
# non-.ica prefix is ok
with tests.testdir([p.replace('.ica', '.blob') for p in paths]) as testdir:
meldir = op.join(testdir, 'analysis.blob')
assert not mela.isMelodicDir(meldir)
assert mela.isMelodicDir(meldir)
# Directory must exist!
assert not mela.isMelodicDir('non-existent.ica')
......@@ -129,6 +129,12 @@ def test_getDataFile():
'analysis.feat/filtered_func_data.nii.gz'],
'analysis.feat/filtfunc.ica',
'analysis.feat/filtered_func_data.nii.gz'),
(['analysis.feat/filtfunc.ica/melodic_IC.nii.gz',
'analysis.feat/filtfunc.ica/melodic_mix',
'analysis.feat/filtfunc.ica/melodic_FTmix',
'analysis.feat/filtered_func_data_clean.nii.gz'],
'analysis.feat/filtfunc.ica',
'analysis.feat/filtered_func_data_clean.nii.gz'),
(['no/analysis/dirs/here/melodic_IC.nii.gz'],
'no/analysis/dirs/here/',
None),
......@@ -137,7 +143,6 @@ def test_getDataFile():
'analysis.feat/analysis.ica/melodic_FTmix'],
'analysis.feat/analysis.ica',
None),
]
for paths, meldir, expected in testcases:
......
......@@ -14,7 +14,7 @@ import nibabel as nib
import pytest
import tests
import fsl.tests as tests
import fsl.data.image as fslimage
import fsl.data.melodicimage as meli
import fsl.data.melodicanalysis as mela
......@@ -67,7 +67,7 @@ def _create_dummy_melodic_analysis(basedir,
fslimage.Image(icimg).save(icfile)
if with_data:
dataimg = np.zeros(list(shape4D[:3]) + [timepoints])
dataimg = np.zeros(list(shape4D[:3]) + [timepoints], dtype=np.int32)
for t in range(timepoints):
dataimg[..., t] = t
......@@ -82,7 +82,7 @@ def _create_dummy_melodic_analysis(basedir,
if with_meanfile:
nvoxels = np.prod(shape4D[:3])
data = np.arange(0, nvoxels).reshape(shape4D[:3])
data = np.arange(0, nvoxels).reshape(shape4D[:3]).astype(np.int32)
fslimage.Image(data).save(meanfile)
return meldir
......
......@@ -6,7 +6,6 @@
#
import collections
import six
import numpy as np
......@@ -44,7 +43,7 @@ def test_memoize():
assert timesCalled[0] == 6
# Unicode arg
s = six.u('\u25B2')
s = '\u25B2'
assert memoized(s) == s * 5
assert timesCalled[0] == 7
assert memoized(s) == s * 5
......@@ -146,7 +145,7 @@ def test_memoizeMD5():
assert timesCalled[0] == 6
# Unicode arg (and return value)
s = six.u('\u25B2')
s = '\u25B2'
assert memoized(s) == s * 5
assert timesCalled[0] == 7
assert memoized(s) == s * 5
......
......@@ -14,7 +14,7 @@ import pytest
import fsl.transform.affine as affine
import fsl.data.mesh as fslmesh
from . import tempdir
from fsl.tests import tempdir
# vertices of a cube
......@@ -27,7 +27,7 @@ CUBE_VERTICES = np.array([
[ 1, -1, 1],
[ 1, 1, -1],
[ 1, 1, 1],
])
], dtype=np.float32)
# triangles
# cw == clockwise, when facing outwards
......@@ -39,7 +39,7 @@ CUBE_TRIANGLES_CW = np.array([
[2, 6, 7], [2, 7, 3],
[0, 2, 1], [1, 2, 3],
[4, 5, 7], [4, 7, 6],
])
], dtype=np.int32)
# ccw == counter-clockwise
CUBE_TRIANGLES_CCW = np.array(CUBE_TRIANGLES_CW)
......@@ -52,9 +52,9 @@ CUBE_CCW_FACE_NORMALS = np.array([
[ 0, 1, 0], [ 0, 1, 0],
[-1, 0, 0], [-1, 0, 0],
[ 1, 0, 0], [ 1, 0, 0],
])
], dtype=np.float32)
CUBE_CCW_VERTEX_NORMALS = np.zeros((8, 3))
CUBE_CCW_VERTEX_NORMALS = np.zeros((8, 3), dtype=np.float32)
for i in range(8):
faces = np.where(CUBE_TRIANGLES_CCW == i)[0]
CUBE_CCW_VERTEX_NORMALS[i] = CUBE_CCW_FACE_NORMALS[faces].sum(axis=0)
......@@ -66,20 +66,24 @@ def test_mesh_create():
verts = np.array(CUBE_VERTICES)
tris = np.array(CUBE_TRIANGLES_CCW)
mesh = fslmesh.Mesh(tris, vertices=verts)
def test(mesh):
assert mesh.name == 'mesh'
assert mesh.dataSource is None
assert mesh.nvertices == 8
assert np.all(np.isclose(mesh.vertices, verts))
assert np.all(np.isclose(mesh.indices, tris))
print(str(mesh))
blo, bhi = mesh.bounds
assert mesh.name == 'mesh'
assert mesh.dataSource is None
assert mesh.nvertices == 8
assert np.all(np.isclose(mesh.vertices, verts))
assert np.all(np.isclose(mesh.indices, tris))
assert np.all(np.isclose(blo, verts.min(axis=0)))
assert np.all(np.isclose(bhi, verts.max(axis=0)))
blo, bhi = mesh.bounds
test(fslmesh.Mesh(tris, vertices=verts))
assert np.all(np.isclose(blo, verts.min(axis=0)))
assert np.all(np.isclose(bhi, verts.max(axis=0)))
mesh = fslmesh.Mesh()
mesh.indices = tris
mesh.addVertices(verts)
test(mesh)
def test_mesh_addVertices():
......@@ -234,6 +238,13 @@ def test_normals():
-fnormals, fslmesh.calcFaceNormals(verts, triangles_cw)))
assert np.all(np.isclose(
fnormals, fslmesh.calcFaceNormals(verts, triangles_ccw)))
# Make sure result is (1, 3) for input of (1, 3)
onetri = np.atleast_2d(triangles_ccw[0, :])
result = fslmesh.calcFaceNormals(verts, onetri)
assert result.shape == (1, 3)
assert np.all(np.isclose(fnormals[0, :], result))
assert np.all(np.isclose(
-vnormals, fslmesh.calcVertexNormals(verts, triangles_cw, -fnormals)))
assert np.all(np.isclose(
......
......@@ -19,6 +19,8 @@ def test_meta():
for k, v in data.items():
assert m.getMeta(k) == v
assert m.meta == data
assert list(data.keys()) == list(m.metaKeys())
assert list(data.values()) == list(m.metaValues())
assert list(data.items()) == list(m.metaItems())
......
......@@ -67,3 +67,9 @@ def test_MGHImage_save():
expfile = op.abspath(fslimage.addExt('example', mustExist=False))
assert img.dataSource == op.abspath(expfile)
def test_voxToSurfMat():
testfile = op.join(datadir, 'example.mgz')
img = fslmgh.MGHImage(testfile)
assert np.all(np.isclose(img.voxToSurfMat, fslmgh.voxToSurfMat(img)))
......@@ -20,20 +20,25 @@ def test_normal_usage():
default_called = []
topic_called = []
noargs_called = []
def default_callback(thing, topic, value):
default_called.append((thing, topic, value))
def topic_callback(thing, topic, value):
topic_called.append((thing, topic, value))
topic_called.append((thing, topic, value))
def noargs_callback():
noargs_called.append('called')
t.register('default_callback', default_callback)
t.register('topic_callback', topic_callback, topic='topic')
t.register('noargs_callback', noargs_callback)
with pytest.raises(notifier.Registered):
t.register('default_callback', default_callback)
with pytest.raises(notifier.Registered):
t.register('topic_callback', topic_callback, topic='topic')
t.register('topic_callback', topic_callback, topic='topic')
t.notify()
t.notify(value='value')
......@@ -45,14 +50,16 @@ def test_normal_usage():
t.deregister('default_callback')
t.deregister('topic_callback', topic='topic')
t.deregister('topic_callback', topic='topic')
t.deregister('noargs_callback')
t.notify()
t.notify(value='value')
t.notify(topic='topic')
t.notify(topic='topic', value='value')
t.notify(topic='topic', value='value')
assert len(default_called) == 4
assert len(topic_called) == 2
assert len(noargs_called) == 4
assert default_called[0] == (t, None, None)
assert default_called[1] == (t, None, 'value')
......@@ -63,24 +70,24 @@ def test_normal_usage():
def test_enable_disable():
class Thing(notifier.Notifier):
pass
t = Thing()
default_called = [0]
topic_called = [0]
def default_callback(*a):
default_called[0] += 1
def topic_callback(*a):
topic_called[0] += 1
t.register('default_callback', default_callback)
t.register('topic_callback', topic_callback, topic='topic')
t.notify()
t.notify(topic='topic')
assert default_called[0] == 2
......@@ -93,7 +100,7 @@ def test_enable_disable():
t.notify(topic='topic')
t.enable('default_callback')
assert default_called[0] == 2
assert topic_called[ 0] == 2
assert topic_called[ 0] == 2
t.disable('topic_callback', topic='topic')
assert not t.isEnabled('topic_callback', topic='topic')
......@@ -105,7 +112,7 @@ def test_enable_disable():
assert topic_called[ 0] == 2
assert t.isEnabled('topic_callback', topic='topic')
assert t.isEnabled('default_callback')
assert t.isEnabled('default_callback')
t.notify()
t.notify(topic='topic')
assert default_called[0] == 6
......@@ -117,7 +124,7 @@ def test_enable_disable():
t.notify(topic='topic')
t.enableAll()
assert default_called[0] == 6
assert topic_called[ 0] == 3
assert topic_called[ 0] == 3
t.disableAll('topic')
assert not t.isAllEnabled('topic')
......@@ -125,11 +132,11 @@ def test_enable_disable():
t.notify(topic='topic')
t.enableAll()
assert default_called[0] == 8
assert topic_called[ 0] == 3
assert topic_called[ 0] == 3
def test_skip():
class Thing(notifier.Notifier):
pass
......@@ -140,13 +147,13 @@ def test_skip():
def default_callback(*a):
default_called[0] += 1
def topic_callback(*a):
topic_called[0] += 1
t.register('default_callback', default_callback)
t.register('topic_callback', topic_callback, topic='topic')
t.notify()
t.notify(topic='topic')
assert default_called[0] == 2
......@@ -155,14 +162,14 @@ def test_skip():
with t.skip('default_callback'):
t.notify()
t.notify(topic='topic')
assert default_called[0] == 2
assert topic_called[ 0] == 2
t.notify()
t.notify(topic='topic')
assert default_called[0] == 4
assert topic_called[ 0] == 3
assert topic_called[ 0] == 3
with t.skip('topic_callback', 'topic'):
t.notify()
......@@ -173,27 +180,55 @@ def test_skip():
t.notify()
t.notify(topic='topic')
assert default_called[0] == 8
assert topic_called[ 0] == 4
assert topic_called[ 0] == 4
with t.skipAll():
t.notify()
t.notify(topic='topic')
assert default_called[0] == 8
assert topic_called[ 0] == 4
t.notify()
t.notify(topic='topic')
assert default_called[0] == 10
assert topic_called[ 0] == 5
with t.skipAll('topic'):
t.notify()
t.notify(topic='topic')
assert default_called[0] == 12
assert topic_called[ 0] == 5
t.notify()
t.notify(topic='topic')
assert default_called[0] == 14
assert topic_called[ 0] == 6
# Make sure there is no error
# if a callback function is GC'd
# fsl/fslpy!470
def test_gc():
class Thing(notifier.Notifier):
pass
t = Thing()
called = []
def callback(thing, topic, value):
called.append((thing, topic, value))
t.register('callback', callback)
t.notify()
assert called == [(t, None, None)]
called[:] = []
callback = None
del callback
t.notify()
assert called == []
......@@ -13,7 +13,7 @@ from fsl.data.gifti import GiftiMesh
from fsl.data.image import Image
from fsl.data.atlases import Atlas
from pytest import raises
from .test_image import make_image
from fsl.tests.test_image import make_image
import os
import pytest
......
......@@ -6,14 +6,15 @@
#
import os
import os.path as op
import sys
import shutil
import tempfile
import pytest
import mock
import os
import gc
import os.path as op
import sys
import time
import shutil
import tempfile
import pytest
from unittest import mock
import fsl.utils.platform as fslplatform
......@@ -27,13 +28,17 @@ def test_atts():
p.haveGui
p.canHaveGui
p.inSSHSession
p.inVNCSession
p.wxPlatform
p.wxFlavour
p.fsldir
p.fsldevdir
p.fslVersion
p.glVersion
p.glRenderer
p.glIsSoftwareRenderer
p.wsl
p.fslwsl
@pytest.mark.wxtest
......@@ -41,8 +46,22 @@ def test_haveGui():
import wx
p = fslplatform.Platform()
app = wx.App()
p = fslplatform.Platform()
# We can get weird conflicts w.r.t. access to
# the display when multiple tests are running
# simultaneously within docker on the same
# machine.
app = None
for _ in range(5):
try:
app = wx.App()
break
except Exception:
time.sleep(1)
if app is None:
assert False
frame = wx.Frame(None)
passed = [False]
frame.Show()
......@@ -67,6 +86,8 @@ def test_haveGui():
@pytest.mark.wxtest
def test_wxatts():
gc.collect()
with mock.patch.dict('sys.modules', wx=None):
p = fslplatform.Platform()
assert not p.canHaveGui
......@@ -209,3 +230,17 @@ def test_detect_ssh():
p = fslplatform.Platform()
assert not p.inSSHSession
assert not p.inVNCSession
def test_fslwsl():
"""
Note that ``Platform.fsldir`` requires the directory in ``FSLDIR`` to exist and
sets ``FSLDIR`` to ``None`` if it doesn't. So we create a ``Platform`` first
and then overwrite ``FSLDIR``. This is a bit of a hack but the logic we are testing
here is whether ``Platform.fslwsl`` recognizes a WSL ``FSLDIR`` string
"""
p = fslplatform.Platform()
with mock.patch.dict('os.environ', **{ 'FSLDIR' : '\\\\wsl$\\my cool linux distro v1.0\\usr\\local\\fsl'}):
assert p.fslwsl
with mock.patch.dict('os.environ', **{ 'FSLDIR' : '/usr/local/fsl'}):
assert not p.fslwsl
......@@ -6,25 +6,23 @@
#
import os.path as op
import os
import shutil
import textwrap
import os.path as op
import os
import shutil
import threading
import time
import textwrap as tw
# python 3
try: from unittest import mock
# python 2
except ImportError: import mock
from unittest import mock
import six
import pytest
import fsl.utils.tempdir as tempdir
from fsl.utils.platform import platform as fslplatform
import fsl.utils.run as run
import fsl.utils.fslsub as fslsub
import fsl.wrappers as wrappers
from . import make_random_image, mockFSLDIR, CaptureStdout
from fsl.tests import make_random_image, mockFSLDIR, CaptureStdout, touch
pytestmark = pytest.mark.unixtest
......@@ -36,9 +34,20 @@ def mkexec(path, contents):
os.chmod(path, 0o755)
def test_prepareArgs():
tests = [
('a b c', ['a', 'b', 'c']),
(['a', 'b', 'c'], ['a', 'b', 'c']),
('abc "woop woop"', ['abc', 'woop woop']),
(['abc', 'woop woop'], ['abc', 'woop woop']),
]
for args, expected in tests:
assert run.prepareArgs((args, )) == expected
def test_run():
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/bin/bash
echo "standard output - arguments: $@"
......@@ -96,7 +105,7 @@ def test_run():
def test_run_tee():
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/bin/bash
echo "standard output - arguments: $@"
......@@ -112,12 +121,30 @@ def test_run_tee():
capture = CaptureStdout()
# default behaviour is for tee=True
with capture:
stdout = run.run('./script.sh 1 2 3', log={'tee' : True})
stdout = run.run('./script.sh 1 2 3')
assert stdout == expstdout
assert capture.stdout == expstdout
with capture.reset():
stdout = run.run('./script.sh 1 2 3', log={'tee' : True})
assert stdout == expstdout
assert capture.stdout == expstdout
# disable forwarding
with capture.reset():
stdout = run.run('./script.sh 1 2 3', log={'tee' : False})
assert stdout == expstdout
assert capture.stdout == ''
# disable forwarding via silent=True
with capture.reset():
stdout = run.run('./script.sh 1 2 3', silent=True)
assert stdout == expstdout
assert capture.stdout == ''
with capture.reset():
stdout, stderr = run.run('./script.sh 1 2 3', stderr=True,
log={'tee' : True})
......@@ -149,9 +176,35 @@ def test_run_tee():
assert capture.stdout == expstdout
def test_run_passthrough():
test_script = tw.dedent("""
#!/bin/bash
echo "env: $RUN_TEST_ENV_VAR"
""").strip()
with tempdir.tempdir():
# return code == 0
mkexec('script.sh', test_script.format(0))
env = {'RUN_TEST_ENV_VAR' : 'howdy ho'}
expstdout = "env: howdy ho\n"
assert run.run('./script.sh', env=env) == expstdout
def test_cmdonly():
assert run.run('script.sh', cmdonly=True) == ['script.sh']
assert run.run('script.sh 1 2 3', cmdonly=True) == ['script.sh', '1', '2', '3']
assert run.run(['script.sh'], cmdonly=True) == ['script.sh']
assert run.run(['script.sh', '1'], cmdonly=True) == ['script.sh', '1']
def test_dryrun():
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/bin/bash
touch foo
""").strip()
......@@ -173,7 +226,7 @@ def test_dryrun():
# test runfsl with/without $FSLDIR
def test_runfsl():
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/bin/bash
echo {}
exit 0
......@@ -203,6 +256,10 @@ def test_runfsl():
fslplatform.fsldir = fsldir
assert run.runfsl('fslhd').strip() == 'fsldir'
# non-FSL command - should error
with pytest.raises(FileNotFoundError):
run.runfsl('ls')
# FSLDEVDIR should take precedence
fsldevdir = './fsldev'
fslhd = op.join(fsldevdir, 'bin', 'fslhd')
......@@ -231,14 +288,16 @@ def test_runfsl():
run.FSL_PREFIX = None
def mock_submit(cmd, **kwargs):
if isinstance(cmd, six.string_types):
name = cmd.split()[0]
def mock_fsl_sub(*cmd, **kwargs):
if len(cmd) == 1 and isinstance(cmd[0], str):
name = cmd[0].split()[0]
else:
name = cmd[0]
name = op.basename(name)
kwargs.pop('log', None)
jid = '12345'
output = run.run(cmd)
......@@ -249,7 +308,7 @@ def mock_submit(cmd, **kwargs):
for k in sorted(kwargs.keys()):
f.write('{}: {}\n'.format(k, kwargs[k]))
return jid
return (jid, '')
def test_run_submit():
......@@ -259,7 +318,7 @@ def test_run_submit():
f.write(contents)
os.chmod(path, 0o755)
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/usr/bin/env bash
echo test_script running
exit 0
......@@ -267,39 +326,43 @@ def test_run_submit():
with tempdir.tempdir(), \
mockFSLDIR(), \
mock.patch('fsl.utils.fslsub.submit', mock_submit):
mock.patch('fsl.wrappers.fsl_sub', mock_fsl_sub):
mkexec(op.expandvars('$FSLDIR/bin/fsltest'), test_script)
jid = run.run('fsltest', submit=True)
assert jid == '12345'
stdout, stderr = fslsub.output(jid)
stdout, stderr = run.job_output(jid)
assert stdout == 'test_script running\n'
assert stderr == ''
# or can pass submit opts as a dict
kwargs = {'name' : 'abcde', 'ram' : '4GB'}
jid = run.run('fsltest', submit=kwargs)
assert jid == '12345'
stdout, stderr = fslsub.output(jid)
stdout, stderr = run.job_output(jid)
experr = '\n'.join(['{}: {}'.format(k, kwargs[k])
for k in sorted(kwargs.keys())]) + '\n'
assert stdout == 'test_script running\n'
assert stderr == experr
# or can pass submit opts as kwargs
kwargs = {'name' : 'abcde', 'ram' : '4GB'}
jid = run.run('fsltest', submit=True, **kwargs)
assert jid == '12345'
stdout, stderr = run.job_output(jid)
experr = '\n'.join(['{}: {}'.format(k, kwargs[k])
for k in sorted(kwargs.keys())]) + '\n'
assert stdout == 'test_script running\n'
assert stderr == experr
def test_run_streams():
"""
"""
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/usr/bin/env bash
echo standard output
echo standard error >&2
......@@ -325,7 +388,6 @@ def test_run_streams():
assert open('my_stdout', 'rt').read() == expstdout
assert open('my_stderr', 'rt').read() == expstderr
capture = CaptureStdout()
with open('my_stdout', 'wt') as stdout, \
......@@ -345,9 +407,28 @@ def test_run_streams():
assert open('my_stdout', 'rt').read() == expstdout
assert open('my_stderr', 'rt').read() == expstderr
# logstdout/err can also be callables
gotstdout = []
gotstderr = []
def logstdout(stdout):
gotstdout.append(stdout)
def logstderr(stderr):
gotstderr.append(stderr)
run.run('./script.sh',
stdout=False,
stderr=False,
log={'tee' : False,
'stdout' : logstdout,
'stderr' : logstderr})
assert gotstdout == [expstdout]
assert gotstderr == [expstderr]
def test_run_logcmd():
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/usr/bin/env bash
echo output $@
exit 0
......@@ -372,3 +453,96 @@ def test_run_logcmd():
assert stdout == expstdout
assert open('my_stdout', 'rt').read() == expcmd + expstdout
logged = []
def logfunc(cmd):
logged.append(cmd)
stdout = run.run('./script.sh 1 2 3', log={'cmd' : logfunc})
assert stdout == expstdout
assert logged[0] == expcmd
stdout = run.run('./script.sh 1 2 3', log={'cmd' : logfunc})
assert stdout == expstdout
assert logged[1] == expcmd
def test_hold():
with tempdir.tempdir():
holdfile = 'holdfile'
def create_holdfile():
time.sleep(3)
touch(holdfile)
with run.dryrun():
threading.Thread(target=create_holdfile).start()
run.hold([1, 2, 3], holdfile, timeout=1)
cmds = list(run.DRY_RUN_COMMANDS)
# dryrun gathers all executed commands
# in a list of (cmd, submit) tuples,
# so we do a very simple check here
assert len(cmds) == 1
cmd, submit = cmds[0]
assert cmd == ('touch', 'holdfile')
assert submit['jobhold'] == '1,2,3'
def _good_func():
print('hello')
def _bad_func():
1/0
def test_runfunc():
assert run.runfunc(_good_func, clean='always') == 'hello\n'
with pytest.raises(Exception):
assert run.runfunc(_bad_func, clean='always')
def test_func_to_cmd():
cwd = os.getcwd()
with tempdir.tempdir():
for tmp_dir in (None, '.'):
for clean in ('never', 'on_success', 'always'):
for verbose in (False, True):
cmd = run.func_to_cmd(_good_func, clean=clean, tmp_dir=tmp_dir, verbose=verbose)
fn = cmd.split()[-1]
assert op.exists(fn)
stdout, stderr, exitcode = run.run(cmd, exitcode=True, stdout=True, stderr=True,
env={"PYTHONPATH": cwd})
assert exitcode == 0
if clean == 'never':
assert op.exists(fn), "Successful job got removed, even though this was not requested"
else:
assert not op.exists(fn), f"Successful job did not get removed after run for clean = {clean}"
if verbose:
assert stdout.strip() == f'running "{fn}"\nhello'
else:
assert stdout.strip() == 'hello'
cmd = run.func_to_cmd(_bad_func, clean=clean, tmp_dir=tmp_dir)
fn = cmd.split()[-1]
assert op.exists(fn)
stdout, stderr, exitcode = run.run(cmd, exitcode=True, stdout=True, stderr=True,
env={'PYTHONPATH': cwd})
assert exitcode != 0
if clean == 'always':
assert not op.exists(fn), "Failing job should always be removed if requested"
else:
assert op.exists(fn), f"Failing job got removed even with clean = {clean}"
def test_wrapper_to_cmd():
fn = run.func_to_cmd(wrappers.bet)
assert op.exists(fn)
assert op.basename(fn).startswith("bet_")
def test_job_output():
with tempdir.tempdir() as td:
with open('test.e12345', 'wt') as f: f.write('error')
with open('test.o12345', 'wt') as f: f.write('output')
assert run.job_output(12345, td) == ('output', 'error')
......@@ -13,7 +13,7 @@ import pytest
import fsl.data.atlases as fslatlases
import fsl.scripts.atlasq as fslatlasq
from .. import CaptureStdout
from fsl.tests import CaptureStdout
pytestmark = pytest.mark.fsltest
......
......@@ -18,9 +18,9 @@ import numpy as np
import fsl.scripts.atlasq as fslatlasq
import fsl.data.atlases as fslatlases
from .. import (tempdir,
make_random_mask,
CaptureStdout)
from fsl.tests import (tempdir,
make_random_mask,
CaptureStdout)
pytestmark = pytest.mark.fsltest
......@@ -95,11 +95,7 @@ def test_coords(seed):
for ad in atlases:
# atlasquery/ohi always uses 2mm resolution
atlas = fslatlases.loadAtlas(
ad.atlasID,
resolution=2,
calcRange=False,
loadData=False)
atlas = fslatlases.loadAtlas(ad.atlasID, resolution=2)
print(ad.name)
......
......@@ -21,9 +21,9 @@ import fsl.utils.image.resample as resample
import fsl.data.image as fslimage
import fsl.scripts.atlasq as fslatlasq
from .. import (tempdir,
make_random_mask,
CaptureStdout)
from fsl.tests import (tempdir,
make_random_mask,
CaptureStdout)
pytestmark = pytest.mark.fsltest
......@@ -441,11 +441,7 @@ def test_bad_mask(seed):
for atlasID, use_label in it.product(atlases, use_labels):
atlas = fslatlases.loadAtlas(
atlasID,
loadSummary=use_label,
loadData=False,
calcRange=False)
atlas = fslatlases.loadAtlas(atlasID, loadSummary=use_label)
ashape = list(atlas.shape[:3])
wrongdims = fslimage.Image(
......
#!/usr/bin/env python
#
# test_fsl_abspath.py -
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
import os.path as op
import fsl.scripts.fsl_abspath as fsl_abspath
from fsl.tests import tempdir, CaptureStdout
def test_usage():
assert fsl_abspath.main([]) != 0
def test_fsl_abspath():
# fsl_abspath just calls os.path.realpath
with tempdir() as td:
oneup = op.dirname(td)
# (input, expected)
tests = [
('file', f'{td}/file'),
('./file', f'{td}/file'),
('../file', f'{oneup}/file'),
('/file', '/file'),
('/one/two/file', '/one/two/file'),
]
for input, expected in tests:
cap = CaptureStdout()
with cap:
ret = fsl_abspath.main([input])
assert ret == 0
assert cap.stdout.strip() == expected
......@@ -13,7 +13,7 @@ import fsl.transform.x5 as x5
import fsl.transform.affine as affine
import fsl.utils.tempdir as tempdir
from ..test_transform.test_nonlinear import _affine_field
from fsl.tests.test_transform.test_nonlinear import _affine_field
def _random_affine():
......@@ -44,7 +44,7 @@ def test_help():
def test_linear():
def test_linear(seed):
with tempdir.tempdir():
src2ref = _random_affine()
......@@ -65,7 +65,7 @@ def test_linear():
assert np.all(np.isclose(result.data, expect))
def test_nonlinear():
def test_nonlinear(seed):
with tempdir.tempdir():
src2ref = _random_affine()
......@@ -81,7 +81,7 @@ def test_nonlinear():
fsl_apply_x5.main('src xform.x5 out'.split())
result = fslimage.Image('out')
expect = resample.resampleToReference(src, ref, matrix=src2ref)[0]
expect = resample.resampleToReference(src, ref, matrix=src2ref, smooth=False)[0]
assert result.sameSpace(ref)
......@@ -89,16 +89,17 @@ def test_nonlinear():
result = result.data[1:-1, 1:-1, 1:-1]
expect = expect[ 1:-1, 1:-1, 1:-1]
assert np.all(np.isclose(result, expect))
tol = dict(atol=1e-3, rtol=1e-3)
assert np.all(np.isclose(result, expect, **tol))
def test_linear_altref():
def test_linear_altref(seed):
with tempdir.tempdir():
src2ref = affine.scaleOffsetXform([1, 1, 1], [5, 5, 5])
altv2w = affine.scaleOffsetXform([1, 1, 1], [10, 10, 10])
srcdata = np.random.randint(1, 65536, (10, 10, 10))
srcdata = np.random.randint(1, 65536, (10, 10, 10), dtype=np.int32)
src = fslimage.Image(srcdata, xform=np.eye(4))
ref = fslimage.Image(src.data, xform=src2ref)
altref = fslimage.Image(src.data, xform=altv2w)
......@@ -119,14 +120,14 @@ def test_linear_altref():
assert np.all(result.data == expect)
def test_nonlinear_altref():
def test_nonlinear_altref(seed):
with tempdir.tempdir():
src2ref = affine.scaleOffsetXform([1, 1, 1], [5, 5, 5])
ref2src = affine.invert(src2ref)
altv2w = affine.scaleOffsetXform([1, 1, 1], [10, 10, 10])
srcdata = np.random.randint(1, 65536, (10, 10, 10))
srcdata = np.random.randint(1, 65536, (10, 10, 10), dtype=np.int32)
src = fslimage.Image(srcdata, xform=np.eye(4))
ref = fslimage.Image(src.data, xform=src2ref)
altref = fslimage.Image(src.data, xform=altv2w)
......@@ -149,7 +150,7 @@ def test_nonlinear_altref():
assert np.all(result.data == expect)
def test_linear_altsrc():
def test_linear_altsrc(seed):
with tempdir.tempdir():
src2ref = _random_affine()
......@@ -198,7 +199,7 @@ def test_linear_altsrc():
assert np.all(np.isclose(outoff.data, expoff))
def test_nonlinear_altsrc():
def test_nonlinear_altsrc(seed):
with tempdir.tempdir():
src2ref = _random_affine()
......@@ -213,9 +214,13 @@ def test_nonlinear_altsrc():
src.save('src')
ref.save('ref')
srclo, xf = resample.resample(src, (10, 10, 10), origin='corner')
# use origin=corner so that the
# resampled variants are exactly
# aligned in the world coordinate
# system
srclo, xf = resample.resample(src, (10, 10, 10), origin='corner', smooth=False)
srclo = fslimage.Image(srclo, xform=xf)
srchi, xf = resample.resample(src, (40, 40, 40), origin='corner')
srchi, xf = resample.resample(src, (40, 40, 40), origin='corner', smooth=False)
srchi = fslimage.Image(srchi, xform=xf)
srcoff = roi.roi(src, [(-10, 10), (-10, 10), (-10, 10)])
......@@ -224,41 +229,40 @@ def test_nonlinear_altsrc():
srchi .save('srchi')
srcoff.save('srcoff')
# The up-sampled source image is subject to
# an unavoidable interpolation, as the
# deformation field coordinates are affine-
# transformed to the space of the alternate
# source image. So in this test case we use
# nearest-neighbour interp, so that we can
# get the same output as a standard linear
# resample
fsl_apply_x5.main('src xform.x5 out' .split())
fsl_apply_x5.main('srclo xform.x5 outlo' .split())
fsl_apply_x5.main('srchi xform.x5 outhi -i nearest'.split())
fsl_apply_x5.main('srcoff xform.x5 outoff' .split())
fsl_apply_x5.main('src xform.x5 out' .split())
fsl_apply_x5.main('srclo xform.x5 outlo' .split())
fsl_apply_x5.main('srchi xform.x5 outhi' .split())
fsl_apply_x5.main('srcoff xform.x5 outoff'.split())
out = fslimage.Image('out')
outlo = fslimage.Image('outlo')
outhi = fslimage.Image('outhi')
outoff = fslimage.Image('outoff')
exp, x1 = resample.resampleToReference(src, ref, matrix=src2ref, mode='constant')
explo, x2 = resample.resampleToReference(srclo, ref, matrix=src2ref, mode='constant')
exphi, x3 = resample.resampleToReference(srchi, ref, matrix=src2ref, mode='constant', order=0)
expoff, x4 = resample.resampleToReference(srcoff, ref, matrix=src2ref, mode='constant')
exp, x1 = resample.resampleToReference(src, ref, matrix=src2ref, mode='constant', smooth=False)
explo, x2 = resample.resampleToReference(srclo, ref, matrix=src2ref, mode='constant', smooth=False)
exphi, x3 = resample.resampleToReference(srchi, ref, matrix=src2ref, mode='constant', smooth=False)
expoff, x4 = resample.resampleToReference(srcoff, ref, matrix=src2ref, mode='constant', smooth=False)
assert out .sameSpace(ref)
assert outlo .sameSpace(ref)
assert outhi .sameSpace(ref)
assert outoff.sameSpace(ref)
# We get boundary issues just at the first
# voxel, so I'm masking that voxel out
for img in (out, outlo, outhi, outoff,
exp, explo, exphi, expoff):
img[0, 0, 0] = 0
assert np.all(np.isclose(out .data, exp))
assert np.all(np.isclose(outlo .data, explo))
assert np.all(np.isclose(outhi .data, exphi))
assert np.all(np.isclose(outoff.data, expoff))
# We get boundary cropping,
# so ignore edge slices
out = out .data[1:-1, 1:-1, 1:-1]
outlo = outlo .data[1:-1, 1:-1, 1:-1]
outhi = outhi .data[1:-1, 1:-1, 1:-1]
outoff = outoff.data[ :9, :9, :9]
exp = exp[ 1:-1, 1:-1, 1:-1]
explo = explo[ 1:-1, 1:-1, 1:-1]
exphi = exphi[ 1:-1, 1:-1, 1:-1]
expoff = expoff[ :9, :9, :9]
tol = dict(atol=1e-3, rtol=1e-3)
assert np.all(np.isclose(out, exp, **tol))
assert np.all(np.isclose(outlo, explo, **tol))
assert np.all(np.isclose(outhi, exphi, **tol))
assert np.all(np.isclose(outoff, expoff, **tol))
#!/usr/bin/env python
#
# test_imglob.py -
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
from unittest import mock
import pytest
import fsl.scripts.imglob as imglob
from . import testdir
from fsl.tests import testdir
from fsl.tests import CaptureStdout
def test_imglob_shouldPass1():
# (files to create, args, expected)
tests = [
('file.hdr file.img', 'file', 'file'),
('file.hdr file.img', '-extension file', 'file.hdr'),
('file.hdr file.img', '-extensions file', 'file.hdr file.img'),
('file.hdr file.img', 'file.hdr', 'file'),
('file.hdr file.img', ' -extension file.hdr', 'file.hdr'),
('file.hdr file.img', '-extensions file.hdr', 'file.hdr file.img'),
('file.hdr file.img', 'file.img', 'file'),
('file.hdr file.img', '-extension file.img', 'file.hdr'),
('file.hdr file.img', '-extensions file.img', 'file.hdr file.img'),
('file.hdr file.img', 'file.hdr file.img', 'file'),
('file.hdr file.img', '-extension file.hdr file.img', 'file.hdr'),
('file.hdr file.img', '-extensions file.hdr file.img', 'file.hdr file.img'),
('file.hdr file.img', 'file file.img', 'file'),
('file.hdr file.img', '-extension file file.img', 'file.hdr'),
('file.hdr file.img', '-extensions file file.img', 'file.hdr file.img'),
# no file or incomplete prefix
('file.hdr file.img', 'bag', ''),
('file.hdr file.img', '-extension bag', ''),
('file.hdr file.img', '-extensions bag', ''),
('file.hdr file.img', 'fi', ''),
('file.hdr file.img', '-extension fi', ''),
('file.hdr file.img', '-extensions fi', ''),
]
capture = CaptureStdout()
for to_create, args, expected in tests:
with testdir(to_create.split()) as td:
capture.reset()
with capture:
assert imglob.main(args.split()) == 0
assert capture.stdout.strip().split() == expected.split()
def test_imglob_shouldPass():
def test_imglob_shouldPass2():
# (files to create, paths, output, expected)
tests = [
......@@ -59,6 +100,12 @@ def test_imglob_shouldPass():
('file1.hdr file1.img file2.nii', 'file1 file2', 'primary', 'file1.hdr file2.nii'),
('file1.hdr file1.img file2.nii', 'file1 file2', 'all', 'file1.hdr file1.img file2.nii'),
# muiltiple files, given wildcard
('file1.nii file2.nii', 'file*', 'prefix', 'file1 file2'),
('file1.nii file2.nii', 'file*.nii', 'prefix', 'file1 file2'),
('file1.nii file2.nii', 'file?', 'prefix', 'file1 file2'),
('file1.nii file2.nii', 'file?.nii', 'prefix', 'file1 file2'),
# no file
('file.nii', 'bag', 'prefix', ''),
('file.nii', 'bag', 'primary', ''),
......@@ -81,7 +128,20 @@ def test_imglob_shouldPass():
assert sorted(result) == sorted(expected)
def test_imglob_shouldFail():
def test_imglob_script_shouldFail():
with pytest.raises(ValueError):
imglob.imglob([], 'bag')
capture = CaptureStdout()
with capture:
assert imglob.main([]) != 0
assert capture.stdout.strip().lower().startswith('usage:')
with capture, mock.patch('sys.argv', ['imglob']):
assert imglob.main() != 0
assert capture.stdout.strip().lower().startswith('usage:')