Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • paulmc/fslpy
  • ndcn0236/fslpy
  • seanf/fslpy
3 results
Show changes
Showing
with 795 additions and 133 deletions
......@@ -46,7 +46,7 @@ def test_roi():
]
for inshape, bounds, outshape, offset in tests:
data = np.random.randint(1, 10, inshape)
data = np.random.randint(1, 10, inshape, dtype=np.int32)
image = fslimage.Image(data, xform=np.eye(4))
result = roi.roi(image, bounds)
......@@ -84,7 +84,7 @@ def test_roi():
# - not enough bounds
# - too many bounds
# - hi >= lo
data = np.random.randint(1, 10, (10, 10, 10))
data = np.random.randint(1, 10, (10, 10, 10), dtype=np.int32)
image = fslimage.Image(data, xform=np.eye(4))
with pytest.raises(ValueError): roi.roi(image, [(0, 10), (0, 10)])
with pytest.raises(ValueError): roi.roi(image, [(0, 10), (0, 10), (0, 10), (0, 10)])
......
......@@ -7,21 +7,26 @@
from __future__ import print_function
import gzip
import itertools as it
import os.path as op
import os
import shutil
import tempfile
from unittest import mock
import os.path as op
import os
import shutil
import tempfile
import numpy as np
import nibabel as nib
import fsl.utils.imcp as imcp
import fsl.data.image as fslimage
import fsl.utils.imcp as imcp
import fsl.utils.tempdir as tempdir
import fsl.data.image as fslimage
from . import make_random_image
from . import make_dummy_file
from . import looks_like_image
from fsl.tests import (make_random_image,
make_dummy_file,
looks_like_image,
sha256)
real_print = print
......@@ -31,7 +36,7 @@ def print(*args, **kwargs):
def makeImage(filename):
return hash(make_random_image(filename).get_data().tobytes())
return hash(np.asanyarray(make_random_image(filename).dataobj).tobytes())
def checkImageHash(filename, datahash):
......@@ -39,7 +44,7 @@ def checkImageHash(filename, datahash):
"""
img = nib.load(filename)
assert hash(img.get_data().tobytes()) == datahash
assert hash(np.asanyarray(img.dataobj).tobytes()) == datahash
def checkFilesToExpect(files, outdir, outputType, datahashes):
......@@ -314,3 +319,60 @@ def test_imcp_shouldPass(move=False):
def test_immv_shouldPass():
test_imcp_shouldPass(move=True)
def test_imcp_data_unmodified():
"""Test that the data in an imcp'd image file is not modified. """
dtypes = [
np.int16,
np.int32,
np.float32,
np.float64]
slints = [(None, None), (1, 0), (3, 1.5)]
for dtype, (slope, inter) in it.product(dtypes, slints):
with tempdir.tempdir():
data = np.random.randint(1, 100, (10, 10, 10)).astype(dtype)
hdr = nib.Nifti1Header()
hdr.set_data_dtype(dtype)
hdr.set_data_shape((10, 10, 10))
hdr.set_slope_inter(slope, inter)
hdr.set_sform(np.eye(4))
# write header/data separately, as otherwise
# nibabel will automatically rescale the data
with open('image.nii', 'wb') as f:
hdr.write_to(f)
f.write(data.tobytes())
# Input/output formats the same,
# should induce a straight file copy
imcp.imcp('image.nii', 'copied.nii', useDefaultExt=False)
# uncompresed->compressed will cause imcp
# to load in the image, rather than doing a
# file copy
with mock.patch.dict(os.environ, FSLOUTPUTTYPE='NIFTI_GZ'):
imcp.imcp('image.nii', 'converted.nii.gz', useDefaultExt=True)
# copied files should be identical
assert sha256('image.nii') == sha256('copied.nii')
# Converted files should have the same
# data, slope, and intercept. Read result
# header/data separately to avoid nibabel
# auto-rescaling.
with gzip.open('converted.nii.gz', 'rb') as f:
gothdr = nib.Nifti1Header.from_fileobj(f)
databytes = f.read()
gotdata = np.frombuffer(databytes, dtype=dtype).reshape((10, 10, 10))
# Data should be binary identical
assert np.all(gotdata == data)
if slope is None: slope = 1
if inter is None: inter = 0
assert np.all(np.isclose(gothdr.get_slope_inter(), (slope, inter)))
File moved
......@@ -12,7 +12,7 @@ import itertools as it
import numpy as np
import pytest
import tests
import fsl.tests as tests
import fsl.utils.path as fslpath
import fsl.data.melodicanalysis as mela
......@@ -55,10 +55,10 @@ def test_isMelodicDir():
meldir = op.join(testdir, 'analysis.ica')
assert mela.isMelodicDir(meldir)
# Directory must end in .ica
# non-.ica prefix is ok
with tests.testdir([p.replace('.ica', '.blob') for p in paths]) as testdir:
meldir = op.join(testdir, 'analysis.blob')
assert not mela.isMelodicDir(meldir)
assert mela.isMelodicDir(meldir)
# Directory must exist!
assert not mela.isMelodicDir('non-existent.ica')
......@@ -129,6 +129,12 @@ def test_getDataFile():
'analysis.feat/filtered_func_data.nii.gz'],
'analysis.feat/filtfunc.ica',
'analysis.feat/filtered_func_data.nii.gz'),
(['analysis.feat/filtfunc.ica/melodic_IC.nii.gz',
'analysis.feat/filtfunc.ica/melodic_mix',
'analysis.feat/filtfunc.ica/melodic_FTmix',
'analysis.feat/filtered_func_data_clean.nii.gz'],
'analysis.feat/filtfunc.ica',
'analysis.feat/filtered_func_data_clean.nii.gz'),
(['no/analysis/dirs/here/melodic_IC.nii.gz'],
'no/analysis/dirs/here/',
None),
......@@ -137,7 +143,6 @@ def test_getDataFile():
'analysis.feat/analysis.ica/melodic_FTmix'],
'analysis.feat/analysis.ica',
None),
]
for paths, meldir, expected in testcases:
......
......@@ -14,7 +14,7 @@ import nibabel as nib
import pytest
import tests
import fsl.tests as tests
import fsl.data.image as fslimage
import fsl.data.melodicimage as meli
import fsl.data.melodicanalysis as mela
......@@ -67,7 +67,7 @@ def _create_dummy_melodic_analysis(basedir,
fslimage.Image(icimg).save(icfile)
if with_data:
dataimg = np.zeros(list(shape4D[:3]) + [timepoints])
dataimg = np.zeros(list(shape4D[:3]) + [timepoints], dtype=np.int32)
for t in range(timepoints):
dataimg[..., t] = t
......@@ -82,7 +82,7 @@ def _create_dummy_melodic_analysis(basedir,
if with_meanfile:
nvoxels = np.prod(shape4D[:3])
data = np.arange(0, nvoxels).reshape(shape4D[:3])
data = np.arange(0, nvoxels).reshape(shape4D[:3]).astype(np.int32)
fslimage.Image(data).save(meanfile)
return meldir
......
......@@ -6,7 +6,6 @@
#
import collections
import six
import numpy as np
......@@ -44,7 +43,7 @@ def test_memoize():
assert timesCalled[0] == 6
# Unicode arg
s = six.u('\u25B2')
s = '\u25B2'
assert memoized(s) == s * 5
assert timesCalled[0] == 7
assert memoized(s) == s * 5
......@@ -146,7 +145,7 @@ def test_memoizeMD5():
assert timesCalled[0] == 6
# Unicode arg (and return value)
s = six.u('\u25B2')
s = '\u25B2'
assert memoized(s) == s * 5
assert timesCalled[0] == 7
assert memoized(s) == s * 5
......
......@@ -14,7 +14,7 @@ import pytest
import fsl.transform.affine as affine
import fsl.data.mesh as fslmesh
from . import tempdir
from fsl.tests import tempdir
# vertices of a cube
......@@ -27,7 +27,7 @@ CUBE_VERTICES = np.array([
[ 1, -1, 1],
[ 1, 1, -1],
[ 1, 1, 1],
])
], dtype=np.float32)
# triangles
# cw == clockwise, when facing outwards
......@@ -39,7 +39,7 @@ CUBE_TRIANGLES_CW = np.array([
[2, 6, 7], [2, 7, 3],
[0, 2, 1], [1, 2, 3],
[4, 5, 7], [4, 7, 6],
])
], dtype=np.int32)
# ccw == counter-clockwise
CUBE_TRIANGLES_CCW = np.array(CUBE_TRIANGLES_CW)
......@@ -52,9 +52,9 @@ CUBE_CCW_FACE_NORMALS = np.array([
[ 0, 1, 0], [ 0, 1, 0],
[-1, 0, 0], [-1, 0, 0],
[ 1, 0, 0], [ 1, 0, 0],
])
], dtype=np.float32)
CUBE_CCW_VERTEX_NORMALS = np.zeros((8, 3))
CUBE_CCW_VERTEX_NORMALS = np.zeros((8, 3), dtype=np.float32)
for i in range(8):
faces = np.where(CUBE_TRIANGLES_CCW == i)[0]
CUBE_CCW_VERTEX_NORMALS[i] = CUBE_CCW_FACE_NORMALS[faces].sum(axis=0)
......@@ -66,20 +66,24 @@ def test_mesh_create():
verts = np.array(CUBE_VERTICES)
tris = np.array(CUBE_TRIANGLES_CCW)
mesh = fslmesh.Mesh(tris, vertices=verts)
def test(mesh):
assert mesh.name == 'mesh'
assert mesh.dataSource is None
assert mesh.nvertices == 8
assert np.all(np.isclose(mesh.vertices, verts))
assert np.all(np.isclose(mesh.indices, tris))
print(str(mesh))
blo, bhi = mesh.bounds
assert mesh.name == 'mesh'
assert mesh.dataSource is None
assert mesh.nvertices == 8
assert np.all(np.isclose(mesh.vertices, verts))
assert np.all(np.isclose(mesh.indices, tris))
assert np.all(np.isclose(blo, verts.min(axis=0)))
assert np.all(np.isclose(bhi, verts.max(axis=0)))
blo, bhi = mesh.bounds
test(fslmesh.Mesh(tris, vertices=verts))
assert np.all(np.isclose(blo, verts.min(axis=0)))
assert np.all(np.isclose(bhi, verts.max(axis=0)))
mesh = fslmesh.Mesh()
mesh.indices = tris
mesh.addVertices(verts)
test(mesh)
def test_mesh_addVertices():
......@@ -234,6 +238,13 @@ def test_normals():
-fnormals, fslmesh.calcFaceNormals(verts, triangles_cw)))
assert np.all(np.isclose(
fnormals, fslmesh.calcFaceNormals(verts, triangles_ccw)))
# Make sure result is (1, 3) for input of (1, 3)
onetri = np.atleast_2d(triangles_ccw[0, :])
result = fslmesh.calcFaceNormals(verts, onetri)
assert result.shape == (1, 3)
assert np.all(np.isclose(fnormals[0, :], result))
assert np.all(np.isclose(
-vnormals, fslmesh.calcVertexNormals(verts, triangles_cw, -fnormals)))
assert np.all(np.isclose(
......@@ -245,7 +256,7 @@ def test_needsFixing():
verts = np.array(CUBE_VERTICES)
tris_cw = np.array(CUBE_TRIANGLES_CW)
tris_ccw = np.array(CUBE_TRIANGLES_CCW)
fnormals = np.array(CUBE_CCW_VERTEX_NORMALS)
fnormals = np.array(CUBE_CCW_FACE_NORMALS)
blo = verts.min(axis=0)
bhi = verts.max(axis=0)
mesh = fslmesh.Mesh(tris_cw, vertices=verts, fixWinding=True)
......@@ -254,6 +265,28 @@ def test_needsFixing():
assert fslmesh.needsFixing(verts, tris_cw, -fnormals, blo, bhi)
assert np.all(np.isclose(mesh.indices, tris_ccw))
# regression: needsFixing used to use the first triangle
# of the nearest vertex to the camera. But this will fail
# if that triangle is facing away from the camera.
verts = np.array([
[ -1, -1, -1], # vertex 0 will be nearest the camera
[ 0.5, -0.5, 0],
[ 1, -1, 0],
[ 1, 1, 1],
[ 0, -1, 1]])
tris = np.array([
[0, 4, 1], # first triangle will be facing away from the camera
[0, 1, 2],
[1, 3, 2],
[0, 2, 4],
[2, 3, 4],
[1, 4, 3]])
mesh = fslmesh.Mesh(tris, vertices=verts, fixWinding=True)
fnormals = fslmesh.calcFaceNormals(verts, tris)
blo = verts.min(axis=0)
bhi = verts.max(axis=0)
assert not fslmesh.needsFixing(verts, tris, fnormals, blo, bhi)
def test_trimesh_no_trimesh():
......
......@@ -19,6 +19,13 @@ def test_meta():
for k, v in data.items():
assert m.getMeta(k) == v
assert m.meta == data
assert list(data.keys()) == list(m.metaKeys())
assert list(data.values()) == list(m.metaValues())
assert list(data.items()) == list(m.metaItems())
data.update( {'d' : 4, 'e' : 5})
m.updateMeta({'d' : 4, 'e' : 5})
assert list(data.items()) == list(m.metaItems())
......@@ -31,7 +31,7 @@ def test_MGHImage():
v2s = nbimg.header.get_vox2ras_tkr()
w2s = affine.concat(v2s, affine.invert(nbimg.affine))
assert np.all(np.isclose(img[:], nbimg.get_data()))
assert np.all(np.isclose(img[:], np.asanyarray(nbimg.dataobj)))
assert np.all(np.isclose(img.voxToWorldMat, nbimg.affine))
assert np.all(np.isclose(img.voxToSurfMat, v2s))
assert np.all(np.isclose(img.surfToVoxMat, affine.invert(v2s)))
......@@ -44,7 +44,7 @@ def test_MGHImage():
# Load from an in-memory nibabel object
img = fslmgh.MGHImage(nbimg)
assert np.all(np.isclose(img[:], nbimg.get_data()))
assert np.all(np.isclose(img[:], np.asanyarray(nbimg.dataobj)))
assert np.all(np.isclose(img.voxToWorldMat, nbimg.affine))
assert img.dataSource is None
assert img.mghImageFile is None
......@@ -67,3 +67,9 @@ def test_MGHImage_save():
expfile = op.abspath(fslimage.addExt('example', mustExist=False))
assert img.dataSource == op.abspath(expfile)
def test_voxToSurfMat():
testfile = op.join(datadir, 'example.mgz')
img = fslmgh.MGHImage(testfile)
assert np.all(np.isclose(img.voxToSurfMat, fslmgh.voxToSurfMat(img)))
......@@ -20,20 +20,25 @@ def test_normal_usage():
default_called = []
topic_called = []
noargs_called = []
def default_callback(thing, topic, value):
default_called.append((thing, topic, value))
def topic_callback(thing, topic, value):
topic_called.append((thing, topic, value))
topic_called.append((thing, topic, value))
def noargs_callback():
noargs_called.append('called')
t.register('default_callback', default_callback)
t.register('topic_callback', topic_callback, topic='topic')
t.register('noargs_callback', noargs_callback)
with pytest.raises(notifier.Registered):
t.register('default_callback', default_callback)
with pytest.raises(notifier.Registered):
t.register('topic_callback', topic_callback, topic='topic')
t.register('topic_callback', topic_callback, topic='topic')
t.notify()
t.notify(value='value')
......@@ -45,14 +50,16 @@ def test_normal_usage():
t.deregister('default_callback')
t.deregister('topic_callback', topic='topic')
t.deregister('topic_callback', topic='topic')
t.deregister('noargs_callback')
t.notify()
t.notify(value='value')
t.notify(topic='topic')
t.notify(topic='topic', value='value')
t.notify(topic='topic', value='value')
assert len(default_called) == 4
assert len(topic_called) == 2
assert len(noargs_called) == 4
assert default_called[0] == (t, None, None)
assert default_called[1] == (t, None, 'value')
......@@ -63,24 +70,24 @@ def test_normal_usage():
def test_enable_disable():
class Thing(notifier.Notifier):
pass
t = Thing()
default_called = [0]
topic_called = [0]
def default_callback(*a):
default_called[0] += 1
def topic_callback(*a):
topic_called[0] += 1
t.register('default_callback', default_callback)
t.register('topic_callback', topic_callback, topic='topic')
t.notify()
t.notify(topic='topic')
assert default_called[0] == 2
......@@ -93,7 +100,7 @@ def test_enable_disable():
t.notify(topic='topic')
t.enable('default_callback')
assert default_called[0] == 2
assert topic_called[ 0] == 2
assert topic_called[ 0] == 2
t.disable('topic_callback', topic='topic')
assert not t.isEnabled('topic_callback', topic='topic')
......@@ -105,7 +112,7 @@ def test_enable_disable():
assert topic_called[ 0] == 2
assert t.isEnabled('topic_callback', topic='topic')
assert t.isEnabled('default_callback')
assert t.isEnabled('default_callback')
t.notify()
t.notify(topic='topic')
assert default_called[0] == 6
......@@ -117,7 +124,7 @@ def test_enable_disable():
t.notify(topic='topic')
t.enableAll()
assert default_called[0] == 6
assert topic_called[ 0] == 3
assert topic_called[ 0] == 3
t.disableAll('topic')
assert not t.isAllEnabled('topic')
......@@ -125,11 +132,11 @@ def test_enable_disable():
t.notify(topic='topic')
t.enableAll()
assert default_called[0] == 8
assert topic_called[ 0] == 3
assert topic_called[ 0] == 3
def test_skip():
class Thing(notifier.Notifier):
pass
......@@ -140,13 +147,13 @@ def test_skip():
def default_callback(*a):
default_called[0] += 1
def topic_callback(*a):
topic_called[0] += 1
t.register('default_callback', default_callback)
t.register('topic_callback', topic_callback, topic='topic')
t.notify()
t.notify(topic='topic')
assert default_called[0] == 2
......@@ -155,14 +162,14 @@ def test_skip():
with t.skip('default_callback'):
t.notify()
t.notify(topic='topic')
assert default_called[0] == 2
assert topic_called[ 0] == 2
t.notify()
t.notify(topic='topic')
assert default_called[0] == 4
assert topic_called[ 0] == 3
assert topic_called[ 0] == 3
with t.skip('topic_callback', 'topic'):
t.notify()
......@@ -173,27 +180,55 @@ def test_skip():
t.notify()
t.notify(topic='topic')
assert default_called[0] == 8
assert topic_called[ 0] == 4
assert topic_called[ 0] == 4
with t.skipAll():
t.notify()
t.notify(topic='topic')
assert default_called[0] == 8
assert topic_called[ 0] == 4
t.notify()
t.notify(topic='topic')
assert default_called[0] == 10
assert topic_called[ 0] == 5
with t.skipAll('topic'):
t.notify()
t.notify(topic='topic')
assert default_called[0] == 12
assert topic_called[ 0] == 5
t.notify()
t.notify(topic='topic')
assert default_called[0] == 14
assert topic_called[ 0] == 6
# Make sure there is no error
# if a callback function is GC'd
# fsl/fslpy!470
def test_gc():
class Thing(notifier.Notifier):
pass
t = Thing()
called = []
def callback(thing, topic, value):
called.append((thing, topic, value))
t.register('callback', callback)
t.notify()
assert called == [(t, None, None)]
called[:] = []
callback = None
del callback
t.notify()
assert called == []
......@@ -13,7 +13,7 @@ from fsl.data.gifti import GiftiMesh
from fsl.data.image import Image
from fsl.data.atlases import Atlas
from pytest import raises
from .test_image import make_image
from fsl.tests.test_image import make_image
import os
import pytest
......
......@@ -6,14 +6,15 @@
#
import os
import os.path as op
import sys
import shutil
import tempfile
import pytest
import mock
import os
import gc
import os.path as op
import sys
import time
import shutil
import tempfile
import pytest
from unittest import mock
import fsl.utils.platform as fslplatform
......@@ -27,13 +28,17 @@ def test_atts():
p.haveGui
p.canHaveGui
p.inSSHSession
p.inVNCSession
p.wxPlatform
p.wxFlavour
p.fsldir
p.fsldevdir
p.fslVersion
p.glVersion
p.glRenderer
p.glIsSoftwareRenderer
p.wsl
p.fslwsl
@pytest.mark.wxtest
......@@ -41,8 +46,22 @@ def test_haveGui():
import wx
p = fslplatform.Platform()
app = wx.App()
p = fslplatform.Platform()
# We can get weird conflicts w.r.t. access to
# the display when multiple tests are running
# simultaneously within docker on the same
# machine.
app = None
for _ in range(5):
try:
app = wx.App()
break
except Exception:
time.sleep(1)
if app is None:
assert False
frame = wx.Frame(None)
passed = [False]
frame.Show()
......@@ -59,6 +78,7 @@ def test_haveGui():
wx.CallLater(500, runtest)
app.MainLoop()
del app
assert passed[0]
......@@ -66,6 +86,8 @@ def test_haveGui():
@pytest.mark.wxtest
def test_wxatts():
gc.collect()
with mock.patch.dict('sys.modules', wx=None):
p = fslplatform.Platform()
assert not p.canHaveGui
......@@ -208,3 +230,17 @@ def test_detect_ssh():
p = fslplatform.Platform()
assert not p.inSSHSession
assert not p.inVNCSession
def test_fslwsl():
"""
Note that ``Platform.fsldir`` requires the directory in ``FSLDIR`` to exist and
sets ``FSLDIR`` to ``None`` if it doesn't. So we create a ``Platform`` first
and then overwrite ``FSLDIR``. This is a bit of a hack but the logic we are testing
here is whether ``Platform.fslwsl`` recognizes a WSL ``FSLDIR`` string
"""
p = fslplatform.Platform()
with mock.patch.dict('os.environ', **{ 'FSLDIR' : '\\\\wsl$\\my cool linux distro v1.0\\usr\\local\\fsl'}):
assert p.fslwsl
with mock.patch.dict('os.environ', **{ 'FSLDIR' : '/usr/local/fsl'}):
assert not p.fslwsl
......@@ -6,25 +6,23 @@
#
import os.path as op
import os
import shutil
import textwrap
import os.path as op
import os
import shutil
import threading
import time
import textwrap as tw
# python 3
try: from unittest import mock
# python 2
except ImportError: import mock
from unittest import mock
import six
import pytest
import fsl.utils.tempdir as tempdir
from fsl.utils.platform import platform as fslplatform
import fsl.utils.run as run
import fsl.utils.fslsub as fslsub
import fsl.wrappers as wrappers
from . import make_random_image, mockFSLDIR, CaptureStdout
from fsl.tests import make_random_image, mockFSLDIR, CaptureStdout, touch
pytestmark = pytest.mark.unixtest
......@@ -36,9 +34,20 @@ def mkexec(path, contents):
os.chmod(path, 0o755)
def test_prepareArgs():
tests = [
('a b c', ['a', 'b', 'c']),
(['a', 'b', 'c'], ['a', 'b', 'c']),
('abc "woop woop"', ['abc', 'woop woop']),
(['abc', 'woop woop'], ['abc', 'woop woop']),
]
for args, expected in tests:
assert run.prepareArgs((args, )) == expected
def test_run():
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/bin/bash
echo "standard output - arguments: $@"
......@@ -96,7 +105,7 @@ def test_run():
def test_run_tee():
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/bin/bash
echo "standard output - arguments: $@"
......@@ -112,12 +121,30 @@ def test_run_tee():
capture = CaptureStdout()
# default behaviour is for tee=True
with capture:
stdout = run.run('./script.sh 1 2 3', log={'tee' : True})
stdout = run.run('./script.sh 1 2 3')
assert stdout == expstdout
assert capture.stdout == expstdout
with capture.reset():
stdout = run.run('./script.sh 1 2 3', log={'tee' : True})
assert stdout == expstdout
assert capture.stdout == expstdout
# disable forwarding
with capture.reset():
stdout = run.run('./script.sh 1 2 3', log={'tee' : False})
assert stdout == expstdout
assert capture.stdout == ''
# disable forwarding via silent=True
with capture.reset():
stdout = run.run('./script.sh 1 2 3', silent=True)
assert stdout == expstdout
assert capture.stdout == ''
with capture.reset():
stdout, stderr = run.run('./script.sh 1 2 3', stderr=True,
log={'tee' : True})
......@@ -149,9 +176,35 @@ def test_run_tee():
assert capture.stdout == expstdout
def test_run_passthrough():
test_script = tw.dedent("""
#!/bin/bash
echo "env: $RUN_TEST_ENV_VAR"
""").strip()
with tempdir.tempdir():
# return code == 0
mkexec('script.sh', test_script.format(0))
env = {'RUN_TEST_ENV_VAR' : 'howdy ho'}
expstdout = "env: howdy ho\n"
assert run.run('./script.sh', env=env) == expstdout
def test_cmdonly():
assert run.run('script.sh', cmdonly=True) == ['script.sh']
assert run.run('script.sh 1 2 3', cmdonly=True) == ['script.sh', '1', '2', '3']
assert run.run(['script.sh'], cmdonly=True) == ['script.sh']
assert run.run(['script.sh', '1'], cmdonly=True) == ['script.sh', '1']
def test_dryrun():
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/bin/bash
touch foo
""").strip()
......@@ -173,7 +226,7 @@ def test_dryrun():
# test runfsl with/without $FSLDIR
def test_runfsl():
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/bin/bash
echo {}
exit 0
......@@ -203,6 +256,10 @@ def test_runfsl():
fslplatform.fsldir = fsldir
assert run.runfsl('fslhd').strip() == 'fsldir'
# non-FSL command - should error
with pytest.raises(FileNotFoundError):
run.runfsl('ls')
# FSLDEVDIR should take precedence
fsldevdir = './fsldev'
fslhd = op.join(fsldevdir, 'bin', 'fslhd')
......@@ -231,14 +288,16 @@ def test_runfsl():
run.FSL_PREFIX = None
def mock_submit(cmd, **kwargs):
if isinstance(cmd, six.string_types):
name = cmd.split()[0]
def mock_fsl_sub(*cmd, **kwargs):
if len(cmd) == 1 and isinstance(cmd[0], str):
name = cmd[0].split()[0]
else:
name = cmd[0]
name = op.basename(name)
kwargs.pop('log', None)
jid = '12345'
output = run.run(cmd)
......@@ -249,7 +308,7 @@ def mock_submit(cmd, **kwargs):
for k in sorted(kwargs.keys()):
f.write('{}: {}\n'.format(k, kwargs[k]))
return jid
return (jid, '')
def test_run_submit():
......@@ -259,7 +318,7 @@ def test_run_submit():
f.write(contents)
os.chmod(path, 0o755)
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/usr/bin/env bash
echo test_script running
exit 0
......@@ -267,39 +326,43 @@ def test_run_submit():
with tempdir.tempdir(), \
mockFSLDIR(), \
mock.patch('fsl.utils.fslsub.submit', mock_submit):
mock.patch('fsl.wrappers.fsl_sub', mock_fsl_sub):
mkexec(op.expandvars('$FSLDIR/bin/fsltest'), test_script)
jid = run.run('fsltest', submit=True)
assert jid == '12345'
stdout, stderr = fslsub.output(jid)
stdout, stderr = run.job_output(jid)
assert stdout == 'test_script running\n'
assert stderr == ''
# or can pass submit opts as a dict
kwargs = {'name' : 'abcde', 'ram' : '4GB'}
jid = run.run('fsltest', submit=kwargs)
assert jid == '12345'
stdout, stderr = fslsub.output(jid)
stdout, stderr = run.job_output(jid)
experr = '\n'.join(['{}: {}'.format(k, kwargs[k])
for k in sorted(kwargs.keys())]) + '\n'
assert stdout == 'test_script running\n'
assert stderr == experr
# or can pass submit opts as kwargs
kwargs = {'name' : 'abcde', 'ram' : '4GB'}
jid = run.run('fsltest', submit=True, **kwargs)
assert jid == '12345'
stdout, stderr = run.job_output(jid)
experr = '\n'.join(['{}: {}'.format(k, kwargs[k])
for k in sorted(kwargs.keys())]) + '\n'
assert stdout == 'test_script running\n'
assert stderr == experr
def test_run_streams():
"""
"""
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/usr/bin/env bash
echo standard output
echo standard error >&2
......@@ -325,7 +388,6 @@ def test_run_streams():
assert open('my_stdout', 'rt').read() == expstdout
assert open('my_stderr', 'rt').read() == expstderr
capture = CaptureStdout()
with open('my_stdout', 'wt') as stdout, \
......@@ -345,9 +407,28 @@ def test_run_streams():
assert open('my_stdout', 'rt').read() == expstdout
assert open('my_stderr', 'rt').read() == expstderr
# logstdout/err can also be callables
gotstdout = []
gotstderr = []
def logstdout(stdout):
gotstdout.append(stdout)
def logstderr(stderr):
gotstderr.append(stderr)
run.run('./script.sh',
stdout=False,
stderr=False,
log={'tee' : False,
'stdout' : logstdout,
'stderr' : logstderr})
assert gotstdout == [expstdout]
assert gotstderr == [expstderr]
def test_run_logcmd():
test_script = textwrap.dedent("""
test_script = tw.dedent("""
#!/usr/bin/env bash
echo output $@
exit 0
......@@ -372,3 +453,96 @@ def test_run_logcmd():
assert stdout == expstdout
assert open('my_stdout', 'rt').read() == expcmd + expstdout
logged = []
def logfunc(cmd):
logged.append(cmd)
stdout = run.run('./script.sh 1 2 3', log={'cmd' : logfunc})
assert stdout == expstdout
assert logged[0] == expcmd
stdout = run.run('./script.sh 1 2 3', log={'cmd' : logfunc})
assert stdout == expstdout
assert logged[1] == expcmd
def test_hold():
with tempdir.tempdir():
holdfile = 'holdfile'
def create_holdfile():
time.sleep(3)
touch(holdfile)
with run.dryrun():
threading.Thread(target=create_holdfile).start()
run.hold([1, 2, 3], holdfile, timeout=1)
cmds = list(run.DRY_RUN_COMMANDS)
# dryrun gathers all executed commands
# in a list of (cmd, submit) tuples,
# so we do a very simple check here
assert len(cmds) == 1
cmd, submit = cmds[0]
assert cmd == ('touch', 'holdfile')
assert submit['jobhold'] == '1,2,3'
def _good_func():
print('hello')
def _bad_func():
1/0
def test_runfunc():
assert run.runfunc(_good_func, clean='always') == 'hello\n'
with pytest.raises(Exception):
assert run.runfunc(_bad_func, clean='always')
def test_func_to_cmd():
cwd = os.getcwd()
with tempdir.tempdir():
for tmp_dir in (None, '.'):
for clean in ('never', 'on_success', 'always'):
for verbose in (False, True):
cmd = run.func_to_cmd(_good_func, clean=clean, tmp_dir=tmp_dir, verbose=verbose)
fn = cmd.split()[-1]
assert op.exists(fn)
stdout, stderr, exitcode = run.run(cmd, exitcode=True, stdout=True, stderr=True,
env={"PYTHONPATH": cwd})
assert exitcode == 0
if clean == 'never':
assert op.exists(fn), "Successful job got removed, even though this was not requested"
else:
assert not op.exists(fn), f"Successful job did not get removed after run for clean = {clean}"
if verbose:
assert stdout.strip() == f'running "{fn}"\nhello'
else:
assert stdout.strip() == 'hello'
cmd = run.func_to_cmd(_bad_func, clean=clean, tmp_dir=tmp_dir)
fn = cmd.split()[-1]
assert op.exists(fn)
stdout, stderr, exitcode = run.run(cmd, exitcode=True, stdout=True, stderr=True,
env={'PYTHONPATH': cwd})
assert exitcode != 0
if clean == 'always':
assert not op.exists(fn), "Failing job should always be removed if requested"
else:
assert op.exists(fn), f"Failing job got removed even with clean = {clean}"
def test_wrapper_to_cmd():
fn = run.func_to_cmd(wrappers.bet)
assert op.exists(fn)
assert op.basename(fn).startswith("bet_")
def test_job_output():
with tempdir.tempdir() as td:
with open('test.e12345', 'wt') as f: f.write('error')
with open('test.o12345', 'wt') as f: f.write('output')
assert run.job_output(12345, td) == ('output', 'error')
......@@ -13,7 +13,7 @@ import pytest
import fsl.data.atlases as fslatlases
import fsl.scripts.atlasq as fslatlasq
from .. import CaptureStdout
from fsl.tests import CaptureStdout
pytestmark = pytest.mark.fsltest
......
......@@ -18,9 +18,9 @@ import numpy as np
import fsl.scripts.atlasq as fslatlasq
import fsl.data.atlases as fslatlases
from .. import (tempdir,
make_random_mask,
CaptureStdout)
from fsl.tests import (tempdir,
make_random_mask,
CaptureStdout)
pytestmark = pytest.mark.fsltest
......@@ -51,7 +51,7 @@ def test_coords(seed):
"""Test the ohi -a "atlas" -c "coords" mode. """
def expectedProbOutput(atlas, coords):
probs = atlas.proportions(coords)
probs = atlas.values(coords)
expected = '<b>{}</b><br>'.format(atlas.desc.name)
nzprobs = []
......@@ -95,11 +95,7 @@ def test_coords(seed):
for ad in atlases:
# atlasquery/ohi always uses 2mm resolution
atlas = fslatlases.loadAtlas(
ad.atlasID,
resolution=2,
calcRange=False,
loadData=False)
atlas = fslatlases.loadAtlas(ad.atlasID, resolution=2)
print(ad.name)
......@@ -161,7 +157,7 @@ def test_mask(seed):
def expectedProbOutput(mask, atlas):
props = atlas.maskProportions(mask)
props = atlas.maskValues(mask)
labels = [l.index for l in atlas.desc.labels]
exp = []
......
......@@ -21,9 +21,9 @@ import fsl.utils.image.resample as resample
import fsl.data.image as fslimage
import fsl.scripts.atlasq as fslatlasq
from .. import (tempdir,
make_random_mask,
CaptureStdout)
from fsl.tests import (tempdir,
make_random_mask,
CaptureStdout)
pytestmark = pytest.mark.fsltest
......@@ -286,7 +286,7 @@ def _eval_coord_voxel_query(
else:
exp = [q_type, squery]
_stdout = re.sub('\s+', ' ', stdout).strip()
_stdout = re.sub(r'\s+', ' ', stdout).strip()
assert _stdout == ' '.join(exp)
if isinstance(a_img, fslatlases.LabelAtlas):
......@@ -294,7 +294,7 @@ def _eval_coord_voxel_query(
if o_type == 'normal': evalLabelNormalOutput(explabel)
else: evalLabelShortOutput(explabel)
elif isinstance(a_img, fslatlases.ProbabilisticAtlas):
expprops = a_img.proportions(query, voxel=voxel)
expprops = a_img.values(query, voxel=voxel)
if o_type == 'normal': evalProbNormalOutput(expprops)
else: evalProbShortOutput(expprops)
......@@ -388,7 +388,7 @@ def _eval_mask_query(
if expprop > 0:
exp.append('{} {:0.4f}'.format(explabel, expprop))
_stdout = re.sub('\s+', ' ', stdout).strip()
_stdout = re.sub(r'\s+', ' ', stdout).strip()
assert _stdout == ' '.join(exp)
if isinstance(aimg, fslatlases.LabelAtlas):
......@@ -400,7 +400,7 @@ def _eval_mask_query(
explabels, expprops = [], []
elif isinstance(aimg, fslatlases.ProbabilisticAtlas):
try:
expprops = aimg.maskProportions(maskimg)
expprops = aimg.maskValues(maskimg)
explabels = aimg.desc.labels
except fslatlases.MaskError:
explabels = []
......@@ -441,11 +441,7 @@ def test_bad_mask(seed):
for atlasID, use_label in it.product(atlases, use_labels):
atlas = fslatlases.loadAtlas(
atlasID,
loadSummary=use_label,
loadData=False,
calcRange=False)
atlas = fslatlases.loadAtlas(atlasID, loadSummary=use_label)
ashape = list(atlas.shape[:3])
wrongdims = fslimage.Image(
......
#!/usr/bin/env python
#
# test_fsl_abspath.py -
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
import os.path as op
import fsl.scripts.fsl_abspath as fsl_abspath
from fsl.tests import tempdir, CaptureStdout
def test_usage():
assert fsl_abspath.main([]) != 0
def test_fsl_abspath():
# fsl_abspath just calls os.path.realpath
with tempdir() as td:
oneup = op.dirname(td)
# (input, expected)
tests = [
('file', f'{td}/file'),
('./file', f'{td}/file'),
('../file', f'{oneup}/file'),
('/file', '/file'),
('/one/two/file', '/one/two/file'),
]
for input, expected in tests:
cap = CaptureStdout()
with cap:
ret = fsl_abspath.main([input])
assert ret == 0
assert cap.stdout.strip() == expected
#!/usr/bin/env python
import numpy as np
import pytest
import fsl.scripts.fsl_apply_x5 as fsl_apply_x5
import fsl.data.image as fslimage
import fsl.utils.image.resample as resample
import fsl.utils.image.roi as roi
import fsl.transform.x5 as x5
import fsl.transform.affine as affine
import fsl.utils.tempdir as tempdir
from fsl.tests.test_transform.test_nonlinear import _affine_field
def _random_affine():
return affine.compose(
np.random.randint(2, 5, 3),
np.random.randint(1, 10, 3),
np.random.random(3))
def _random_image(aff, shape=None):
if shape is None:
shape = np.random.randint(10, 50, 3)
data = (np.random.random(shape) - 0.5) * 10
return fslimage.Image(data, xform=aff)
def test_help():
def run(args):
with pytest.raises(SystemExit) as e:
fsl_apply_x5.main(args)
assert e.value.code == 0
run([])
run(['-h'])
def test_linear(seed):
with tempdir.tempdir():
src2ref = _random_affine()
src = _random_image(np.eye(4))
ref = _random_image(src2ref)
x5.writeLinearX5('xform.x5', src2ref, src, ref)
src.save('src')
ref.save('ref')
fsl_apply_x5.main('src xform.x5 out'.split())
result = fslimage.Image('out')
expect = resample.resampleToReference(src, ref, matrix=src2ref)[0]
assert result.sameSpace(ref)
assert np.all(np.isclose(result.data, expect))
def test_nonlinear(seed):
with tempdir.tempdir():
src2ref = _random_affine()
ref2src = affine.invert(src2ref)
src = _random_image(np.eye(4))
ref = _random_image(src2ref)
field = _affine_field(src, ref, ref2src, 'world', 'world')
x5.writeNonLinearX5('xform.x5', field)
src.save('src')
fsl_apply_x5.main('src xform.x5 out'.split())
result = fslimage.Image('out')
expect = resample.resampleToReference(src, ref, matrix=src2ref, smooth=False)[0]
assert result.sameSpace(ref)
# We might get truncation on the edges
result = result.data[1:-1, 1:-1, 1:-1]
expect = expect[ 1:-1, 1:-1, 1:-1]
tol = dict(atol=1e-3, rtol=1e-3)
assert np.all(np.isclose(result, expect, **tol))
def test_linear_altref(seed):
with tempdir.tempdir():
src2ref = affine.scaleOffsetXform([1, 1, 1], [5, 5, 5])
altv2w = affine.scaleOffsetXform([1, 1, 1], [10, 10, 10])
srcdata = np.random.randint(1, 65536, (10, 10, 10), dtype=np.int32)
src = fslimage.Image(srcdata, xform=np.eye(4))
ref = fslimage.Image(src.data, xform=src2ref)
altref = fslimage.Image(src.data, xform=altv2w)
src .save('src')
ref .save('ref')
altref.save('altref')
x5.writeLinearX5('xform.x5', src2ref, src, ref)
fsl_apply_x5.main('src xform.x5 out -r altref'.split())
result = fslimage.Image('out')
expect = np.zeros(srcdata.shape)
expect[:5, :5, :5] = srcdata[5:, 5:, 5:]
assert result.sameSpace(altref)
assert np.all(result.data == expect)
def test_nonlinear_altref(seed):
with tempdir.tempdir():
src2ref = affine.scaleOffsetXform([1, 1, 1], [5, 5, 5])
ref2src = affine.invert(src2ref)
altv2w = affine.scaleOffsetXform([1, 1, 1], [10, 10, 10])
srcdata = np.random.randint(1, 65536, (10, 10, 10), dtype=np.int32)
src = fslimage.Image(srcdata, xform=np.eye(4))
ref = fslimage.Image(src.data, xform=src2ref)
altref = fslimage.Image(src.data, xform=altv2w)
field = _affine_field(src, ref, ref2src, 'world', 'world')
src .save('src')
ref .save('ref')
altref.save('altref')
x5.writeNonLinearX5('xform.x5', field)
fsl_apply_x5.main('src xform.x5 out -r altref'.split())
result = fslimage.Image('out')
expect = np.zeros(srcdata.shape)
expect[:5, :5, :5] = srcdata[5:, 5:, 5:]
assert result.sameSpace(altref)
assert np.all(result.data == expect)
def test_linear_altsrc(seed):
with tempdir.tempdir():
src2ref = _random_affine()
src = _random_image(np.eye(4), (20, 20, 20))
ref = _random_image(src2ref)
x5.writeLinearX5('xform.x5', src2ref, src, ref)
src.save('src')
ref.save('ref')
srclo, xf = resample.resample(src, (10, 10, 10))
srclo = fslimage.Image(srclo, xform=xf)
srchi, xf = resample.resample(src, (40, 40, 40))
srchi = fslimage.Image(srchi, xform=xf)
srcoff = roi.roi(src, [(-10, 10), (-10, 10), (-10, 10)])
srclo .save('srclo')
srchi .save('srchi')
srcoff.save('srcoff')
fsl_apply_x5.main('src xform.x5 out' .split())
fsl_apply_x5.main('srclo xform.x5 outlo' .split())
fsl_apply_x5.main('srchi xform.x5 outhi' .split())
fsl_apply_x5.main('srcoff xform.x5 outoff'.split())
out = fslimage.Image('out')
outlo = fslimage.Image('outlo')
outhi = fslimage.Image('outhi')
outoff = fslimage.Image('outoff')
exp = resample.resampleToReference(src, ref, matrix=src2ref)[0]
explo = resample.resampleToReference(srclo, ref, matrix=src2ref)[0]
exphi = resample.resampleToReference(srchi, ref, matrix=src2ref)[0]
expoff = resample.resampleToReference(srcoff, ref, matrix=src2ref)[0]
assert out .sameSpace(ref)
assert outlo .sameSpace(ref)
assert outhi .sameSpace(ref)
assert outoff.sameSpace(ref)
assert np.all(np.isclose(out .data, exp))
assert np.all(np.isclose(outlo .data, explo))
assert np.all(np.isclose(outhi .data, exphi))
assert np.all(np.isclose(outoff.data, expoff))
def test_nonlinear_altsrc(seed):
with tempdir.tempdir():
src2ref = _random_affine()
ref2src = affine.invert(src2ref)
src = _random_image(np.eye(4), (20, 20, 20))
ref = _random_image(src2ref, (20, 20, 20))
field = _affine_field(src, ref, ref2src, 'world', 'world')
x5.writeNonLinearX5('xform.x5', field)
src.save('src')
ref.save('ref')
# use origin=corner so that the
# resampled variants are exactly
# aligned in the world coordinate
# system
srclo, xf = resample.resample(src, (10, 10, 10), origin='corner', smooth=False)
srclo = fslimage.Image(srclo, xform=xf)
srchi, xf = resample.resample(src, (40, 40, 40), origin='corner', smooth=False)
srchi = fslimage.Image(srchi, xform=xf)
srcoff = roi.roi(src, [(-10, 10), (-10, 10), (-10, 10)])
srclo .save('srclo')
srchi .save('srchi')
srcoff.save('srcoff')
fsl_apply_x5.main('src xform.x5 out' .split())
fsl_apply_x5.main('srclo xform.x5 outlo' .split())
fsl_apply_x5.main('srchi xform.x5 outhi' .split())
fsl_apply_x5.main('srcoff xform.x5 outoff'.split())
out = fslimage.Image('out')
outlo = fslimage.Image('outlo')
outhi = fslimage.Image('outhi')
outoff = fslimage.Image('outoff')
exp, x1 = resample.resampleToReference(src, ref, matrix=src2ref, mode='constant', smooth=False)
explo, x2 = resample.resampleToReference(srclo, ref, matrix=src2ref, mode='constant', smooth=False)
exphi, x3 = resample.resampleToReference(srchi, ref, matrix=src2ref, mode='constant', smooth=False)
expoff, x4 = resample.resampleToReference(srcoff, ref, matrix=src2ref, mode='constant', smooth=False)
assert out .sameSpace(ref)
assert outlo .sameSpace(ref)
assert outhi .sameSpace(ref)
assert outoff.sameSpace(ref)
# We get boundary cropping,
# so ignore edge slices
out = out .data[1:-1, 1:-1, 1:-1]
outlo = outlo .data[1:-1, 1:-1, 1:-1]
outhi = outhi .data[1:-1, 1:-1, 1:-1]
outoff = outoff.data[ :9, :9, :9]
exp = exp[ 1:-1, 1:-1, 1:-1]
explo = explo[ 1:-1, 1:-1, 1:-1]
exphi = exphi[ 1:-1, 1:-1, 1:-1]
expoff = expoff[ :9, :9, :9]
tol = dict(atol=1e-3, rtol=1e-3)
assert np.all(np.isclose(out, exp, **tol))
assert np.all(np.isclose(outlo, explo, **tol))
assert np.all(np.isclose(outhi, exphi, **tol))
assert np.all(np.isclose(outoff, expoff, **tol))