Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • paulmc/fslpy
  • ndcn0236/fslpy
  • seanf/fslpy
3 results
Show changes
Showing
with 2004 additions and 0 deletions
#!/usr/bin/env python
#
# test_wrappers.py -
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
import os.path as op
import pathlib
import shlex
import fsl.wrappers as fw
from fsl.tests.test_wrappers import checkResult, testenv
def test_bet():
with testenv('bet') as bet:
result = fw.bet('input', 'output', mask=True, c=(10, 20, 30))
expected = (bet + ' input output', ('-m', '-c 10 20 30'))
assert checkResult(result.stdout[0], *expected, stripdir=[2])
def test_robustfov():
with testenv('robustfov') as rfov:
result = fw.robustfov('input', 'output', b=180)
expected = f'{rfov} -i input -b 180 -r output'
assert result.stdout[0] == expected
def test_eddy():
with testenv('eddy') as eddy:
result = fw.eddy('imain', 'mask', 'index', 'acqp',
'bvecs', 'bvals', 'out', dont_mask_output=True)
expected = f'{eddy} ' \
'--imain=imain '\
'--mask=mask '\
'--index=index '\
'--acqp=acqp '\
'--bvecs=bvecs '\
'--bvals=bvals '\
'--out=out '\
'--dont_mask_output'
assert result.stdout[0] == expected
def test_topup():
with testenv('topup') as topup:
result = fw.topup('imain', 'datain', minmet=1)
expected = topup + ' --imain=imain --datain=datain --minmet=1'
assert result.stdout[0] == expected
def test_applytopup():
with testenv('applytopup') as applytopup:
result = fw.applytopup('imain', 'datain', '1,2,3', 'topup', 'out',
m='jac')
expected = f'{applytopup} --imain=imain --datain=datain ' \
'--inindex=1,2,3 --topup=topup --out=out -m jac'
assert result.stdout[0] == expected
result = fw.applytopup('imain', 'datain', [1, 2, 3], 'topup', 'out',
method='jac')
expected = f'{applytopup} --imain=imain --datain=datain ' \
'--inindex=1,2,3 --topup=topup --out=out --method=jac'
assert result.stdout[0] == expected
def test_flirt():
with testenv('flirt') as flirt:
result = fw.flirt('src', 'ref', usesqform=True, anglerep='euler')
expected = f'{flirt} -in src -ref ref -usesqform -anglerep euler'
assert result.stdout[0] == expected
def test_fixscaleskew():
with testenv('convert_xfm') as convert_xfm:
result = fw.fixscaleskew('mat1', 'mat2', 'out')
expected = f'{convert_xfm} -fixscaleskew mat2 mat1 -omat out'
assert result.stdout[0] == expected
def test_epi_reg():
with testenv('epi_reg') as epi_reg:
result = fw.epi_reg('epi', 't1', 't1brain', 'out')
expected = epi_reg + ' --epi=epi --t1=t1 --t1brain=t1brain --out=out'
assert result.stdout[0] == expected
def test_applyxfm():
with testenv('flirt') as flirt:
result = fw.applyxfm('src', 'ref', 'mat', 'out', interp='trilinear')
expected = f'{flirt} -in src -ref ref -out out -applyxfm -init mat -interp trilinear'
assert result.stdout[0] == expected
def test_applyxfm4D():
with testenv('applyxfm4D') as applyxfm:
result = fw.applyxfm4D(
'src', 'ref', 'out', 'mat', fourdigit=True, userprefix='boo')
expected = f'{applyxfm} src ref out mat -fourdigit -userprefix boo'
assert result.stdout[0] == expected
def test_invxfm():
with testenv('convert_xfm') as cnvxfm:
result = fw.invxfm('mat', 'output')
expected = cnvxfm + ' -omat output -inverse mat'
assert result.stdout[0] == expected
def test_concatxfm():
with testenv('convert_xfm') as cnvxfm:
result = fw.concatxfm('mat1', 'mat2', 'output')
expected = cnvxfm + ' -omat output -concat mat2 mat1'
assert result.stdout[0] == expected
def test_mcflirt():
with testenv('mcflirt') as mcflirt:
result = fw.mcflirt('input', out='output', cost='normcorr', dof=12)
expected = f'{mcflirt} -in input -out output -cost normcorr -dof 12'
assert result.stdout[0] == expected
def test_fnirt():
with testenv('fnirt') as fnirt:
result = fw.fnirt('src', ref='ref', iout='iout', fout='fout',
subsamp=(8, 6, 4, 2))
expected = f'{fnirt} --in=src --ref=ref --iout=iout --fout=fout --subsamp=8,6,4,2'
assert result.stdout[0] == expected
def test_applywarp():
with testenv('applywarp') as applywarp:
result = fw.applywarp('src', 'ref', 'out', warp='warp', abs=True, super=True)
expected = f'{applywarp} --in=src --ref=ref --out=out --warp=warp --abs --super'
assert result.stdout[0] == expected
def test_invwarp():
with testenv('invwarp') as invwarp:
result = fw.invwarp('warp', 'ref', 'out', rel=True, noconstraint=True)
expected = f'{invwarp} --warp=warp --ref=ref --out=out --rel --noconstraint'
assert result.stdout[0] == expected
def test_convertwarp():
with testenv('convertwarp') as cnvwarp:
result = fw.convertwarp('out', 'ref', absout=True, jacobian='jacobian')
expected = f'{cnvwarp} --ref=ref --out=out --absout --jacobian=jacobian'
assert result.stdout[0] == expected
def test_fugue():
with testenv('fugue') as fugue:
result = fw.fugue(input='input', warp='warp', median=True, dwell=10)
expected = f'{fugue} --in=input --warp=warp --median --dwell=10'
assert result.stdout[0] == expected
def test_sigloss():
with testenv('sigloss') as sigloss:
result = fw.sigloss('input', 'sigloss', mask='mask', te=0.5)
expected = f'{sigloss} --in input --sigloss sigloss --mask mask --te 0.5'
assert result.stdout[0] == expected
def test_prelude():
with testenv('prelude') as prelude:
result = fw.prelude(complex='complex',
out='out',
labelslices=True,
start=5)
expected = f'{prelude} --complex=complex --out=out --labelslices --start=5'
assert result.stdout[0] == expected
def test_melodic():
with testenv('melodic') as melodic:
result = fw.melodic('input', dim=50, mask='mask', Oall=True)
expected = f'{melodic} --in=input --dim=50 --mask=mask --Oall'
assert result.stdout[0] == expected
def test_fsl_regfilt():
with testenv('fsl_regfilt') as regfilt:
result = fw.fsl_regfilt('input', 'output', 'design',
filter=(1, 2, 3, 4), vn=True, a=True)
expected = f'{regfilt} --in=input --out=output --design=design ' \
'--filter=1,2,3,4 --vn -a'
assert result.stdout[0] == expected
def test_fsl_glm():
with testenv('fsl_glm') as fsl_glm:
exp = f'{fsl_glm} --in=in --out=out --design=des -m mask --demean --dof=7'
res = fw.fsl_glm('in', 'out', 'des', m='mask', demean=True, dof=7)
assert res.stdout[0] == exp
def test_fslorient():
with testenv('fslorient') as fslo:
result = fw.fslorient('input', setsform=(-2, 0, 0, 90, 0, 2, 0, -126, 0, 0, 2, -72, 0, 0, 0, 1))
expected = fslo + ' -setsform -2 0 0 90 0 2 0 -126 0 0 2 -72 0 0 0 1' + ' input'
assert result.stdout[0] == expected
result = fw.fslorient('input', getorient=True)
expected = fslo + ' -getorient' + ' input'
assert result.stdout[0] == expected
result = fw.fslorient('input', setsformcode=1)
expected = fslo + ' -setsformcode 1' + ' input'
assert result.stdout[0] == expected
def test_fslreorient2std():
with testenv('fslreorient2std') as r2std:
result = fw.fslreorient2std('input', 'output')
expected = r2std + ' input output'
assert result.stdout[0] == expected
def test_fslroi():
with testenv('fslroi') as fslroi:
result = fw.fslroi('input', 'output', 1, 10)
expected = fslroi + ' input output 1 10'
assert result.stdout[0] == expected
result = fw.fslroi('input', 'output', 1, 10, 2, 20, 3, 30)
expected = fslroi + ' input output 1 10 2 20 3 30'
assert result.stdout[0] == expected
result = fw.fslroi('input', 'output', 1, 10, 2, 20, 3, 30, 4, 40)
expected = fslroi + ' input output 1 10 2 20 3 30 4 40'
assert result.stdout[0] == expected
def test_slicer():
with testenv('slicer') as slicer:
result = fw.slicer('input1', 'input2', i=(20, 100), x=(20, 'x.png'))
expected = slicer + ' input1 input2 -i 20 100 -x 20 x.png'
assert result.stdout[0] == expected
def test_fast():
with testenv('fast') as fast:
result = fw.fast('input', 'myseg', n_classes=3)
expected = f'{fast} --out=myseg --class=3 input'
assert result.stdout[0] == expected
result = fw.fast(('in1', 'in2', 'in3'), 'myseg', n_classes=3)
expected = f'{fast} --out=myseg --class=3 in1 in2 in3'
assert result.stdout[0] == expected
result = fw.fast(('in1', 'in2', 'in3'), 'myseg', n_classes=3, verbose=True)
expected = f'{fast} --out=myseg --class=3 --verbose in1 in2 in3'
assert result.stdout[0] == expected
result = fw.fast(('in1', 'in2', 'in3'), 'myseg', n_classes=3,
a='reg.mat', A=('csf', 'gm', 'wm'), Prior=True)
expected = f'{fast} --out=myseg --class=3 -a reg.mat '\
'-A csf gm wm --Prior in1 in2 in3'
assert result.stdout[0] == expected
def test_fsl_anat():
with testenv('fsl_anat') as fsl_anat:
result = fw.fsl_anat('t1', out='fsl_anat', bias_smoothing=25)
expected = f'{fsl_anat} -i t1 -o fsl_anat -t T1 -s 25'
assert result.stdout[0] == expected
def test_gps():
with testenv('gps') as gps:
result = fw.gps('bvecs', 128, optws=True, ranseed=123)
expected = f'{gps} --ndir=128 --out=bvecs --optws --ranseed=123'
assert result.stdout[0] == expected
def test_tbss():
exes = ['tbss_1_preproc',
'tbss_2_reg',
'tbss_3_postreg',
'tbss_4_prestats',
'tbss_non_FA',
'tbss_fill']
with testenv(*exes) as (preproc, reg, postreg, prestats, non_FA, fill):
assert fw.tbss.preproc('1', '2')[0] == f'{preproc} 1 2'
assert fw.tbss.reg(T=True)[0] == f'{reg} -T'
assert fw.tbss.reg(n=True)[0] == f'{reg} -n'
assert fw.tbss.reg(t='target')[0] == f'{reg} -t target'
assert fw.tbss.postreg(S=True)[0] == f'{postreg} -S'
assert fw.tbss.postreg(T=True)[0] == f'{postreg} -T'
assert fw.tbss.prestats(0.3)[0] == f'{prestats} 0.3'
assert fw.tbss.non_FA('alt')[0] == f'{non_FA} alt'
assert fw.tbss.fill('stat', 0.4, 'mean_fa', 'output', n=True).stdout[0] == \
f'{fill} stat 0.4 mean_fa output -n'
def test_fsl_prepare_fieldmap():
with testenv('fsl_prepare_fieldmap') as fpf:
result = fw.fsl_prepare_fieldmap(phase_image='ph',
magnitude_image='mag',
out_image='out',
deltaTE=2.46,
nocheck=True)
expected = f'{fpf} SIEMENS ph mag out 2.46 --nocheck'
assert result.stdout[0] == expected
def test_fsl_sub():
with testenv('fsl_sub') as fsl_sub:
expected = [fsl_sub,
'--jobhold', '123',
'--queue', 'long.q',
'some_command', '--some_arg']
result = fw.fsl_sub(
'some_command', '--some_arg', jobhold='123', queue='long.q')
assert shlex.split(result[0]) == expected
def test_standard_space_roi():
with testenv('standard_space_roi') as ssr:
expected = [ssr,
'input',
'output',
'-maskFOV',
'-maskNONE',
'-maskMASK', 'mask',
'-d',
'-b',
'-ssref', 'ssref',
'-altinput', 'altinput',
'-2D',
'-usesqform',
'-ref flirt_ref']
result = fw.standard_space_roi(
'input', 'output',
maskFOV=True,
maskNONE=True,
maskMASK='mask',
d=True,
b=True,
ssref='ssref',
altinput='altinput',
twod=True,
usesqform=True,
ref='flirt_ref')
assert result.stdout[0] == ' '.join(expected)
def test_fslswapdim():
with testenv('fslswapdim') as swapdim:
assert fw.fslswapdim('input', 'a', 'b', 'c').stdout[0] == \
f'{swapdim} input a b c'
assert fw.fslswapdim('input', 'a', 'b', 'c', 'output').stdout[0] == \
f'{swapdim} input a b c output'
def test_first():
exes = ['first', 'first_flirt', 'run_first',
'run_first_all', 'first_utils', 'concat_bvars']
with testenv(*exes) as (first, first_flirt, run_first,
run_first_all, first_utils, concat_bvars):
expected = f'{first} --in=input --outputName=output ' \
'--inputModel=inmodel --flirtMatrix=flirtmat '\
'--shcond -n 5 --bmapname=bmaps'
result = fw.first('input', 'output', 'inmodel', 'flirtmat',
shcond=True, n=5, bmapname='bmaps')
assert result.stdout[0] == expected
expected = f'{first_flirt} input outbase -b -cost costfn'
result = fw.first_flirt('input', 'outbase', b=True, cost='costfn')
assert result.stdout[0] == expected
expected = f'{run_first} -i input -t mat -n 20 ' \
'-o output -m L_Thal -multipleImages -intref R_Thal'
result = fw.run_first('input', 'mat', 20, 'output', 'L_Thal',
multipleImages=True, intref='R_Thal')
assert result.stdout[0] == expected
expected = f'{run_first} -i input -t mat -n 20 ' \
'-o output -m L_Thal -multipleImages -intref R_Thal'
result = fw.run_first('input', 'mat', 20, 'output', 'L_Thal',
multipleImages=True, intref='R_Thal')
assert result.stdout[0] == expected
expected = f'{run_first_all} -i input -o outbase -3 -s L_Hipp,R_Hipp '\
'-d'
result = fw.run_first_all('input', 'outbase', three=True,
s='L_Hipp,R_Hipp', d=True)
assert result.stdout[0] == expected
expected = f'{first_utils} --in input --out out --useScale '\
'--numModes=20'
result = fw.first_utils('input', 'out', useScale=True,
numModes=20)
assert result.stdout[0] == expected
expected = f'{concat_bvars} output in1 in2 in3'
result = fw.concat_bvars('output', 'in1', 'in2', 'in3')
assert result[0] == expected
def test_fslmerge():
with testenv('fslmerge') as fslmerge:
expected = f'{fslmerge} -x out in1 in2 in3'
result = fw.fslmerge('x', 'out', 'in1', 'in2', 'in3')
assert result.stdout[0] == expected
expected = f'{fslmerge} -n 123 out in1 in2 in3'
result = fw.fslmerge('n', 'out', 123, 'in1', 'in2', 'in3')
assert result.stdout[0] == expected
expected = f'{fslmerge} -tr out in1 in2 in3 123'
result = fw.fslmerge('tr', 'out', 'in1', 'in2', 'in3', 123)
assert result.stdout[0] == expected
def test_fslselectvols():
with testenv('fslselectvols') as fsv:
# vols can either be a sequence,
# comma-separated string, or
# string/path to a file
expected = f'{fsv} -i in -o out --vols=vols.txt'
result = fw.fslselectvols('in', 'out', 'vols.txt')
assert result.stdout[0] == expected
absvols = op.abspath('vols.txt')
expected = f'{fsv} -i in -o out --vols={absvols}'
result = fw.fslselectvols('in', 'out', pathlib.Path('vols.txt'))
assert result.stdout[0] == expected
expected = f'{fsv} -i in -o out --vols=1,2,3'
result = fw.fslselectvols('in', 'out', '1,2,3')
assert result.stdout[0] == expected
expected = f'{fsv} -i in -o out --vols=1,2,3'
result = fw.fslselectvols('in', 'out', ['1', '2', '3'])
assert result.stdout[0] == expected
expected = f'{fsv} -i in -o out --vols=1,2,3'
result = fw.fslselectvols('in', 'out', [1, 2, 3])
assert result.stdout[0] == expected
def test_fslsplit():
with testenv('fslsplit') as fslsplit:
assert fw.fslsplit('src') .stdout[0] == f'{fslsplit} src'
assert fw.fslsplit('src', 'out') .stdout[0] == f'{fslsplit} src out'
assert fw.fslsplit('src', dim='x').stdout[0] == f'{fslsplit} src -x'
assert fw.fslsplit('src', 'out', dim='t').stdout[0] == f'{fslsplit} src out -t'
def test_fslcpgeom():
with testenv('fslcpgeom') as fslcpgeom:
assert fw.fslcpgeom('src', 'dest') .stdout[0] == f'{fslcpgeom} src dest'
assert fw.fslcpgeom('src', 'dest', d=True).stdout[0] == f'{fslcpgeom} src dest -d'
def test_bianca():
exes = ['bianca',
'bianca_cluster_stats',
'bianca_overlap_measures',
'bianca_perivent_deep',
'make_bianca_mask']
with testenv(*exes) as (bianca,
bianca_cluster_stats,
bianca_overlap_measures,
bianca_perivent_deep,
make_bianca_mask):
expected = f'{bianca} --singlefile sfile --querysubjectnum 3 --brainmaskfeaturenum 4'
result = fw.bianca('sfile', 3, 4)
assert result[0] == expected
expected = f'{bianca} --singlefile sfile --querysubjectnum 3 --patch3d -v'
result = fw.bianca('sfile', 3, patch3d=True, v=True)
assert result[0] == expected
expected = f'{bianca_cluster_stats} out 9 5 mask'
result = fw.bianca_cluster_stats('out', 9, 5, 'mask')
assert result.stdout[0] == expected
expected = f'{bianca_overlap_measures} lesions 9 mask 1'
result = fw.bianca_overlap_measures('lesions', 9, 'mask', True)
assert result.stdout[0] == expected
expected = f'{bianca_perivent_deep} wmh vent 10 2 out'
result = fw.bianca_perivent_deep('wmh', 'vent', 10, 'out', 2)
assert result.stdout[0] == expected
expected = f'{make_bianca_mask} struc csf warp 1'
result = fw.make_bianca_mask('struc', 'csf', 'warp', True)
assert result.stdout[0] == expected
def test_feat():
with testenv('feat') as feat:
assert fw.feat('design.fsf')[0] == f'{feat} design.fsf'
def test_featquery():
with testenv('featquery') as featquery:
expect = f'{featquery} 3 feat1 feat2 feat3 2 stat1 stat2 ' \
'output -p -t 0.4 -w mask -vox 20 30 40'
result = fw.featquery(('feat1', 'feat2', 'feat3'),
('stat1', 'stat2'),
'output', 'mask',
vox=(20, 30, 40), t=0.4,
p=True, w=True)
assert result.stdout[0] == expect
expect = f'{featquery} 2 feat1 feat2 3 stat1 stat2 stat3 ' \
'output -a atlas -i 1.5 -s mask -mm 20 30 40'
result = fw.featquery(('feat1', 'feat2'),
('stat1', 'stat2', 'stat3'),
'output', 'mask',
w=False,
s=True,
a='atlas',
mm=(20, 30, 40),
i=1.5)
assert result.stdout[0] == expect
def test_dtifit():
with testenv('dtifit') as dtifit:
res = fw.dtifit('data', 'out', 'mask', 'bvecs', 'bvals', kurt=True, z=2, xmax=6)
exp = f'{dtifit} --data=data --out=out --mask=mask --bvecs=bvecs '\
'--bvals=bvals --kurt -z 2 --xmax=6'
assert res.stdout[0] == exp
def test_vecreg():
with testenv('vecreg') as vecreg:
res = fw.vecreg('in', 'out', 'ref', warpfield='warp',
premat='premat.mat', interp='sinc', m='mask')
exp = f'{vecreg} -i in -o out -r ref --warpfield=warp ' \
'--premat=premat.mat --interp=sinc -m mask'
assert res.stdout[0] == exp
def test_xfibres():
with testenv('xfibres') as xfibres:
res = fw.xfibres('data', 'mask', 'bvecs', 'bvals',
f0=True, nf=20, V=True)
exp = f'{xfibres} --data=data --mask=mask --bvecs=bvecs ' \
'--bvals=bvals --f0 --nf=20 -V'
assert res.stdout[0] == exp
def test_xfibres_gpu():
with testenv('xfibres_gpu') as xfibres_gpu:
res = fw.xfibres_gpu('data', 'mask', 'bvecs', 'bvals', 'subjdir',
1, 10, 100, f0=True, nf=20)
exp = f'{xfibres_gpu} --data=data --mask=mask --bvecs=bvecs ' \
'--bvals=bvals --f0 --nf=20 subjdir 1 10 100'
assert res.stdout[0] == exp
def test_split_parts_gpu():
with testenv('split_parts_gpu') as split_parts_gpu:
res = fw.split_parts_gpu('data', 'mask', 'bvals', 'bvecs', 10, 'out')
exp = f'{split_parts_gpu} data mask bvals bvecs None 0 10 out'
assert res.stdout[0] == exp
res = fw.split_parts_gpu('data', 'mask', 'bvals', 'bvecs', 10, 'out', 'grad')
exp = f'{split_parts_gpu} data mask bvals bvecs grad 1 10 out'
assert res.stdout[0] == exp
def test_bedpostx():
with testenv('bedpostx') as bpx:
res = fw.bedpostx('data', noard=True, nf=2)
exp = f'{bpx} data --noard --nf=2'
assert res[0] == exp
def test_bedpostx_gpu():
with testenv('bedpostx_gpu') as bpx:
res = fw.bedpostx_gpu('data', noard=True, nf=2)
exp = f'{bpx} data --noard --nf=2'
assert res[0] == exp
def test_bedpostx_postproc_gpu():
with testenv('bedpostx_postproc_gpu.sh') as bpg:
res = fw.bedpostx_postproc_gpu('data', 'mask', 'bvecs', 'bvals',
100, 10, 'subdir', 'bindir', nf=20)
exp = f'{bpg} --data=data --mask=mask --bvecs=bvecs --bvals=bvals ' \
'--nf=20 100 10 subdir bindir'
assert res.stdout[0] == exp
def test_probtrackx():
with testenv('probtrackx') as ptx:
res = fw.probtrackx('samples', 'mask', 'seed', rseed=20,
usef=True, S=50, nsamples=50)
exp = f'{ptx} --samples=samples --mask=mask --seed=seed --rseed=20 ' \
'--usef -S 50 --nsamples=50'
assert res.stdout[0] == exp
def test_probtrackx2():
with testenv('probtrackx2') as ptx2:
res = fw.probtrackx2('samples', 'mask', 'seed', rseed=20,
usef=True, S=50, nsamples=50)
exp = f'{ptx2} --samples=samples --mask=mask --seed=seed --rseed=20 ' \
'--usef -S 50 --nsamples=50'
assert res.stdout[0] == exp
def test_probtrackx2_gpu():
with testenv('probtrackx2_gpu') as ptx2gpu:
res = fw.probtrackx2_gpu('samples', 'mask', 'seed', rseed=20,
usef=True, S=50, nsamples=50)
exp = f'{ptx2gpu} --samples=samples --mask=mask --seed=seed ' \
'--rseed=20 --usef -S 50 --nsamples=50'
assert res.stdout[0] == exp
def test_oxford_asl():
with testenv('oxford_asl') as oxford_asl:
res = fw.oxford_asl('in', 'out',
S='T1',
sbrain='T1_brain',
regfrom_method='pwi',
region_analysis=True,
region_analysis_atlas=['a1', 'a2', 'a3'],
region_analysis_atlas_labels=['l1', 'l2', 'l3'])
exp = f'{oxford_asl} -i in -o out -S T1 --sbrain=T1_brain ' \
'--regfrom-method=pwi --region-analysis ' \
'--region-analysis-atlas=a1 ' \
'--region-analysis-atlas=a2 ' \
'--region-analysis-atlas=a3 ' \
'--region-analysis-atlas-labels=l1 ' \
'--region-analysis-atlas-labels=l2 ' \
'--region-analysis-atlas-labels=l3'
assert res.stdout[0] == exp
def test_asl_file():
with testenv('asl_file') as asl_file:
res = fw.asl_file('in', 20, 'out', diff=True, iaf='ct')
exp = f'{asl_file} --data=in --ntis=20 --out=out --diff --iaf=ct'
assert res.stdout[0] == exp
def test_randomise():
with testenv('randomise') as randomise:
res = fw.randomise('input', 'outbase', T=True, T2=True, fonly=True,
d='design.mat', one=True)
exp = f'{randomise} -i input -o outbase -T --T2 --fonly -d design.mat -1'
assert res[0] == exp
def test_get_standard():
with testenv('fsl_get_standard') as exe:
assert fw.get_standard('brain', 'T1') == f'{exe} brain T1'
assert fw.get_standard('whole_head', r=2) == f'{exe} whole_head -r 2'
def test_makerot():
with testenv('makerot') as exe:
assert fw.makerot(90).stdout[0] == f'{exe} -t 90'
assert fw.makerot(90, cov='cov', centre=[10, 10, 10]).stdout[0] == \
f'{exe} -t 90 --cov=cov --centre=10,10,10'
def test_midtrans():
with testenv('midtrans') as exe:
assert fw.midtrans('out', 'xfm1', 'xfm2', 'xfm3').stdout[0] == \
f'{exe} -o out xfm1 xfm2 xfm3'
#!/usr/bin/env python
#
# Test that arguments intended for fsl.utils.run.run are passed
# through correctly by wrapper functions
import os
import os.path as op
from io import StringIO
from unittest import mock
import pytest
import fsl.utils.tempdir as tempdir
import fsl.wrappers as wrappers
import fsl.wrappers.wrapperutils as wutils
from fsl.tests import CaptureStdout
mock_command = """
#!/usr/bin/env bash
echo "Standard output"
>&2 echo "Standard error"
exit {exitcode}
""".strip()
def create_mock_command(exitcode):
with open('mock_command', 'wt') as f:
f.write(mock_command.format(exitcode=exitcode))
os.chmod('mock_command', 0o777)
@wutils.cmdwrapper
def command():
return ['mock_command']
command_fot = wutils.fileOrImage()(command)
def run_command(fileorthing, expect=None, *args, **kwargs):
if fileorthing: result = command_fot(*args, **kwargs)
else: result = command( *args, **kwargs)
if expect is None:
return
if fileorthing:
if 'submit' in kwargs or 'cmdonly' in kwargs:
assert result == expect
else:
assert result.stdout == expect
else:
assert result == expect
@pytest.mark.parametrize('fileorthing', [True, False])
def test_stdout_stderr_exitcode(fileorthing):
newpath = op.pathsep.join(('.', os.environ['PATH']))
with tempdir.tempdir(), \
mock.patch.dict(os.environ, {'PATH' : newpath}):
create_mock_command(0)
# default: stdout=true, stderr=true, exitcode=False
run_command(fileorthing, ('Standard output\n', 'Standard error\n'))
run_command(
fileorthing,
('Standard output\n', 'Standard error\n', 0),
exitcode=True)
run_command(fileorthing, 'Standard error\n', stdout=False)
run_command(fileorthing, 'Standard output\n', stderr=False)
# exitcode=False will cause an error
# to be raised on non-0 exit codes
create_mock_command(99)
with pytest.raises(RuntimeError):
run_command(fileorthing)
run_command(fileorthing,
('Standard output\n', 'Standard error\n', 99),
exitcode=True)
@pytest.mark.parametrize('fileorthing', [True, False])
def test_log(fileorthing):
newpath = op.pathsep.join(('.', os.environ['PATH']))
with tempdir.tempdir(), \
mock.patch.dict(os.environ, {'PATH' : newpath}):
# default: stdout/err is teed to current process
create_mock_command(0)
with CaptureStdout() as cap:
run_command(fileorthing, None)
assert cap.stdout == 'Standard output\n'
assert cap.stderr == 'Standard error\n'
# disable tee
with CaptureStdout() as cap:
result = run_command(
fileorthing,
('Standard output\n', 'Standard error\n'),
log={'tee' : False})
assert cap.stdout == ''
assert cap.stderr == ''
# forward stdout/err somewhere
# else, and record command
stdout = StringIO()
stderr = StringIO()
cmd = StringIO()
with CaptureStdout() as cap:
result = run_command(
fileorthing,
('Standard output\n', 'Standard error\n'),
log={'stdout' : stdout,
'stderr' : stderr,
'cmd' : cmd})
assert cap.stdout == 'Standard output\n'
assert cap.stderr == 'Standard error\n'
stdout.seek(0)
stderr.seek(0)
cmd .seek(0)
assert stdout.read() == 'Standard output\n'
assert stderr.read() == 'Standard error\n'
assert cmd .read() == 'mock_command\n'
@pytest.mark.parametrize('fileorthing', [True, False])
def test_cmdonly(fileorthing):
newpath = op.pathsep.join(('.', os.environ['PATH']))
with tempdir.tempdir(), \
mock.patch.dict(os.environ, {'PATH' : newpath}):
create_mock_command(0)
run_command(fileorthing, ['mock_command'], cmdonly=True)
@pytest.mark.parametrize('fileorthing', [True, False])
def test_wrapperconfig(fileorthing):
newpath = op.pathsep.join(('.', os.environ['PATH']))
with tempdir.tempdir(), \
mock.patch.dict(os.environ, {'PATH' : newpath}):
create_mock_command(0)
# default: stdout=true, stderr=true, exitcode=False
run_command(fileorthing, ('Standard output\n', 'Standard error\n'))
with wrappers.wrapperconfig(stdout=False):
run_command(fileorthing, 'Standard error\n')
run_command(fileorthing, ('Standard error\n', 0), exitcode=True)
with wrappers.wrapperconfig(stderr=False):
run_command(fileorthing, 'Standard output\n')
run_command(fileorthing, ('Standard output\n', 0), exitcode=True)
# check that default behaviour is restored
run_command(fileorthing, ('Standard output\n', 'Standard error\n'))
......@@ -8,12 +8,12 @@
import os.path as op
import os
import shlex
import pathlib
import textwrap
import functools as ft
try: from unittest import mock
except ImportError: import mock
from unittest import mock
import six
import pytest
import numpy as np
......@@ -21,20 +21,34 @@ import nibabel as nib
import fsl.utils.tempdir as tempdir
import fsl.utils.run as run
import fsl.utils.assertions as asrt
import fsl.utils.fslsub as fslsub
import fsl.data.image as fslimage
import fsl.wrappers.wrapperutils as wutils
from . import mockFSLDIR, cleardir, checkdir, testdir
from .test_run import mock_submit
from fsl.tests import mockFSLDIR, cleardir, checkdir, testdir, touch
from fsl.tests.test_run import mock_fsl_sub
def test_applyArgStyle_default():
kwargs = {
'arg1' : 'val',
'arg2' : ['val1', 'val2'],
'a' : 'val',
'b' : ['val1', 'val2'],
}
exp = ['--arg1=val', '--arg2=val1,val2', '-a', 'val', '-b', 'val1', 'val2']
assert wutils.applyArgStyle(**kwargs) == exp
def test_applyArgStyle():
kwargs = {
'name' : 'val',
'name2' : ['val1', 'val2'],
'name3' : 'val1 val2',
}
# these combinations of style+valsep should
......@@ -51,28 +65,70 @@ def test_applyArgStyle():
wutils.applyArgStyle('-', valsep='b', **kwargs)
# style, valsep, expected_result.
# Order of arguments is not guaranteed
tests = [
('-', ' ', [' -name val', '-name2 val1 val2']),
('-', '"', [' -name val', '-name2 "val1 val2"']),
('-', ',', [' -name val', '-name2 val1,val2']),
('-', ' ', ['-name', 'val', '-name2', 'val1', 'val2', '-name3', 'val1 val2']),
('-', '"', ['-name', 'val', '-name2', 'val1 val2', '-name3', 'val1 val2']),
('-', ',', ['-name', 'val', '-name2', 'val1,val2', '-name3', 'val1 val2']),
('--', ' ', ['--name val', '--name2 val1 val2']),
('--', '"', ['--name val', '--name2 "val1 val2"']),
('--', ',', ['--name val', '--name2 val1,val2']),
('--', ' ', ['--name', 'val', '--name2', 'val1', 'val2', '--name3', 'val1 val2']),
('--', '"', ['--name', 'val', '--name2', 'val1 val2', '--name3', 'val1 val2']),
('--', ',', ['--name', 'val', '--name2', 'val1,val2', '--name3', 'val1 val2']),
('-=', '"', [' -name=val', '-name2="val1 val2"']),
('-=', ',', [' -name=val', '-name2=val1,val2']),
('-=', '"', ['-name=val', '-name2=val1 val2', '-name3=val1 val2']),
('-=', ',', ['-name=val', '-name2=val1,val2', '-name3=val1 val2']),
('--=', '"', ['--name=val', '--name2="val1 val2"']),
('--=', ',', ['--name=val', '--name2=val1,val2']),
('--=', '"', ['--name=val', '--name2=val1 val2', '--name3=val1 val2']),
('--=', ',', ['--name=val', '--name2=val1,val2', '--name3=val1 val2']),
]
for style, valsep, exp in tests:
exp = [shlex.split(e) for e in exp]
result = wutils.applyArgStyle(style, valsep=valsep, **kwargs)
assert result == exp
def test_applyArgStyle_charstyle():
kwargs = {
'n' : 'val',
'm' : ['val1', 'val2'],
'o' : 'val1 val2',
}
# these combinations of charstyle+
# charsep should raise an error
with pytest.raises(ValueError):
wutils.applyArgStyle(charstyle='--=', charsep=' ', **kwargs)
with pytest.raises(ValueError):
wutils.applyArgStyle(charstyle='-=', charsep=' ', **kwargs)
# unsupported chrastyle/charsep
with pytest.raises(ValueError):
wutils.applyArgStyle(charstyle='?', **kwargs)
with pytest.raises(ValueError):
wutils.applyArgStyle('-', charsep='b', **kwargs)
# style, valsep, charstyle, charsep, expected_result.
# Order of arguments is not guaranteed
tests = [
('-', ' ', ['-n', 'val', '-m', 'val1', 'val2', '-o', 'val1 val2']),
('-', '"', ['-n', 'val', '-m', 'val1 val2', '-o', 'val1 val2']),
('-', ',', ['-n', 'val', '-m', 'val1,val2', '-o', 'val1 val2']),
('--', ' ', ['--n', 'val', '--m', 'val1','val2', '--o', 'val1 val2']),
('--', '"', ['--n', 'val', '--m', 'val1 val2', '--o', 'val1 val2']),
('--', ',', ['--n', 'val', '--m', 'val1,val2', '--o', 'val1 val2']),
('-=', '"', ['-n=val', '-m=val1 val2', '-o=val1 val2']),
('-=', ',', ['-n=val', '-m=val1,val2', '-o=val1 val2']),
assert result in (exp[0] + exp[1], exp[1] + exp[0])
('--=', '"', ['--n=val', '--m=val1 val2', '--o=val1 val2']),
('--=', ',', ['--n=val', '--m=val1,val2', '--o=val1 val2']),
]
for style, valsep, exp in tests:
result = wutils.applyArgStyle(charstyle=style, charsep=valsep, **kwargs)
assert result == exp
def test_applyArgStyle_argmap():
......@@ -94,7 +150,7 @@ def test_applyArgStyle_argmap():
assert wutils.applyArgStyle('-', argmap=argmap, **kwargs) in exp
def test_applyArgStyle_valmap():
def test_applyArgStyle_valmap_show_hide():
valmap = {
'a' : wutils.SHOW_IF_TRUE,
......@@ -121,6 +177,27 @@ def test_applyArgStyle_valmap():
assert wutils.applyArgStyle('-', valmap=valmap, **kwargs) in expected
def test_applyArgStyle_valmap_unroll_list():
valmap = {
'a' : wutils.EXPAND_LIST,
'aa' : wutils.EXPAND_LIST,
}
tests = [
({ 'a' : [1, 2, 3], 'b' : [1, 2, 3]},
'-a 1 -a 2 -a 3 -b 1 2 3'),
({ 'aa' : [1, 2, 3], 'bb' : [1, 2, 3]},
'--aa=1 --aa=2 --aa=3 --bb=1,2,3'),
]
for kwargs, expected in tests:
expected = shlex.split(expected)
assert wutils.applyArgStyle(valmap=valmap, **kwargs) == expected
def test_applyArgStyle_argmap_valmap():
argmap = {'a1' : 'a', 'a2' : 'b'}
......@@ -131,23 +208,39 @@ def test_applyArgStyle_argmap_valmap():
# kwargs, expected
tests = [
({ }, ['']),
({ 'a1' : False, }, ['']),
({ 'a1' : True, }, ['-a']),
({ 'a2' : False }, ['-b']),
({ 'a2' : True }, ['']),
({ 'a1' : False, 'a2' : True }, ['']),
({ 'a1' : True, 'a2' : True }, ['-a']),
({ 'a1' : False, 'a2' : False }, ['-b']),
({ 'a1' : False, 'a2' : True }, ['']),
({ 'a1' : True, 'a2' : False }, ['-a -b', '-b -a']),
({ 'a1' : True, 'a2' : True }, ['-a']),
({ }, ''),
({ 'a1' : False, }, ''),
({ 'a1' : True, }, '-a'),
({ 'a2' : False }, '-b'),
({ 'a2' : True }, ''),
({ 'a1' : False, 'a2' : True }, ''),
({ 'a1' : True, 'a2' : True }, '-a'),
({ 'a1' : False, 'a2' : False }, '-b'),
({ 'a1' : False, 'a2' : True }, ''),
({ 'a1' : True, 'a2' : False }, '-a -b'),
({ 'a1' : True, 'a2' : True }, '-a'),
]
for kwargs, expected in tests:
expected = [shlex.split(e) for e in expected]
expected = shlex.split(expected)
assert wutils.applyArgStyle(
'-', argmap=argmap, valmap=valmap, **kwargs) in expected
'-', argmap=argmap, valmap=valmap, **kwargs) == expected
def test_applyArgStyle_callable_argmap():
def repl(s):
return s.replace('_', '-')
tests = [
({'some_arg' : 'a_b_c', 'c' : 'd_e'},
['--some-arg=a_b_c', '-c', 'd_e']),
({'some_arg' : 'a', 'somearg' : 'b', 'c' : 'd'},
['--some-arg=a', '--somearg=b', '-c', 'd']),
]
for kwargs, expect in tests:
assert wutils.applyArgStyle(argmap=repl, **kwargs) == expect
def test_namedPositionals():
......@@ -159,14 +252,15 @@ def test_namedPositionals():
def func6(a, b, *args): pass
def func7(a, b, *args, **kwargs): pass
suf = wutils.namedPositionals.varargsSuffix
tests = [
(func1, [], []),
(func2, [1, 2, 3], ['a', 'b', 'c']),
(func3, [1, 2, 3], ['a', 'b', 'c']),
(func4, [1, 2, 3], ['args0', 'args1', 'args2']),
(func5, [1, 2, 3], ['args0', 'args1', 'args2']),
(func6, [1, 2, 3], ['a', 'b', 'args0']),
(func7, [1, 2, 3], ['a', 'b', 'args0']),
(func4, [1, 2, 3], [f'args0{suf}', f'args1{suf}', f'args2{suf}']),
(func5, [1, 2, 3], [f'args0{suf}', f'args1{suf}', f'args2{suf}']),
(func6, [1, 2, 3], ['a', 'b', f'args0{suf}']),
(func7, [1, 2, 3], ['a', 'b', f'args0{suf}']),
]
for func, args, expected in tests:
......@@ -231,59 +325,62 @@ def test_fileOrImage():
@wutils.fileOrImage('img1', 'img2', 'output')
def func(img1, **kwargs):
img1 = nib.load(img1).get_data()
img2 = nib.load(kwargs['img2']).get_data()
img1 = np.asanyarray(nib.load(img1).dataobj)
img2 = np.asanyarray(nib.load(kwargs['img2']).dataobj)
output = nib.nifti1.Nifti1Image(img1 * img2, np.eye(4))
nib.save(output, kwargs['output'])
with tempdir.tempdir():
img1 = nib.nifti1.Nifti1Image(np.array([[1, 2], [ 3, 4]]), np.eye(4))
img2 = nib.nifti1.Nifti1Image(np.array([[5, 6], [ 7, 8]]), np.eye(4))
img3 = nib.nifti1.Nifti1Image(np.array([[1, 2], [ 3, 4]]), np.eye(4))
expected = np.array([[5, 12], [21, 32]])
img1 = nib.nifti1.Nifti1Image(
np.array([[1, 2], [ 3, 4]], dtype=np.int32), np.eye(4))
img2 = nib.nifti1.Nifti1Image(
np.array([[5, 6], [ 7, 8]], dtype=np.int32), np.eye(4))
img3 = nib.nifti1.Nifti1Image(
np.array([[1, 2], [ 3, 4]], dtype=np.int32), np.eye(4))
expected = np.array([[5, 12], [21, 32]], dtype=np.int32)
nib.save(img1, 'img1.nii')
nib.save(img2, 'img2.nii')
# file file file
func('img1.nii', img2='img2.nii', output='output.nii')
assert np.all(nib.load('output.nii').get_data() == expected)
assert np.all(np.asanyarray(nib.load('output.nii').dataobj) == expected)
os.remove('output.nii')
# file file array
result = func('img1.nii', img2='img2.nii', output=wutils.LOAD)['output']
assert np.all(result.get_data() == expected)
assert np.all(np.asanyarray(result.dataobj) == expected)
# file array file
func('img1.nii', img2=img2, output='output.nii')
assert np.all(nib.load('output.nii').get_data() == expected)
assert np.all(np.asanyarray(nib.load('output.nii').dataobj) == expected)
os.remove('output.nii')
# file array array
result = func('img1.nii', img2=img2, output=wutils.LOAD)['output']
assert np.all(result.get_data() == expected)
assert np.all(np.asanyarray(result.dataobj) == expected)
# array file file
func(img1, img2='img2.nii', output='output.nii')
assert np.all(nib.load('output.nii').get_data() == expected)
assert np.all(np.asanyarray(nib.load('output.nii').dataobj) == expected)
os.remove('output.nii')
# array file array
result = func(img1, img2='img2.nii', output=wutils.LOAD)['output']
assert np.all(result.get_data() == expected)
assert np.all(np.asanyarray(result.dataobj) == expected)
# array array file
func(img1, img2=img2, output='output.nii')
assert np.all(nib.load('output.nii').get_data() == expected)
assert np.all(np.asanyarray(nib.load('output.nii').dataobj) == expected)
os.remove('output.nii')
# array array array
result = func(img1, img2=img2, output=wutils.LOAD)['output']
assert np.all(result.get_data() == expected)
assert np.all(np.asanyarray(result.dataobj) == expected)
# in-memory image, file, file
result = func(img3, img2='img2.nii', output='output.nii')
assert np.all(nib.load('output.nii').get_data() == expected)
assert np.all(np.asanyarray(nib.load('output.nii').dataobj) == expected)
os.remove('output.nii')
# fslimage, file, load
......@@ -307,7 +404,55 @@ def test_fileOrImage():
# nib.image, nib.image, load
result = func(img1, img2=img2, output=wutils.LOAD)['output']
assert isinstance(result, nib.nifti1.Nifti1Image)
assert np.all(result.get_data()[:] == expected)
assert np.all(np.asanyarray(result.dataobj)[:] == expected)
# fsl/fslpy!452
def test_fileOrImage_FSLOUTPUTTYPE():
@wutils.fileOrImage('img', 'output')
def func(img, output):
img = np.asanyarray(nib.load(img).dataobj)
# Save without specifying file suffix,
# so the Image object uses $FSLOUTPUTTYPE
fslimage.Image(img * 5, xform=np.eye(4)).save(fslimage.removeExt(output))
fsloutputtypes = [fslimage.FileType.NIFTI,
fslimage.FileType.NIFTI2,
fslimage.FileType.NIFTI_GZ,
fslimage.FileType.NIFTI2_GZ]
for fsloutputtype in fsloutputtypes:
with tempdir.tempdir(), \
mock.patch.dict(os.environ, FSLOUTPUTTYPE=fsloutputtype.name):
img = fslimage.Image(
np.array([[[1, 2], [ 3, 4]]], dtype=np.int32), xform=np.eye(4))
expected = np.array([[[5, 10], [15, 20]]], dtype=np.int32)
imgfname = op.join(os.getcwd(), f'image{fslimage.defaultExt()}')
img.save(imgfname)
result1 = func(imgfname, wutils.LOAD)
result2 = func(img, wutils.LOAD)
r1data = np.asanyarray(result1['output'].dataobj)
r2data = result2['output'].data
# first call will return a nibabel image,
# second a fsl Image
assert np.all(r1data == expected)
assert np.all(r2data == expected)
for inarg in (img, imgfname):
with tempdir.tempdir():
func(inarg, 'output')
assert op.exists(f'output{fslimage.defaultExt()}')
out = fslimage.Image('output')
assert np.all(out.data == expected.data)
def test_fileOrThing_sequence():
......@@ -315,7 +460,7 @@ def test_fileOrThing_sequence():
@wutils.fileOrArray('arrs', 'out')
def func(arrs, out):
if isinstance(arrs, six.string_types):
if isinstance(arrs, str):
arrs = [arrs]
arrs = [np.loadtxt(a) for a in arrs]
......@@ -353,11 +498,58 @@ def test_fileOrThing_sequence():
assert np.all(func(infiles[0], wutils.LOAD)['out'] == inputs[0])
def test_fileOrThing_var_positionals():
@wutils.fileOrArray('output', 'inputs')
def func(output, *inputs):
ins = [np.loadtxt(i) for i in inputs]
res = np.sum(ins, axis=0)
np.savetxt(output, res)
inputs = [np.random.randint(1, 10, (3, 3)) for i in range(4)]
infiles = ['input{}.txt'.format(i) for i in range(len(inputs))]
exp = np.sum(inputs, axis=0)
with tempdir.tempdir():
for ifile, idata in zip(infiles, inputs):
np.savetxt(ifile, idata)
func('result.txt', *inputs)
assert np.all(np.loadtxt('result.txt') == exp)
def test_fileOrText():
@wutils.fileOrText('input', 'output')
def func(input, output):
data = open(input).read()
data = ''.join(['{}{}'.format(c, c) for c in data])
open(output, 'wt').write(data)
with tempdir.tempdir():
data = 'abcdefg'
exp = 'aabbccddeeffgg'
open('input.txt', 'wt').write(data)
func(pathlib.Path('input.txt'), pathlib.Path('output.txt'))
assert open('output.txt').read() == exp
func('abcdefg', pathlib.Path('output.txt'))
assert open('output.txt').read() == exp
assert func('12345', wutils.LOAD).output == '1122334455'
def test_fileOrThing_outprefix():
@wutils.fileOrImage('img', outprefix='output_base')
def basefunc(img, output_base):
img = nib.load(img).get_data()
img = np.asanyarray(nib.load(img).dataobj)
out1 = nib.nifti1.Nifti1Image(img * 5, np.eye(4))
out2 = nib.nifti1.Nifti1Image(img * 10, np.eye(4))
......@@ -367,42 +559,83 @@ def test_fileOrThing_outprefix():
with tempdir.tempdir() as td:
img = nib.nifti1.Nifti1Image(np.array([[1, 2], [3, 4]]), np.eye(4))
exp1 = img.get_data() * 5
exp2 = img.get_data() * 10
img = nib.nifti1.Nifti1Image(np.array([[1, 2], [3, 4]], dtype=np.int32),
np.eye(4))
exp1 = np.asanyarray(img.dataobj) * 5
exp2 = np.asanyarray(img.dataobj) * 10
nib.save(img, 'img.nii')
basefunc('img.nii', 'myout')
assert np.all(nib.load('myout_times5.nii.gz') .get_data() == exp1)
assert np.all(nib.load('myout_times10.nii.gz').get_data() == exp2)
assert np.all(np.asanyarray(nib.load('myout_times5.nii.gz') .dataobj) == exp1)
assert np.all(np.asanyarray(nib.load('myout_times10.nii.gz').dataobj) == exp2)
cleardir(td, 'myout*')
basefunc(img, 'myout')
assert np.all(nib.load('myout_times5.nii.gz') .get_data() == exp1)
assert np.all(nib.load('myout_times10.nii.gz').get_data() == exp2)
assert np.all(np.asanyarray(nib.load('myout_times5.nii.gz') .dataobj) == exp1)
assert np.all(np.asanyarray(nib.load('myout_times10.nii.gz').dataobj) == exp2)
cleardir(td, 'myout*')
res = basefunc(img, 'myout', myout_times5=wutils.LOAD)
assert np.all(res['myout_times5'].get_data() == exp1)
assert np.all(np.asanyarray(res['myout_times5'].dataobj) == exp1)
cleardir(td, 'myout*')
res = basefunc(img, 'myout', myout_times10=wutils.LOAD)
assert np.all(res['myout_times10'].get_data() == exp2)
assert np.all(np.asanyarray(res['myout_times10'].dataobj) == exp2)
cleardir(td, 'myout*')
res = basefunc(img, 'myout', myout=wutils.LOAD)
assert np.all(res['myout_times5'] .get_data() == exp1)
assert np.all(res['myout_times10'].get_data() == exp2)
assert np.all(np.asanyarray(res['myout_times5'] .dataobj) == exp1)
assert np.all(np.asanyarray(res['myout_times10'].dataobj) == exp2)
cleardir(td, 'myout*')
def test_fileOrThing_pathlib():
@wutils.fileOrImage('img', 'out')
def basefunc(img, out):
img = np.asanyarray(nib.load(img).dataobj)
outimg = nib.nifti1.Nifti1Image(img * 5, np.eye(4))
nib.save(outimg, out)
with tempdir.tempdir() as td:
img = nib.nifti1.Nifti1Image(np.array([[1, 2], [3, 4]], dtype=np.int32),
np.eye(4))
exp = np.asanyarray(img.dataobj) * 5
nib.save(img, 'img.nii')
basefunc(pathlib.Path('img.nii'), pathlib.Path('output.nii'))
assert np.all(np.asanyarray(nib.load('output.nii') .dataobj) == exp)
def test_fileOrThing_outprefix_pathlib():
@wutils.fileOrImage('img', outprefix='output_base')
def basefunc(img, output_base):
img = np.asanyarray(nib.load(img).dataobj)
out1 = nib.nifti1.Nifti1Image(img * 5, np.eye(4))
out2 = nib.nifti1.Nifti1Image(img * 10, np.eye(4))
nib.save(out1, '{}_times5.nii.gz' .format(output_base))
nib.save(out2, '{}_times10.nii.gz'.format(output_base))
with tempdir.tempdir() as td:
img = nib.nifti1.Nifti1Image(np.array([[1, 2], [3, 4]], dtype=np.int32),
np.eye(4))
exp1 = np.asanyarray(img.dataobj) * 5
exp2 = np.asanyarray(img.dataobj) * 10
nib.save(img, 'img.nii')
basefunc(pathlib.Path('img.nii'), pathlib.Path('myout'))
assert np.all(np.asanyarray(nib.load('myout_times5.nii.gz') .dataobj) == exp1)
assert np.all(np.asanyarray(nib.load('myout_times10.nii.gz').dataobj) == exp2)
def test_fileOrThing_outprefix_differentTypes():
@wutils.fileOrImage('img', outprefix='outpref')
def func(img, outpref):
img = nib.load(img)
img = nib.nifti1.Nifti1Image(img.get_data() * 2, np.eye(4))
img = nib.nifti1.Nifti1Image(np.asanyarray(img.dataobj) * 2, np.eye(4))
text = '1234567890'
nib.save(img, '{}_image.nii.gz' .format(outpref))
......@@ -411,24 +644,25 @@ def test_fileOrThing_outprefix_differentTypes():
f.write(text)
with tempdir.tempdir() as td:
img = nib.nifti1.Nifti1Image(np.array([[1, 2], [3, 4]]), np.eye(4))
expi = img.get_data() * 2
img = nib.nifti1.Nifti1Image(np.array([[1, 2], [3, 4]], dtype=np.int32),
np.eye(4))
expi = np.asanyarray(img.dataobj) * 2
expt = '1234567890'
func(img, 'myout')
assert np.all(nib.load('myout_image.nii.gz') .get_data() == expi)
assert np.all(np.asanyarray(nib.load('myout_image.nii.gz') .dataobj) == expi)
with open('myout_text.txt', 'rt') as f:
assert f.read().strip() == expt
cleardir(td, 'myout*')
res = func(img, 'myout', myout_image=wutils.LOAD)
assert list(res.keys()) == ['myout_image']
assert np.all(res['myout_image'].get_data() == expi)
assert np.all(np.asanyarray(res['myout_image'].dataobj) == expi)
cleardir(td, 'myout*')
res = func(img, 'myout', myout=wutils.LOAD)
assert list(res.keys()) == ['myout_image']
assert np.all(res['myout_image'].get_data() == expi)
assert np.all(np.asanyarray(res['myout_image'].dataobj) == expi)
cleardir(td, 'myout*')
res = func(img, 'myout', myout_text=wutils.LOAD)
......@@ -445,8 +679,8 @@ def test_fileOrThing_outprefix_directory():
@wutils.fileOrImage('img', outprefix='outpref')
def func(img, outpref):
img = nib.load(img)
img2 = nib.nifti1.Nifti1Image(img.get_data() * 2, np.eye(4))
img4 = nib.nifti1.Nifti1Image(img.get_data() * 4, np.eye(4))
img2 = nib.nifti1.Nifti1Image(np.asanyarray(img.dataobj) * 2, np.eye(4))
img4 = nib.nifti1.Nifti1Image(np.asanyarray(img.dataobj) * 4, np.eye(4))
outdir = op.abspath('{}_imgs'.format(outpref))
......@@ -456,9 +690,10 @@ def test_fileOrThing_outprefix_directory():
nib.save(img4, op.join(outdir, 'img4.nii.gz'))
with tempdir.tempdir() as td:
img = nib.nifti1.Nifti1Image(np.array([[1, 2], [3, 4]]), np.eye(4))
exp2 = img.get_data() * 2
exp4 = img.get_data() * 4
img = nib.nifti1.Nifti1Image(np.array([[1, 2], [3, 4]], dtype=np.int32),
np.eye(4))
exp2 = np.asanyarray(img.dataobj) * 2
exp4 = np.asanyarray(img.dataobj) * 4
res = func(img, 'myout')
assert len(res) == 0
......@@ -469,17 +704,17 @@ def test_fileOrThing_outprefix_directory():
res = func(img, 'myout', myout_imgs=wutils.LOAD)
assert len(res) == 2
assert np.all(res[op.join('myout_imgs', 'img2')].get_data() == exp2)
assert np.all(res[op.join('myout_imgs', 'img4')].get_data() == exp4)
assert np.all(np.asanyarray(res[op.join('myout_imgs', 'img2')].dataobj) == exp2)
assert np.all(np.asanyarray(res[op.join('myout_imgs', 'img4')].dataobj) == exp4)
res = func(img, 'myout', **{op.join('myout_imgs', 'img2') : wutils.LOAD})
assert len(res) == 1
assert np.all(res[op.join('myout_imgs', 'img2')].get_data() == exp2)
assert np.all(np.asanyarray(res[op.join('myout_imgs', 'img2')].dataobj) == exp2)
res = func(img, 'myout', **{op.join('myout_imgs', 'img') : wutils.LOAD})
assert len(res) == 2
assert np.all(res[op.join('myout_imgs', 'img2')].get_data() == exp2)
assert np.all(res[op.join('myout_imgs', 'img4')].get_data() == exp4)
assert np.all(np.asanyarray(res[op.join('myout_imgs', 'img2')].dataobj) == exp2)
assert np.all(np.asanyarray(res[op.join('myout_imgs', 'img4')].dataobj) == exp4)
os.mkdir('foo')
res = func(img, op.join('foo', 'myout'))
......@@ -492,8 +727,113 @@ def test_fileOrThing_outprefix_directory():
os.mkdir('foo')
res = func(img, op.join('foo', 'myout'), **{op.join('foo', 'myout') : wutils.LOAD})
assert len(res) == 2
assert np.all(res[op.join('foo', 'myout_imgs', 'img2')].get_data() == exp2)
assert np.all(res[op.join('foo', 'myout_imgs', 'img4')].get_data() == exp4)
assert np.all(np.asanyarray(res[op.join('foo', 'myout_imgs', 'img2')].dataobj) == exp2)
assert np.all(np.asanyarray(res[op.join('foo', 'myout_imgs', 'img4')].dataobj) == exp4)
def test_fileOrThing_results():
@wutils.fileOrArray('input', 'regular_output', outprefix='outpref')
def func(input, regular_output, outpref):
input = np.loadtxt(input)
regout = input * 2
prefouts = []
for i in range(3, 6):
prefouts.append(input * i)
np.savetxt(regular_output, regout)
for i, o in enumerate(prefouts):
np.savetxt('{}_{}.txt'.format(outpref, i), o)
return ('return', 'value')
input = np.random.randint(1, 10, (3, 3))
infile = 'input.txt'
exp = [input * i for i in range(2, 6)]
with tempdir.tempdir():
np.savetxt(infile, input)
result = func('input.txt', 'regout.txt', 'outpref')
assert len(result) == 0
assert result.stdout == ('return', 'value')
assert (np.loadtxt('regout.txt') == exp[0]).all()
for i in range(3):
assert (np.loadtxt('outpref_{}.txt'.format(i)) == exp[i+1]).all()
result = func(input, 'regout.txt', 'outpref')
assert len(result) == 0
assert result.stdout == ('return', 'value')
assert (np.loadtxt('regout.txt') == exp[0]).all()
for i in range(3):
assert (np.loadtxt('outpref_{}.txt'.format(i)) == exp[i+1]).all()
result = func(input, wutils.LOAD, 'outpref')
assert len(result) == 1
assert result.stdout == ('return', 'value')
assert (result .regular_output == exp[0]).all()
assert (result['regular_output'] == exp[0]).all()
for i in range(3):
assert (np.loadtxt('outpref_{}.txt'.format(i)) == exp[i+1]).all()
# todo outpref
result = func(input, wutils.LOAD, wutils.LOAD)
assert len(result) == 4
assert result.stdout == ('return', 'value')
assert (result .regular_output == exp[0]).all()
assert (result['regular_output'] == exp[0]).all()
assert (result .outpref_0 == exp[1]).all()
assert (result['outpref_0'] == exp[1]).all()
assert (result .outpref_1 == exp[2]).all()
assert (result['outpref_1'] == exp[2]).all()
assert (result .outpref_2 == exp[3]).all()
assert (result['outpref_2'] == exp[3]).all()
for i in range(3):
assert (np.loadtxt('outpref_{}.txt'.format(i)) == exp[i+1]).all()
result = func(input, wutils.LOAD, wutils.LOAD)
assert len(result) == 4
def test_FileOrThing_invalid_identifiers():
# unlikely to ever happen, but let's test arguments with
# names that are not valid python identifiers
@wutils.fileOrArray('in val', '2out')
def func(**kwargs):
infile = kwargs['in val']
outfile = kwargs['2out']
input = np.loadtxt(infile)
np.savetxt(outfile, input * 2)
return ('return', 'value')
input = np.random.randint(1, 10, (3, 3))
infile = 'input.txt'
exp = input * 2
with tempdir.tempdir():
np.savetxt(infile, input)
res = func(**{'in val' : infile, '2out' : 'output.txt'})
assert res.stdout == ('return', 'value')
assert (np.loadtxt('output.txt') == exp).all()
res = func(**{'in val' : input, '2out' : 'output.txt'})
assert res.stdout == ('return', 'value')
assert (np.loadtxt('output.txt') == exp).all()
res = func(**{'in val' : input, '2out' : wutils.LOAD})
assert res.stdout == ('return', 'value')
assert (res['2out'] == exp).all()
def test_chained_fileOrImageAndArray():
......@@ -503,15 +843,16 @@ def test_chained_fileOrImageAndArray():
image = nib.load(image)
array = np.loadtxt(array)
outimg = nib.nifti1.Nifti1Image(image.get_data() * 2, np.eye(4))
outimg = nib.nifti1.Nifti1Image(np.asanyarray(image.dataobj) * 2, np.eye(4))
np.savetxt(outarray, array * 2)
outimg.to_filename(outimage)
image = nib.nifti1.Nifti1Image(np.array([[1, 2], [ 3, 4]]), np.eye(4))
image = nib.nifti1.Nifti1Image(np.array([[1, 2], [ 3, 4]], dtype=np.int32),
np.eye(4))
array = np.array([[5, 6, 7, 8]])
expimg = nib.nifti1.Nifti1Image(image.get_data() * 2, np.eye(4))
expimg = nib.nifti1.Nifti1Image(np.asanyarray(image.dataobj) * 2, np.eye(4))
exparr = array * 2
with tempdir.tempdir():
......@@ -520,31 +861,31 @@ def test_chained_fileOrImageAndArray():
np.savetxt('array.txt', array)
func('image.nii', 'array.txt', 'outimg.nii', 'outarr.txt')
assert np.all(nib.load('outimg.nii').get_data() == expimg.get_data())
assert np.all(np.asanyarray(nib.load('outimg.nii').dataobj) == np.asanyarray(expimg.dataobj))
assert np.all(np.loadtxt('outarr.txt') == exparr)
func('image.nii', array, 'outimg.nii', 'outarr.txt')
assert np.all(nib.load('outimg.nii').get_data() == expimg.get_data())
assert np.all(np.asanyarray(nib.load('outimg.nii').dataobj) == np.asanyarray(expimg.dataobj))
assert np.all(np.loadtxt('outarr.txt') == exparr)
func( image, 'array.txt', 'outimg.nii', 'outarr.txt')
assert np.all(nib.load('outimg.nii').get_data() == expimg.get_data())
assert np.all(np.asanyarray(nib.load('outimg.nii').dataobj) == np.asanyarray(expimg.dataobj))
assert np.all(np.loadtxt('outarr.txt') == exparr)
func( image, array, 'outimg.nii', 'outarr.txt')
assert np.all(nib.load('outimg.nii').get_data() == expimg.get_data())
assert np.all(np.asanyarray(nib.load('outimg.nii').dataobj) == np.asanyarray(expimg.dataobj))
assert np.all(np.loadtxt('outarr.txt') == exparr)
res = func(image, array, wutils.LOAD, 'outarr.txt')
assert np.all(res['outimage'].get_data() == expimg.get_data())
assert np.all(np.asanyarray(res['outimage'].dataobj) == np.asanyarray(expimg.dataobj))
assert np.all(np.loadtxt('outarr.txt') == exparr)
res = func(image, array, 'outimg.nii', wutils.LOAD)
assert np.all(nib.load('outimg.nii').get_data() == expimg.get_data())
assert np.all(np.asanyarray(nib.load('outimg.nii').dataobj) == np.asanyarray(expimg.dataobj))
assert np.all(res['outarray'] == exparr)
res = func(image, array, wutils.LOAD, wutils.LOAD)
assert np.all(res['outimage'].get_data() == expimg.get_data())
assert np.all(np.asanyarray(res['outimage'].dataobj) == np.asanyarray(expimg.dataobj))
assert np.all(res['outarray'] == exparr)
......@@ -561,29 +902,66 @@ def test_fileOrThing_chained_outprefix():
image = nib.load(image)
array = np.loadtxt(array)
outimg = nib.nifti1.Nifti1Image(image.get_data() * 2, np.eye(4))
outimg = nib.nifti1.Nifti1Image(np.asanyarray(image.dataobj) * 2, np.eye(4))
outarr = array * 2
np.savetxt('{}_array.txt'.format(out), outarr)
outimg.to_filename('{}_image.nii'.format(out))
image = nib.nifti1.Nifti1Image(np.array([[1, 2], [ 3, 4]]), np.eye(4))
image = nib.nifti1.Nifti1Image(np.array([[1, 2], [ 3, 4]], dtype=np.int32),
np.eye(4))
array = np.array([[5, 6, 7, 8]])
expimg = nib.nifti1.Nifti1Image(image.get_data() * 2, np.eye(4))
expimg = nib.nifti1.Nifti1Image(np.asanyarray(image.dataobj) * 2, np.eye(4))
exparr = array * 2
with tempdir.tempdir():
func(image, array, 'myout')
assert np.all(nib.load('myout_image.nii').get_data() == expimg.get_data())
assert np.all(np.asanyarray(nib.load('myout_image.nii').dataobj) == np.asanyarray(expimg.dataobj))
assert np.all(np.loadtxt('myout_array.txt') == exparr)
res = func(image, array, wutils.LOAD)
assert np.all(res['out_image'].get_data() == expimg.get_data())
assert np.all(np.asanyarray(res['out_image'].dataobj) == np.asanyarray(expimg.dataobj))
assert np.all(res['out_array'] == exparr)
def test_fileOrThing_submit_cmdonly():
@wutils.fileOrImage('input', 'output')
def func(input, output, submit=False, cmdonly=False):
if submit:
return 'submitted!'
if cmdonly:
return 'cmdonly!'
img = nib.load(input)
img = nib.nifti1.Nifti1Image(np.asanyarray(img.dataobj) * 2, np.eye(4))
nib.save(img, output)
with tempdir.tempdir() as td:
img = nib.nifti1.Nifti1Image(np.array([[1, 2], [3, 4]], dtype=np.int32),
np.eye(4))
exp = np.asanyarray(img.dataobj) * 2
nib.save(img, 'input.nii.gz')
result = func(img, wutils.LOAD)
assert np.all(np.asanyarray(result['output'].dataobj) == exp)
assert func('input.nii.gz', 'output.nii.gz', submit=True) == 'submitted!'
assert func('input.nii.gz', 'output.nii.gz', cmdonly=True) == 'cmdonly!'
# submit cannot work with in-memory images
with pytest.raises(ValueError):
func(img, wutils.LOAD, submit=True)
with pytest.raises(ValueError):
func(img, 'output.nii.gz', submit=True)
with pytest.raises(ValueError):
func('input.nii.gz', wutils.LOAD, submit=True)
def test_cmdwrapper():
@wutils.cmdwrapper
def func(a, b):
......@@ -592,15 +970,20 @@ def test_cmdwrapper():
with run.dryrun():
assert func(1, 2)[0] == 'func 1 2'
assert func(1, 2, cmdonly=True) == ['func', '1', '2']
def test_fslwrapper():
@wutils.fslwrapper
def func(a, b):
return ['func', str(a), str(b)]
with run.dryrun(), mockFSLDIR() as fsldir:
with mockFSLDIR(bin=('func',)) as fsldir:
expected = '{} 1 2'.format(op.join(fsldir, 'bin', 'func'))
assert func(1, 2)[0] == expected
with run.dryrun():
assert func(1, 2)[0] == expected
assert func(1, 2, cmdonly=True) == list(shlex.split(expected))
_test_script = textwrap.dedent("""
......@@ -611,6 +994,10 @@ exit 0
def _test_script_func(a, b):
# assertions should be disabled when
# a cmdwrapper function is called
# with cmdonly=True or submit=True
asrt.assertFileExists('some_file')
return ['test_script', str(a), str(b)]
......@@ -621,7 +1008,7 @@ def test_cmdwrapper_submit():
newpath = op.pathsep.join(('.', os.environ['PATH']))
with tempdir.tempdir(), \
mock.patch('fsl.utils.fslsub.submit', mock_submit), \
mock.patch('fsl.wrappers.fsl_sub', mock_fsl_sub), \
mock.patch.dict(os.environ, {'PATH' : newpath}):
with open('test_script', 'wt') as f:
......@@ -630,7 +1017,7 @@ def test_cmdwrapper_submit():
jid = test_func(1, 2, submit=True)
assert jid == ('12345',)
assert jid == '12345'
stdout, stderr = fslsub.output('12345')
......@@ -644,7 +1031,7 @@ def test_fslwrapper_submit():
test_func = wutils.fslwrapper(_test_script_func)
with mockFSLDIR() as fsldir, \
mock.patch('fsl.utils.fslsub.submit', mock_submit):
mock.patch('fsl.wrappers.fsl_sub', mock_fsl_sub):
test_file = op.join(fsldir, 'bin', 'test_script')
......@@ -654,7 +1041,7 @@ def test_fslwrapper_submit():
jid = test_func(1, 2, submit=True)
assert jid == ('12345',)
assert jid == '12345'
stdout, stderr = fslsub.output('12345')
......@@ -665,7 +1052,7 @@ def test_fslwrapper_submit():
jid = test_func(1, 2, submit=kwargs)
assert jid == ('12345',)
assert jid == '12345'
stdout, stderr = fslsub.output('12345')
......@@ -674,3 +1061,97 @@ def test_fslwrapper_submit():
assert stdout.strip() == 'test_script running: 1 2'
assert stderr.strip() == experr
@pytest.mark.unixtest
def test_cmdwrapper_fileorthing_cmdonly():
test_func = wutils.fileOrImage('a')(wutils.cmdwrapper(_test_script_func))
newpath = op.pathsep.join(('.', os.environ['PATH']))
with tempdir.tempdir(), \
mock.patch.dict(os.environ, {'PATH' : newpath}):
with open('test_script', 'wt') as f:
f.write(_test_script)
os.chmod('test_script', 0o755)
touch('some_file')
ran = test_func('1', '2')
os.remove('some_file')
cmd = test_func('1', '2', cmdonly=True)
assert ran.stdout[0].strip() == 'test_script running: 1 2'
assert cmd == ['test_script', '1', '2']
def test_cmdwrapper_cmdonly_assert():
@wutils.cmdwrapper
def func():
asrt.assertFileExists('file')
return ['echo', 'hello']
with tempdir.tempdir():
with pytest.raises(AssertionError):
func()
touch('file')
assert func()[0].strip() == 'hello'
os.remove('file')
assert func(cmdonly=True) == ['echo', 'hello']
def test_fileOrArray_all_tempfiles_cleared():
tempfiles = []
@wutils.fileOrArray('in_', 'out')
def array(in_, out):
tempfiles.extend((in_, out))
i = np.loadtxt(in_)
i = i + 1
np.savetxt(out, i)
arr = np.array([[1, 2], [ 3, 4]])
arrout = array(arr, wutils.LOAD).out
assert np.all(np.isclose(arrout, arr + 1))
assert len(tempfiles) == 2
assert not any(op.exists(f) for f in tempfiles)
def test_fileOrImage_all_tempfiles_cleared():
tempfiles = []
@wutils.fileOrImage('in_', 'out')
def image(in_, out):
tempfiles.extend((in_, out))
i = nib.load(in_)
i = nib.Nifti1Image(i.get_fdata() + 1, np.eye(4))
i.to_filename(out)
arr = np.array([[1, 2], [ 3, 4]], dtype=np.int32)
img = nib.nifti1.Nifti1Image(arr, np.eye(4))
imgout = image(img, wutils.LOAD).out
assert np.all(np.isclose(imgout.get_fdata(), img.get_fdata() + 1))
assert len(tempfiles) == 2
assert not any(op.exists(f) for f in tempfiles)
def test_fileOrText_all_tempfiles_cleared():
tempfiles = []
@wutils.fileOrText('in_', 'out')
def text(in_, out):
tempfiles.extend((in_, out))
with open(in_, 'rt') as inf, open(out, 'wt') as outf:
outf.write(inf.read())
outf.write('456')
txt = '123'
txtout = text( txt, wutils.LOAD).out
assert txtout == '123456'
assert len(tempfiles) == 2
assert not any(op.exists(f) for f in tempfiles)
File added