Commit 7e99549b authored by William Clarke's avatar William Clarke
Browse files

Copyright and flake8 on all tests.

parent 86515805
'''FSL-MRS test script
Test core MRS class.
Copyright Will Clarke, University of Oxford, 2021'''
from pathlib import Path
......
'''FSL-MRS test script
Test core MRSI class.
Copyright Will Clarke, University of Oxford, 2021'''
from fsl_mrs.core import MRSI, mrsi_from_files
from pathlib import Path
from fsl_mrs.utils import mrs_io
......
# Test the NIFTI-MRS class implementation
'''FSL-MRS test script
Test the NIFTI-MRS class implementation
Copyright Will Clarke, University of Oxford, 2021'''
# Imports
from pathlib import Path
......
# Test io functions
'''FSL-MRS test script
Test io functions
Copyright Will Clarke, University of Oxford, 2021'''
import fsl_mrs.utils.mrs_io as mrsio
import fsl_mrs.utils.mrs_io.fsl_io as fslio
from fsl_mrs.utils import plotting
......
# Test the main svs fitting script
'''FSL-MRS test script
Test the main svs fitting script
Copyright Will Clarke, University of Oxford, 2021'''
# Imports
import subprocess
......
# Test the main svs fitting script
'''FSL-MRS test script
Test the main mrsi fitting script
Copyright Will Clarke, University of Oxford, 2021'''
# Imports
import subprocess
......
# Test the merge report script
'''FSL-MRS test script
Test the merge report script
Copyright Will Clarke, University of Oxford, 2021'''
# Imports
import subprocess
......
# Test the visualisation script
'''FSL-MRS test script
Test the visualisation script
Copyright Will Clarke, University of Oxford, 2021'''
# Imports
import subprocess
......
# Test the mrsi segmentation script
'''FSL-MRS test script
Test the mrsi segmentation script
Copyright Will Clarke, University of Oxford, 2021'''
# Imports
import subprocess
......
'''FSL-MRS test script
Test the svs preprocessing script
Copyright Will Clarke, University of Oxford, 2021'''
import subprocess
from pathlib import Path
......
# Tests for the individual proc functions.
# These tests don't test theat the actual algorithms are doing the right thing,
# simply that the script handles SVS data and MRSI data properly and that the
# results from the command line program matches that of the underlying
# algorithms in nifti_mrs_proc.py
'''FSL-MRS test script
Tests for the individual proc functions.
These tests don't test theat the actual algorithms are doing the right thing,
simply that the script handles SVS data and MRSI data properly and that the
results from the command line program matches that of the underlying
algorithms in nifti_mrs_proc.py
Copyright Will Clarke, University of Oxford, 2021'''
import pytest
import os.path as op
......
# Test the generation of spectra from fsl_mrs output
'''FSL-MRS test script
Test the generation of spectra from fsl_mrs output
Copyright Will Clarke, University of Oxford, 2021'''
# Imports
import subprocess
......
'''FSL-MRS test script
Test the simulation script in FSL-MRS
Copyright Will Clarke, University of Oxford, 2021'''
import pytest
import json
import numpy as np
import os.path as op
import subprocess
from fsl_mrs.utils.mrs_io import fsl_io,lcm_io
from fsl_mrs.utils.mrs_io import fsl_io, lcm_io
from fsl_mrs.denmatsim import simseq as sim
seqfile = op.join(op.dirname(__file__),'testdata/sim/example.json')
seqfile = op.join(op.dirname(__file__), 'testdata/sim/example.json')
# construct two simple (1 spin) test spin systems
@pytest.fixture
def spinsys(tmp_path):
spinsystem = {"sys1":{"shifts":[1,2], "j":[[0,0],[0,0]],"scaleFactor":1.0},
"sys2":{"shifts":[3],"j":[[0]],"scaleFactor":3.0}}
with open(op.join(tmp_path,'custom_ss.json'), 'w', encoding='utf-8') as f:
def spinsys(tmp_path):
spinsystem = {"sys1": {"shifts": [1, 2], "j": [[0, 0], [0, 0]], "scaleFactor": 1.0},
"sys2": {"shifts": [3], "j": [[0]], "scaleFactor": 3.0}}
with open(op.join(tmp_path, 'custom_ss.json'), 'w', encoding='utf-8') as f:
json.dump(spinsystem, f, ensure_ascii=False, indent='\t')
zeroRefSS = {"shifts":[0], "j":[[0]],"scaleFactor":1.0}
spinsystem.update({'0ref':zeroRefSS})
zeroRefSS = {"shifts": [0], "j": [[0]], "scaleFactor": 1.0}
spinsystem.update({'0ref': zeroRefSS})
return spinsystem
# Load an example dataset
@pytest.fixture
def seqparams(tmp_path):
with open(seqfile,'r') as seqFile:
def seqparams(tmp_path):
with open(seqfile, 'r') as seqFile:
jsonString = seqFile.read()
seqFileParams = json.loads(jsonString)
return seqFileParams
def test_sim(spinsys,seqparams,tmp_path):
# Test simulation by running on one of the included sequence files and then comparing to a manually simulated data set here.
def test_sim(spinsys, seqparams, tmp_path):
# Test simulation by running on one of the included sequence files
# and then comparing to a manually simulated data set here.
# Run the sequence on a couple of metabolites, ask for all types of files, add the reference peak
ssFile = op.join(tmp_path,'custom_ss.json')
outfile = op.join(tmp_path,'simulated')
subprocess.call(['fsl_mrs_sim','-s',ssFile,'-o',outfile,'-r','-a',seqfile])
ssFile = op.join(tmp_path, 'custom_ss.json')
outfile = op.join(tmp_path, 'simulated')
subprocess.call(['fsl_mrs_sim',
'-s', ssFile,
'-o', outfile,
'-r',
'-a', seqfile])
# Load the data
basis_j,names_j,header = fsl_io.readFSLBasisFiles(outfile)
rawfiles = [op.join(tmp_path,'simulated','sys1.RAW'),op.join(tmp_path,'simulated','sys2.RAW')]
basis_r,names =lcm_io.read_basis_files(rawfiles)
# Check that this matches what is simulated manually
basis_j, names_j, header = fsl_io.readFSLBasisFiles(outfile)
rawfiles = [op.join(tmp_path, 'simulated', 'sys1.RAW'),
op.join(tmp_path, 'simulated', 'sys2.RAW')]
basis_r, names = lcm_io.read_basis_files(rawfiles)
# Check that this matches what is simulated manually
directSim = []
for ss in spinsys:
FID,ax,pmat = sim.simseq(spinsys[ss],seqparams)
print(ss)
FID, ax, pmat = sim.simseq(spinsys[ss], seqparams)
FID *= spinsys[ss]['scaleFactor']
directSim.append(FID.conj())
directSim1 = directSim[0]+directSim[2]
directSim2 = directSim[1]+directSim[2]
assert np.allclose(basis_j[:,names_j.index('sys1')],directSim1)
directSim1 = directSim[0] + directSim[2]
directSim2 = directSim[1] + directSim[2]
assert np.allclose(basis_j[:, names_j.index('sys1')], directSim1)
assert np.allclose(basis_j[:, names_j.index('sys2')], directSim2)
# Test the svs segmentation script
'''FSL-MRS test script
Test the svs segmentation script
Copyright Will Clarke, University of Oxford, 2021'''
# Imports
import subprocess
......
'''FSL-MRS test script
Test baseline tools
Copyright Will Clarke, University of Oxford, 2021'''
from fsl_mrs.utils import baseline
from fsl_mrs.core import MRS
import numpy as np
......
'''FSL-MRS test script
Test the main fitting routines
Copyright Will Clarke, University of Oxford, 2021'''
from fsl_mrs.utils.synthetic import syntheticFID
from fsl_mrs.utils.synthetic.synthetic_from_basis import syntheticFromBasisFile
from fsl_mrs.core import MRS
......@@ -23,6 +29,7 @@ def data():
basisNames = ['Cr', 'PCr', 'NAA']
basisFIDs = []
basisHdr = None
for idx, _ in enumerate(amplitude):
tmp, basisHdr = syntheticFID(noisecovariance=[[0.0]],
chemicalshift=[chemshift[idx]],
......
'''FSL-MRS test script
Test functions that appear in utils.misc module
Copyright Will Clarke, University of Oxford, 2021'''
from fsl_mrs.utils.mrs_io.main import read_FID
from fsl_mrs.utils import misc
from fsl_mrs.utils import synthetic as synth
......
'''FSL-MRS test script
Test utils.preproc functions
Copyright Will Clarke, University of Oxford, 2021'''
import fsl_mrs.utils.preproc as preproc
import fsl_mrs.utils.synthetic as syn
from fsl_mrs.utils.misc import FIDToSpec
from fsl_mrs.core import MRS
import numpy as np
# Test frequency shift by finding max on freq/ppm axis after a shift
def test_freqshift():
testFID,testHdrs = syn.syntheticFID(amplitude=[0.0,1.0]) # Single peak at 3 ppm
testFID, testHdrs = syn.syntheticFID(amplitude=[0.0, 1.0]) # Single peak at 3 ppm
# Shift to 0 ppm
dt = 1/testHdrs['inputopts']['bandwidth']
shift = testHdrs['inputopts']['centralfrequency']*-3.0
shiftedFID = preproc.freqshift(testFID[0],dt,shift)
dt = 1 / testHdrs['inputopts']['bandwidth']
shift = testHdrs['inputopts']['centralfrequency'] * -3.0
shiftedFID = preproc.freqshift(testFID[0], dt, shift)
maxindex = np.argmax(np.abs(FIDToSpec(shiftedFID)))
freqOfMax = testHdrs['faxis'][maxindex]
assert freqOfMax < 5 and freqOfMax > -5
# Test timeshift
def test_timeshift():
# Create data with lots of points and some begin time delay
testFID,testHdrs = syn.syntheticFID(begintime=-0.001,points=4096,noisecovariance=[[0.0]])
testFID2,testHdrs2 = syn.syntheticFID(begintime=0.000,points=4096,noisecovariance=[[0.0]])
testFID, testHdrs = syn.syntheticFID(begintime=-0.001, points=4096, noisecovariance=[[0.0]])
testFID2, testHdrs2 = syn.syntheticFID(begintime=0.000, points=4096, noisecovariance=[[0.0]])
# Reduce points and pad to remove first order phase
shiftedFID,_ = preproc.timeshift(testFID[0],1/testHdrs['inputopts']['bandwidth'],0.001,0.0,samples=4096)
# Reduce points and pad to remove first order phase
shiftedFID, _ = preproc.timeshift(testFID[0],
1 / testHdrs['inputopts']['bandwidth'],
0.001,
0.0,
samples=4096)
# assert shiftedFID.size == 2048
assert np.allclose(shiftedFID,testFID2[0],atol=1E-1)
assert np.allclose(shiftedFID, testFID2[0], atol=1E-1)
# Test combine_FIDs:
# Test mean by calculating mean of anti phase signals
# Test averaging using weights to zero signal
# Test coil combination against analytical Roemer eqn given known coil complex weights.
def test_combine_FIDs():
testFIDs,testHdrs = syn.syntheticFID(noisecovariance=np.zeros((2,2)),coilamps=[1.0,1.0],coilphase=[0.0,np.pi])
testFIDs, testHdrs = syn.syntheticFID(noisecovariance=np.zeros((2, 2)), coilamps=[1.0, 1.0], coilphase=[0.0, np.pi])
combfids = preproc.combine_FIDs(testFIDs,'mean')
combfids = preproc.combine_FIDs(testFIDs, 'mean')
assert np.isclose(combfids,0).all()
assert np.allclose(combfids, 0)
testFIDs,testHdrs = syn.syntheticFID(noisecovariance=np.zeros((4,4)),coilamps=[1.0,1.0,1.0,1.0],coilphase=[0.0,0.0,0.0,0.0])
testFIDs, testHdrs = syn.syntheticFID(noisecovariance=np.zeros((4, 4)),
coilamps=[1.0, 1.0, 1.0, 1.0],
coilphase=[0.0, 0.0, 0.0, 0.0])
weigths = [np.exp(1j*0),np.exp(1j*np.pi/2),np.exp(1j*np.pi),np.exp(1j*3*np.pi/2)]
combfids = preproc.combine_FIDs(testFIDs,'weighted',weights=weigths)
weigths = [np.exp(1j * 0), np.exp(1j * np.pi / 2), np.exp(1j * np.pi), np.exp(1j * 3 * np.pi / 2)]
combfids = preproc.combine_FIDs(testFIDs, 'weighted', weights=weigths)
assert np.isclose(combfids,0).all()
assert np.allclose(combfids, 0)
#Generate high SNR data
# Generate high SNR data
coilvar = 0.001
noiseCov = [[coilvar,0],[0,coilvar]]
coilamps= 0.5+np.random.rand(2)*0.4
coilphs = np.random.rand(2)*np.pi*2
noiseCov = [[coilvar, 0], [0, coilvar]]
coilamps = 0.5 + np.random.rand(2) * 0.4
coilphs = np.random.rand(2) * np.pi * 2
testFIDs,testHdrs = syn.syntheticFID(noisecovariance=noiseCov,coilamps=coilamps,coilphase=coilphs,points =4096)
testFIDs, testHdrs = syn.syntheticFID(noisecovariance=noiseCov,
coilamps=coilamps,
coilphase=coilphs,
points=4096)
invcovMat = coilvar*np.linalg.inv(noiseCov)
invcovMat = coilvar * np.linalg.inv(noiseCov)
analyticalRoemer = []
testFIDs = np.asarray(testFIDs).T
cmplxW = coilamps * np.exp(1j*coilphs)
cmplxW = coilamps * np.exp(1j * coilphs)
for f in testFIDs:
tmp = (f@invcovMat@cmplxW.conj())/np.sqrt(cmplxW.conj()@invcovMat@cmplxW)
tmp = (f @ invcovMat @ cmplxW.conj()) / np.sqrt(cmplxW.conj() @ invcovMat @ cmplxW)
analyticalRoemer.append(tmp)
analyticalRoemer = np.asarray(analyticalRoemer)
combfid = preproc.combine_FIDs(testFIDs,'svd',do_prewhiten=True)
combfid = preproc.combine_FIDs(testFIDs, 'svd', do_prewhiten=True)
# Check only the first few points otherwise the relative tolarence has to be very high.
assert np.allclose(np.abs(combfid[:200]),np.abs(analyticalRoemer[:200]), atol=1E-2, rtol=1E-1)
assert np.allclose(np.abs(combfid[:200]), np.abs(analyticalRoemer[:200]), atol=1E-2, rtol=1E-1)
# Test the alignment by aligning based on two sets of offset peaks with different offsets and measuring combined peak height
# Test the alignment by aligning based on two sets of offset peaks with
# different offsets and measuring combined peak height
def test_phase_freq_align():
peak1Shift = np.random.rand(10)*0.1
peak1Phs = np.random.randn(10)*2*np.pi
peak1Shift = np.random.rand(10) * 0.1
peak1Phs = np.random.randn(10) * 2 * np.pi
shiftedFIDs = []
for s,p in zip(peak1Shift,peak1Phs):
testFIDs,testHdrs = syn.syntheticFID(amplitude = [1,1],chemicalshift=[-2+s,3],phase=[p,0.0],points=2048,noisecovariance=[[1E-1]])
testHdrs = None
for s, p in zip(peak1Shift, peak1Phs):
testFIDs, testHdrs = syn.syntheticFID(amplitude=[1, 1],
chemicalshift=[-2 + s, 3],
phase=[p, 0.0],
points=2048,
noisecovariance=[[1E-1]])
shiftedFIDs.append(testFIDs[0])
# Align across shifted peak
alignedFIDs,_,_ = preproc.phase_freq_align(shiftedFIDs,testHdrs['bandwidth'],
testHdrs['centralFrequency']*1E6,
niter=2,verbose=False,ppmlim=(-2.2,-1.7),shift=False)
meanFID = preproc.combine_FIDs(alignedFIDs,'mean')
assert np.max(np.abs(FIDToSpec(meanFID)))>0.09
alignedFIDs, _, _ = preproc.phase_freq_align(shiftedFIDs,
testHdrs['bandwidth'],
testHdrs['centralFrequency'] * 1E6,
niter=2,
verbose=False,
ppmlim=(-2.2, -1.7),
shift=False)
meanFID = preproc.combine_FIDs(alignedFIDs, 'mean')
assert np.max(np.abs(FIDToSpec(meanFID))) > 0.09
# Align across fixed peak
alignedFIDs,_,_ = preproc.phase_freq_align(shiftedFIDs,testHdrs['bandwidth'],
testHdrs['centralFrequency']*1E6,
niter=2,verbose=False,ppmlim=(2,4),shift=False)
meanFID = preproc.combine_FIDs(alignedFIDs,'mean')
assert np.max(np.abs(FIDToSpec(meanFID)))>0.09
alignedFIDs, _, _ = preproc.phase_freq_align(shiftedFIDs,
testHdrs['bandwidth'],
testHdrs['centralFrequency'] * 1E6,
niter=2,
verbose=False,
ppmlim=(2, 4),
shift=False)
meanFID = preproc.combine_FIDs(alignedFIDs, 'mean')
assert np.max(np.abs(FIDToSpec(meanFID))) > 0.09
def test_truncate():
testFIDs,testHdrs = syn.syntheticFID()
truncatedFID1 = preproc.truncate(testFIDs[0],1,'first')
assert (testFIDs[0][1:]==truncatedFID1).all()
truncatedFID1 = preproc.truncate(testFIDs[0],1,'last')
assert (testFIDs[0][:-1]==truncatedFID1).all()
testFIDs, testHdrs = syn.syntheticFID()
truncatedFID1 = preproc.truncate(testFIDs[0], 1, 'first')
assert (testFIDs[0][1:] == truncatedFID1).all()
truncatedFID1 = preproc.truncate(testFIDs[0], 1, 'last')
assert (testFIDs[0][:-1] == truncatedFID1).all()
def test_pad():
testFIDs,testHdrs = syn.syntheticFID()
paddedFID1 = preproc.pad(testFIDs[0],1,'first')
testFIDs, testHdrs = syn.syntheticFID()
paddedFID1 = preproc.pad(testFIDs[0], 1, 'first')
assert paddedFID1[0] == 0.0
paddedFID1 = preproc.pad(testFIDs[0],1,'last')
paddedFID1 = preproc.pad(testFIDs[0], 1, 'last')
assert paddedFID1[-1] == 0.0
def test_apodize():
mockFID = np.full((1024),1.0)
dt = 1/2000
apodFID = preproc.apodize(mockFID,dt,[10])
assert apodFID[-1] == np.exp(-dt*1023*10)
mockFID = np.full((1024), 1.0)
dt = 1 / 2000
apodFID = preproc.apodize(mockFID, dt, [10])
assert apodFID[-1] == np.exp(-dt * 1023 * 10)
assert apodFID[0] == 1.0
apodFID = preproc.apodize(mockFID,dt,[10,0.01],'l2g')
apodFID = preproc.apodize(mockFID, dt, [10, 0.01], 'l2g')
t = dt*1023
Tl = 1/10
Tg = 1/0.01
assert apodFID[-1] == np.exp(t/Tl)*np.exp(t**2/Tg**2)
t = dt * 1023
Tl = 1 / 10
Tg = 1 / 0.01
assert apodFID[-1] == np.exp(t / Tl) * np.exp(t**2 / Tg**2)
assert apodFID[0] == 1.0
# Test hlsvd by removing one peak from a two peak fid
def test_hlsvd():
# low noise
testFIDs,testHdrs = syn.syntheticFID(noisecovariance=[[1E-6]],amplitude=[1.0,1.0],chemicalshift=[-2,0])
limits = [-2.5,-1.5]
testFIDs, testHdrs = syn.syntheticFID(noisecovariance=[[1E-6]], amplitude=[1.0, 1.0], chemicalshift=[-2, 0])
limits = [-2.5, -1.5]
# (FID,dwelltime,centralFrequency,limits,limitUnits = 'ppm',numSingularValues=50)
removedFID = preproc.hlsvd(testFIDs[0],
testHdrs['dwelltime'],
testHdrs['centralFrequency'],
limits,
limitUnits = 'ppm',
numSingularValues=20)
testHdrs['dwelltime'],
testHdrs['centralFrequency'],
limits,
limitUnits='ppm',
numSingularValues=20)
onResFID = np.exp(-testHdrs['inputopts']['damping'][1]*testHdrs['taxis'])
onResFID = np.exp(-testHdrs['inputopts']['damping'][1] * testHdrs['taxis'])
assert np.allclose(np.real(removedFID), np.real(onResFID), atol=1E-2, rtol=1E-2)
assert np.isclose(np.real(removedFID),np.real(onResFID),atol=1E-2,rtol=1E-2).all()
def test_pad_truncate():
mockFID = np.full((1024),1.0)
testFID = preproc.pad(mockFID,1,first_or_last='last')
assert testFID.size==1025
assert testFID[-1]==0.0
mockFID = np.full((1024), 1.0)
testFID = preproc.pad(mockFID, 1, first_or_last='last')
assert testFID.size == 1025
assert testFID[-1] == 0.0
mockFID = np.full((1024), 1.0)
testFID = preproc.pad(mockFID, 2, first_or_last='first')
assert testFID.size == 1026
assert testFID[1] == 0.0
mockFID = np.full((1024),1.0)
testFID = preproc.pad(mockFID,2,first_or_last='first')
assert testFID.size==1026
assert testFID[1]==0.0
mockFID = np.full((1024), 1.0)
testFID = preproc.truncate(mockFID, 1, first_or_last='last')
assert testFID.size == 1023
mockFID = np.full((1024),1.0)
testFID = preproc.truncate(mockFID,1,first_or_last='last')
assert testFID.size==1023
mockFID = np.full((1024), 1.0)
testFID = preproc.truncate(mockFID, 2, first_or_last='first')
assert testFID.size == 1022
mockFID = np.full((1024),1.0)
testFID = preproc.truncate(mockFID,2,first_or_last='first')
assert testFID.size==1022
def test_phaseCorrect():
testFIDs,testHdrs = syn.syntheticFID(amplitude=[1.0],chemicalshift=[0.0],phase=[np.pi/2],noisecovariance=[[1E-5]])
corrected,phs,pos=preproc.phaseCorrect(testFIDs[0],testHdrs['bandwidth'],testHdrs['centralFrequency'],ppmlim=(-0.5,0.5),shift=False)
assert np.isclose(phs,-np.pi/2,atol=1E-2)