Commit f4632295 authored by William Clarke's avatar William Clarke
Browse files

More intermediate commits.

parent 88d0428b
#!/usr/bin/env python
# filtering.py - Routines for FID filtering
#
# Author: Saad Jbabdi <saad@fmrib.ox.ac.uk>
# William Clarke <william.clarke@ndcn.ox.ac.uk>
#
# Copyright (C) 2019 University of Oxford
# Copyright (C) 2019 University of Oxford
# SHBASECOPYRIGHT
import numpy as np
def apodize(FID,dwelltime,broadening,filter='exp'):
def apodize(FID, dwelltime, broadening, filter='exp'):
""" Apodize FID
Args:
FID (ndarray): Time domain data
dwelltime (float): dwelltime in seconds
......@@ -22,53 +21,64 @@ def apodize(FID,dwelltime,broadening,filter='exp'):
Returns:
FID (ndarray): Apodised FID
"""
taxis = np.linspace(0,dwelltime*(FID.size-1),FID.size)
if filter=='exp':
Tl = 1/broadening[0]
window = np.exp(-taxis/Tl)
elif filter=='l2g':
Tl = 1/broadening[0]
Tg = 1/broadening[1]
window = np.exp(taxis/Tl)*np.exp(taxis**2/Tg**2)
return window*FID
def apodize_report(inFID,outFID,hdr,plotlim = (0.2,6),html=None):
taxis = np.linspace(0, dwelltime * (FID.size - 1), FID.size)
if filter == 'exp':
Tl = 1 / broadening[0]
window = np.exp(-taxis / Tl)
elif filter == 'l2g':
Tl = 1 / broadening[0]
Tg = 1 / broadening[1]
window = np.exp(taxis / Tl) * np.exp(taxis**2 / Tg**2)
else:
print('Filter not recognised, should be "exp" or "l2g".')
window = 1
return window * FID
def apodize_report(inFID,
outFID,
bw,
cf,
nucleus='1H',
plotlim=(0.2, 6),
html=None):
"""
Generate report
"""
# from matplotlib import pyplot as plt
from fsl_mrs.core import MRS
import plotly.graph_objects as go
from fsl_mrs.utils.preproc.reporting import plotStyles,plotAxesStyle
import plotly.graph_objects as go
from fsl_mrs.utils.preproc.reporting import plotStyles, plotAxesStyle
# Turn input FIDs into mrs objects
toMRSobj = lambda fid : MRS(FID=fid,header=hdr)
def toMRSobj(fid):
return MRS(FID=fid, cf=cf, bw=bw, nucleus=nucleus)
plotIn = toMRSobj(inFID)
plotOut = toMRSobj(outFID)
# Fetch line styles
lines,colors,_ = plotStyles()
lines, colors, _ = plotStyles()
# Make a new figure
fig = go.Figure()
# Add lines to figure
def addline(fig,mrs,lim,name,linestyle):
def addline(fig, mrs, lim, name, linestyle):
trace = go.Scatter(x=mrs.getAxes(ppmlim=lim),
y=np.real(mrs.get_spec(ppmlim=lim)),
mode='lines',
name=name,
line=linestyle)
return fig.add_trace(trace)
fig = addline(fig,plotIn,plotlim,'Uncorrected',lines['in'])
fig = addline(fig,plotOut,plotlim,'Corrected',lines['out'])
y=np.real(mrs.get_spec(ppmlim=lim)),
mode='lines',
name=name,
line=linestyle)
return fig.add_trace(trace)
fig = addline(fig, plotIn, plotlim, 'Uncorrected', lines['in'])
fig = addline(fig, plotOut, plotlim, 'Corrected', lines['out'])
# Axes layout
plotAxesStyle(fig,plotlim,title = 'Apodization summary')
# Generate report
plotAxesStyle(fig, plotlim, title='Apodization summary')
# Generate report
if html is not None:
from plotly.offline import plot
from fsl_mrs.utils.preproc.reporting import figgroup, singleReport
......@@ -76,26 +86,26 @@ def apodize_report(inFID,outFID,hdr,plotlim = (0.2,6),html=None):
import os.path as op
if op.isdir(html):
filename = 'report_' + datetime.now().strftime("%Y%m%d_%H%M%S%f")[:-3]+'.html'
htmlfile=op.join(html,filename)
elif op.isdir(op.dirname(html)) and op.splitext(html)[1]=='.html':
filename = 'report_' + datetime.now().strftime("%Y%m%d_%H%M%S%f")[:-3] + '.html'
htmlfile = op.join(html, filename)
elif op.isdir(op.dirname(html)) and op.splitext(html)[1] == '.html':
htmlfile = html
else:
raise ValueError('Report html path must be file or directory. ')
opName = 'Apodization'
timestr = datetime.now().strftime("%H:%M:%S")
datestr = datetime.now().strftime("%d/%m/%Y")
headerinfo = 'Report for fsl_mrs.utils.preproc.filtering.apodize.\n'\
+ f'Generated at {timestr} on {datestr}.'
+ f'Generated at {timestr} on {datestr}.'
# Figures
div = plot(fig, output_type='div',include_plotlyjs='cdn')
figurelist = [figgroup(fig = div,
name= '',
foretext= f'Apodization of spectra.',
afttext= f'')]
div = plot(fig, output_type='div', include_plotlyjs='cdn')
figurelist = [figgroup(fig=div,
name='',
foretext='Apodization of spectra.',
afttext='')]
singleReport(htmlfile,opName,headerinfo,figurelist)
singleReport(htmlfile, opName, headerinfo, figurelist)
return fig
else:
return fig
......@@ -7,7 +7,7 @@ Author: Saad Jbabdi <saad@fmrib.ox.ac.uk>
Copyright (C) 2021 University of Oxford
SHBASECOPYRIGHT'''
from fsl_mrs.utils import preproc
from numpy import abs, zeros
import numpy as np
from fsl_mrs.core import NIFTI_MRS
......@@ -15,6 +15,10 @@ class DimensionsDoNotMatch(Exception):
pass
class OnlySVS(Exception):
pass
def first_index(idx):
return all([ii == slice(None, None, None) or ii == 0 for ii in idx])
......@@ -287,9 +291,9 @@ def tshift(data, tshiftStart=0.0, tshiftEnd=0.0, samples=None, report=None, repo
new_shape = list(data.shape)
new_shape[3] = samples
shifted_obj = NIFTI_MRS(
zeros(new_shape, dtype=data.dtype),
np.zeros(new_shape, dtype=data.dtype),
header=data.header)
for dd, idx in data.iterate_over_dims(iterate_over_space=True):
shifted_obj[idx], newDT = preproc.timeshift(dd,
data.dwelltime,
......@@ -333,18 +337,18 @@ def truncate_or_pad(data, npoints, position, report=None, report_all=False):
new_shape = list(data.shape)
new_shape[3] += npoints
trunc_obj = NIFTI_MRS(
zeros(new_shape, dtype=data.dtype),
np.zeros(new_shape, dtype=data.dtype),
header=data.header)
for dd, idx in data.iterate_over_dims(iterate_over_space=True):
if npoints > 0:
trunc_obj[idx] = preproc.pad(dd,
abs(npoints),
np.abs(npoints),
position)
rep_func = 'pad'
elif npoints < 0:
trunc_obj[idx] = preproc.truncate(dd,
abs(npoints),
np.abs(npoints),
position)
rep_func = 'truncate'
else:
......@@ -363,3 +367,160 @@ def truncate_or_pad(data, npoints, position, report=None, report_all=False):
html=report,
function=rep_func)
return trunc_obj
def apodize(data, amount, filter='exp', report=None, report_all=False):
'''Apodize FIDs using a exponential or Lorentzian to Gaussian filter.
Lorentzian to Gaussian filter takes requires two window parameters (t_L and t_G)
:param NIFTI_MRS data: Data to truncate or pad
:param tuple amount: If filter='exp' single valued. If filter='l2g' then two valued.
:param str filter: 'exp' or 'l2g'. Choose exponential or Lorentzian to Gaussian filter
:param report: Provide output location as path to generate report
:param report_all: True to output all indicies
:return: Filtered data in NIFTI_MRS format.
'''
apod_obj = data.copy()
for dd, idx in data.iterate_over_dims(iterate_over_space=True):
apod_obj[idx] = preproc.apodize(dd,
data.dwelltime,
amount,
filter=filter)
if report and (report_all or first_index(idx)):
from fsl_mrs.utils.preproc.filtering import apodize_report
apodize_report(dd,
apod_obj[idx],
data.bandwidth,
data.spectrometer_frequency[0],
nucleus=data.nucleus[0],
html=report)
return apod_obj
def fshift(data, amount, report=None, report_all=False):
'''Apply frequency shift
:param NIFTI_MRS data: Data to truncate or pad
:param float amount: Shift amount in Hz
:param report: Provide output location as path to generate report
:param report_all: True to output all indicies
:return: Shifted data in NIFTI_MRS format.
'''
shift_obj = data.copy()
for dd, idx in data.iterate_over_dims(iterate_over_space=True):
shift_obj[idx] = preproc.freqshift(dd,
data.dwelltime,
amount)
if report and (report_all or first_index(idx)):
from fsl_mrs.utils.preproc.shifting import shift_report
original_hdr = {'bandwidth': data.bandwidth,
'centralFrequency': data.spectrometer_frequency[0],
'ResonantNucleus': data.nucleus[0]}
shift_report(dd,
shift_obj[idx],
original_hdr,
original_hdr,
html=report,
function='freqshift')
return shift_obj
def shift_to_reference(data, ppm_ref, peak_search, report=None, report_all=False):
'''Shift peak to known reference
:param NIFTI_MRS data: Data to truncate or pad
:param float ppm_ref: Reference shift that peak will be moved to
:param report: Provide output location as path to generate report
:param report_all: True to output all indicies
:return: Shifted data in NIFTI_MRS format.
'''
shift_obj = data.copy()
for dd, idx in data.iterate_over_dims(iterate_over_space=True):
shift_obj[idx], _ = preproc.shiftToRef(dd,
ppm_ref,
data.bandwidth,
data.spectrometer_frequency[0],
nucleus=data.nucleus[0],
ppmlim=peak_search)
if report and (report_all or first_index(idx)):
from fsl_mrs.utils.preproc.shifting import shift_report
original_hdr = {'bandwidth': data.bandwidth,
'centralFrequency': data.spectrometer_frequency[0],
'ResonantNucleus': data.nucleus[0]}
shift_report(dd,
shift_obj[idx],
original_hdr,
original_hdr,
html=report,
function='shiftToRef')
return shift_obj
def remove_unlike(data, ppmlim=None, sdlimit=1.96, niter=2, report=None):
'''Remove unlike dynamics
:param NIFTI_MRS data: Data to truncate or pad
:param report: Provide output location as path to generate report
:return: Shifted data in NIFTI_MRS format.
'''
if data.shape[:3] != (1, 1, 1):
raise OnlySVS("remove_unlike only specified for SVS data")
if data.ndim > 5:
raise ValueError('remove_unlike only makes sense for a single dynamic dimension. Combined coils etc. first')
elif data.ndim < 4:
raise ValueError('remove_unlike only makes sense for data with a dynamic dimension')
goodFIDs, badFIDs, gIndicies, bIndicies, metric = \
preproc.identifyUnlikeFIDs(data[0, 0, 0, :, :].T,
data.bandwidth,
data.spectrometer_frequency[0],
nucleus=data.nucleus[0],
ppmlim=ppmlim,
sdlimit=sdlimit,
iterations=niter,
shift=True)
if report:
from fsl_mrs.utils.preproc.unlike import identifyUnlikeFIDs_report
identifyUnlikeFIDs_report(goodFIDs,
badFIDs,
gIndicies,
bIndicies,
metric,
data.bandwidth,
data.spectrometer_frequency[0],
nucleus=data.nucleus[0],
ppmlim=ppmlim,
sdlimit=sdlimit,
html=report)
goodFIDs = np.asarray(goodFIDs).T
goodFIDs = goodFIDs.reshape([1, 1, 1] + list(goodFIDs.shape))
# Conjugation here as it doesn't use the __setitem__ method
good_out = NIFTI_MRS(
goodFIDs.conj(),
header=data.header)
if len(badFIDs) > 0:
badFIDs = np.asarray(badFIDs).T
badFIDs = badFIDs.reshape([1, 1, 1] + list(badFIDs.shape))
bad_out = NIFTI_MRS(
badFIDs.conj(),
header=data.header)
else:
bad_out = None
return good_out, bad_out
......@@ -39,7 +39,7 @@ def timeshift(FID, dwelltime, shiftstart, shiftend, samples=None):
def freqshift(FID, dwelltime, shift):
""" Shift data on frequency axis
Args:
FID (ndarray): Time domain data
dwelltime (float): dwelltime in seconds
......@@ -48,32 +48,51 @@ def freqshift(FID, dwelltime, shift):
Returns:
FID (ndarray): Shifted FID
"""
tAxis = np.linspace(0,dwelltime*FID.size,FID.size)
phaseRamp = 2*np.pi*tAxis*shift
FID = FID * np.exp(1j*phaseRamp)
tAxis = np.linspace(0, dwelltime * FID.size, FID.size)
phaseRamp = 2 * np.pi * tAxis * shift
FID = FID * np.exp(1j * phaseRamp)
return FID
def shiftToRef(FID,target,bw,cf,ppmlim=(2.8,3.2),shift=True):
#Find maximum of absolute spectrum in ppm limit
padFID = pad(FID,FID.size*3)
MRSargs = {'FID':padFID,'bw':bw,'cf':cf}
def shiftToRef(FID, target, bw, cf, nucleus='1H', ppmlim=(2.8, 3.2), shift=True):
'''Find a maximum and shift that maximum to a reference position.
:param FID: FID
:param float target: reference position in ppm
:param float bw: Bandwidth or spectral width in Hz.
:param float cf: Central or spectrometer frequency (MHz)
:param str nucleus: Nucleus string, defaults to 1H
:param ppmlim: Search range for peak maximum
:param bool shift: If True (default) ppm values include shift
:return: Shifted FID
:return: Shifted amount in ppm
'''
# Find maximum of absolute spectrum in ppm limit
padFID = pad(FID, FID.size * 3)
MRSargs = {'FID': padFID,
'bw': bw,
'cf': cf,
'nucleus': nucleus}
mrs = MRS(**MRSargs)
spec = extract_spectrum(mrs,padFID,ppmlim=ppmlim,shift=shift)
spec = extract_spectrum(mrs, padFID, ppmlim=ppmlim, shift=shift)
if shift:
extractedAxis = mrs.getAxes(ppmlim=ppmlim)
else:
extractedAxis = mrs.getAxes(ppmlim=ppmlim,axis='ppm')
else:
extractedAxis = mrs.getAxes(ppmlim=ppmlim, axis='ppm')
maxIndex = np.argmax(np.abs(spec))
shiftAmount = extractedAxis[maxIndex]-target
shiftAmountHz = shiftAmount * mrs.centralFrequency/1E6
shiftAmount = extractedAxis[maxIndex] - target
shiftAmountHz = shiftAmount * mrs.centralFrequency / 1E6
return freqshift(FID,1/bw,-shiftAmountHz),shiftAmount
return freqshift(FID, 1 / bw, -shiftAmountHz), shiftAmount
def truncate(FID,k,first_or_last='last'):
def truncate(FID, k, first_or_last='last'):
"""
Truncate parts of a FID
Parameters:
-----------
FID : array-like
......@@ -85,7 +104,7 @@ def truncate(FID,k,first_or_last='last'):
array-like
"""
FID_trunc = FID.copy()
if first_or_last == 'first':
return FID_trunc[k:]
elif first_or_last == 'last':
......@@ -93,10 +112,11 @@ def truncate(FID,k,first_or_last='last'):
else:
raise(Exception("Last parameter must either be 'first' or 'last'"))
def pad(FID,k,first_or_last='last'):
def pad(FID, k, first_or_last='last'):
"""
Pad parts of a FID
Parameters:
-----------
FID : array-like
......@@ -108,11 +128,11 @@ def pad(FID,k,first_or_last='last'):
array-like
"""
FID_pad = FID.copy()
if first_or_last == 'first':
return np.pad(FID_pad,(k,0))
return np.pad(FID_pad, (k, 0))
elif first_or_last == 'last':
return np.pad(FID_pad,(0,k))
return np.pad(FID_pad, (0, k))
else:
raise(Exception("Last parameter must either be 'first' or 'last'"))
......
#!/usr/bin/env python
# unlike.py - Outlier detection routines
#
# Author: William Clarke <william.clarke@ndcn.ox.ac.uk>
#
# Copyright (C) 2019 University of Oxford
# Copyright (C) 2019 University of Oxford
# SHBASECOPYRIGHT
from fsl_mrs.core import MRS
import numpy as np
from fsl_mrs.utils.preproc.general import get_target_FID
from fsl_mrs.utils.misc import extract_spectrum,FIDToSpec
from fsl_mrs.utils.misc import extract_spectrum, FIDToSpec
def identifyUnlikeFIDs(FIDList,bandwidth,centralFrequency,sdlimit = 1.96,iterations=2,ppmlim=None,shift=True):
def identifyUnlikeFIDs(FIDList,
bandwidth,
centralFrequency,
nucleus='1H',
sdlimit=1.96,
iterations=2,
ppmlim=None,
shift=True):
""" Identify FIDs in a list that are unlike the others
Args:
FIDList (list of ndarray): Time domain data
bandwidth (float) : Bandwidth in Hz
centralFrequency (float) : Central frequency in Hz
sdlimit (float,optional) : Exclusion limit (number of stnadard deviations). Default = 3.
sdlimit (float,optional) : Exclusion limit (number of standard deviations). Default = 3.
iterations (int,optional): Number of iterations to use.
ppmlim (tuple,optional) : Limit to this ppm range
shift (bool,optional) : Apply H20 shft
......@@ -32,66 +38,79 @@ def identifyUnlikeFIDs(FIDList,bandwidth,centralFrequency,sdlimit = 1.96,iterati
"""
# Calculate the FID to compare to
target = get_target_FID(FIDList,target='median')
target = get_target_FID(FIDList, target='median')
if ppmlim is not None:
MRSargs = {'FID':target,'bw':bandwidth,'cf':centralFrequency}
MRSargs = {'FID': target,
'bw': bandwidth,
'cf': centralFrequency,
'nucleus': nucleus}
mrs = MRS(**MRSargs)
target = extract_spectrum(mrs,target,ppmlim=ppmlim,shift=shift)
compareList = [extract_spectrum(mrs,f,ppmlim=ppmlim,shift=shift) for f in FIDList]
target = extract_spectrum(mrs, target, ppmlim=ppmlim, shift=shift)
compareList = [extract_spectrum(mrs, f, ppmlim=ppmlim, shift=shift) for f in FIDList]
else:
compareList = [FIDToSpec(f) for f in FIDList]
target = FIDToSpec(target)
# Do the comparison
# Do the comparison
for idx in range(iterations):
metric = []
for data in compareList:
metric.append(np.linalg.norm(data-target))
metric.append(np.linalg.norm(data - target))
metric = np.asarray(metric)
metric_avg = np.mean(metric)
metric_std = np.std(metric)
goodFIDs,badFIDs,rmIndicies,keepIndicies = [],[],[],[]
for iDx,(data,m) in enumerate(zip(FIDList,metric)):
if m > ((sdlimit*metric_std)+metric_avg) or m < (-(sdlimit*metric_std)+metric_avg):
goodFIDs, badFIDs, rmIndicies, keepIndicies = [], [], [], []
for iDx, (data, m) in enumerate(zip(FIDList, metric)):
if m > ((sdlimit * metric_std) + metric_avg) or m < (-(sdlimit * metric_std) + metric_avg):
badFIDs.append(data)
rmIndicies.append(iDx)
else:
goodFIDs.append(data)
keepIndicies.append(iDx)
target = get_target_FID(goodFIDs,target='median')
target = get_target_FID(goodFIDs, target='median')
if ppmlim is not None:
target = extract_spectrum(mrs,target,ppmlim=ppmlim,shift=shift)
target = extract_spectrum(mrs, target, ppmlim=ppmlim, shift=shift)
else:
target = FIDToSpec(target)
return goodFIDs,badFIDs,keepIndicies,rmIndicies,metric.tolist()
return goodFIDs, badFIDs, keepIndicies, rmIndicies, metric.tolist()
def identifyUnlikeFIDs_report(goodFIDs,badFIDs,hdr,keepIndicies,rmIndicies,metric,ppmlim=(0.2,4.2),sdlimit = 1.96,html=None):
from fsl_mrs.utils.preproc.combine import combine_FIDs
import plotly.graph_objects as go
from fsl_mrs.utils.preproc.reporting import plotStyles,plotAxesStyle
def identifyUnlikeFIDs_report(goodFIDs,
badFIDs,
keepIndicies,
rmIndicies,
metric,
bw,
cf,
nucleus='1H',
ppmlim=(0.2, 4.2),
sdlimit=1.96,
html=None):
import plotly.graph_objects as go
from fsl_mrs.utils.preproc.reporting import plotStyles, plotAxesStyle
metricGd = np.array(metric)[keepIndicies]
metricBd = np.array(metric)[rmIndicies]
metricBd = np.array(metric)[rmIndicies]
metric_avg = np.mean(metric)