Commit 578ddbb4 authored by William Clarke's avatar William Clarke
Browse files

First commit of dynamic models. Tests and some improvements needed.

parent c3a4813a
from fsl_mrs.utils.dynamic.variable_mapping import VariableMapping
# variable_mapping.py - Class responsible for variable mapping
#
# Author: Saad Jbabdi <saad@fmrib.ox.ac.uk>
# William Clarke <william.clarke@ndcn.ox.ac.uk>
#
# Copyright (C) 2019 University of Oxford
# SHBASECOPYRIGHT
import numpy as np
from scipy.optimize import minimize
from functools import partial
class VariableMapping(object):
def __init__(self,
param_names,
param_sizes,
time_variable,
config_file):
"""
Variable Mapping Class Constructor
Mapping betwee free and mapped:
Mapped = TxN matrix
Mapped[i,j] = float or 1D-array of floats with size param_sizes[j]
Parameters
----------
param_names : list
param_sizes : list
time_variale : array-like
config_file : string
"""
self.time_variable = np.asarray(time_variable)
self.ntimes = self.time_variable.shape[0]
self.mapped_names = param_names
self.mapped_nparams = len(self.mapped_names)
self.mapped_sizes = param_sizes
from runpy import run_path
settings = run_path(config_file)
self.Parameters = settings['Parameters']
for name in self.mapped_names:
if name not in self.Parameters:
self.Parameters[name] = 'fixed'
self.fcns = {}
for key in settings:
if callable(settings[key]):
self.fcns[key] = settings[key]
if 'Bounds' in settings:
self.defined_bounds = settings['Bounds']
self.Bounds = self.create_constraints(settings['Bounds'])
else:
self.defined_bounds = None
self.Bounds = self.create_constraints(None)
self.nfree = self.calc_nfree()
def __str__(self):
OUT = '-----------------------\n'
OUT += 'Variable Mapping Object\n'
OUT += '-----------------------\n'
OUT += f'Number of Mapped param groups = {len(self.mapped_names)}\n'
OUT += f'Number of Mapped params = {sum(self.mapped_sizes)}\n'
OUT += f'Number of Free params = {self.nfree}\n'
OUT += f'Number of params if all indep = {sum(self.mapped_sizes)*self.ntimes}\n'
OUT += 'Dynamic functions\n'
for param_name in self.mapped_names:
beh = self.Parameters[param_name]
OUT += f'{param_name} \t {beh}\n'
return OUT
def __repr__(self) -> str:
return str(self)
def calc_nfree(self):
"""
Calculate number of free parameters based on mapped behaviour
Returns
-------
int
"""
N = 0
for index, param in enumerate(self.mapped_names):
beh = self.Parameters[param]
if (beh == 'fixed'):
N += self.mapped_sizes[index]
elif (beh == 'variable'):
N += self.ntimes * self.mapped_sizes[index]
else:
if 'dynamic' in beh:
N += len(beh['params']) * self.mapped_sizes[index]
return N
def create_constraints(self, bounds):
"""
Create list of constraints to be used in optimization
Parameters:
-----------
bounds : dict {param:bounds}
Returns
-------
list
"""
if bounds is None:
return [(None, None)] * self.calc_nfree()
if not isinstance(bounds, dict):
raise(Exception('Input should either be a dict or None'))
b = [] # list of bounds
for index, name in enumerate(self.mapped_names):
psize = self.mapped_sizes[index]
if (self.Parameters[name] == 'fixed'):
# check if there are bound on this param
if name in bounds:
for s in range(psize):
b.append(bounds[name])
else:
for s in range(psize):
b.append((None, None))
elif (self.Parameters[name] == 'variable'):
for t in range(self.ntimes):
for s in range(psize):
if name in bounds:
b.append(bounds[name])
else:
b.append((None, None))
else:
if 'dynamic' in self.Parameters[name]:
pnames = self.Parameters[name]['params']
for s in range(psize):
for p in pnames:
if p in bounds:
b.append(bounds[p])
else:
b.append((None, None))
return b
def mapped_from_list(self, p):
"""
Converts list of params into Mapped by repeating over time
Parameters
----------
p : list
Returns
-------
2D array
"""
if isinstance(p, list):
p = np.asarray(p)
if (p.ndim == 1):
p = np.repeat(p[None, :], self.ntimes, 0)
return p
def create_free_names(self):
"""
create list of names for free params
Returns
-------
list of strings
"""
names = []
for index, param in enumerate(self.mapped_names):
beh = self.Parameters[param]
if (beh == 'fixed'):
name = [f'{param}_{x}' for x in range(self.mapped_sizes[index])]
names.extend(name)
elif (beh == 'variable'):
name = [f'{param}_{x}_t{t}' for x in range(self.mapped_sizes[index]) for t in range(self.ntimes)]
names.extend(name)
else:
if 'dynamic' in beh:
dyn_name = self.Parameters[param]['params']
name = [f'{param}_{y}_{x}' for x in range(self.mapped_sizes[index]) for y in dyn_name]
names.extend(name)
return names
def free_to_mapped(self, p):
"""
Convert free into mapped params over time
fixed params get copied over time domain
variable params are indep over time
dynamic params are mapped using dyn model
Parameters
----------
p : 1D array
Returns
-------
2D array (time X params)
"""
# Check input
if (p.size != self.nfree):
raise(Exception('Input free params does not have expected number of entries.'
f' Found {p.size}, expected {self.nfree}'))
# Mapped params is time X nparams (each param is an array of params)
mapped_params = np.empty((self.ntimes, self.mapped_nparams), dtype=object)
counter = 0
for index, name in enumerate(self.mapped_names):
nmapped = self.mapped_sizes[index]
if (self.Parameters[name] == 'fixed'): # repeat param over time
for t in range(self.ntimes):
mapped_params[t, index] = p[counter:counter + nmapped]
counter += nmapped
elif (self.Parameters[name] == 'variable'): # copy one param for each time point
for t in range(self.ntimes):
mapped_params[t, index] = p[counter + t * nmapped:counter + t * nmapped + nmapped]
counter += nmapped
else:
if 'dynamic' in self.Parameters[name]:
# Generate time courses
func_name = self.Parameters[name]['dynamic']
nfree = len(self.Parameters[name]['params'])
mapped = np.zeros((self.ntimes, nmapped))
for i in range(nmapped):
params = p[counter:counter + nfree]
mapped[:, i] = self.fcns[func_name](params, self.time_variable)
counter += nfree
for t in range(self.ntimes):
mapped_params[t, index] = mapped[t, :]
else:
raise(Exception("Unknown Parameter type - should be one of 'fixed', 'variable', {'dynamic'}"))
return mapped_params
def print_free(self, x):
"""
Print free params and their names
"""
print(dict(zip(self.create_free_names(), x)))
def check_bounds(self, x, tol=1e-10):
"""
Check that bounds apply and return corrected x
"""
if self.Bounds is None:
return x
for i, b in enumerate(self.Bounds):
LB = b[0] if b[0] is not None else -np.inf
UB = b[1] if b[1] is not None else np.inf
if (x[i] < LB):
x[i] = LB + tol
if (x[i] > UB):
x[i] = UB - tol
return x
# This function may 'invert' the dynamic mapping
# if the input params are from a single timepoint it assumes constant
def mapped_to_free(self, p):
"""
Convert mapped params to free (e.g. to initialise the free params)
fixed and variable params are simply copied
dynamic params are converted by inverting the dyn model with Scipy optimize
Parameters
----------
p : 2D array (time X params)
Returns
-------
1D array
"""
# Check input
p = self.mapped_from_list(p)
if (p.shape != (self.ntimes, self.mapped_nparams)):
raise(Exception(f'Input mapped params does not have expected number of entries.'
f' Found {p.shape}, expected {(self.ntimes,self.mapped_nparams)}'))
free_params = np.empty(self.nfree)
counter = 0
for index, name in enumerate(self.mapped_names):
psize = self.mapped_sizes[index]
if (self.Parameters[name] == 'fixed'):
free_params[counter:counter + psize] = p[0, index]
counter += psize
elif (self.Parameters[name] == 'variable'):
for t in range(self.ntimes):
free_params[counter:counter + psize] = p[t, index]
counter += psize
else:
if 'dynamic' in self.Parameters[name]:
func_name = self.Parameters[name]['dynamic']
time_var = self.time_variable
func = partial(self.fcns[func_name], t=time_var)
nfree = len(self.Parameters[name]['params'])
pp = np.stack(p[:, index][:], axis=0)
for ppp in range(pp.shape[1]):
def loss(x):
pred = func(x)
return np.mean((pp[:, ppp] - pred)**2)
bounds = self.Bounds[counter:counter + nfree]
vals = minimize(loss,
np.zeros(len(self.Parameters[name]['params'])),
method='TNC', bounds=bounds).x
free_params[counter:counter + nfree] = vals
counter += nfree
else:
raise(Exception("Unknown Parameter type - should be one of 'fixed', 'variable', {'dynamic'}"))
return free_params
#!/usr/bin/env python
# Get example dataset
# Load example datasets
#
# Author: Saad Jbabdi <saad@fmrib.ox.ac.uk>
# William Clarke <will.clarke@ndcn.ox.ac.uk>
# William Clarke <william.clarke@ndcn.ox.ac.uk>
#
# Copyright (C) 2019 University of Oxford
# Copyright (C) 2019 University of Oxford
# SHBASECOPYRIGHT
import os
from fsl_mrs.utils import mrs_io,misc
from fsl_mrs.utils import mrs_io, misc
from fsl_mrs.core import MRS
from pathlib import Path
def simulated(ID=1):
"""
Datasets from the ISMRM MRS fitting challenge
Courtesy of Malgorzata Marjanska, Dinesh Deelchand,
and Roland Kreis.
ID = 1 up to 28
"""
fileDir = os.path.dirname(__file__)
datafolder = os.path.join(fileDir,'../pkg_data/mrs_fitting_challenge/datasets_JMRUI')
basisfolder = os.path.join(fileDir,'../pkg_data/mrs_fitting_challenge/basisset_JMRUI')
fileDir = Path(__file__).parent
datafolder = fileDir / '../pkg_data/mrs_fitting_challenge/datasets_JMRUI'
basisfolder = fileDir / '../pkg_data/mrs_fitting_challenge/basisset_JMRUI'
# Load data and basis
FID,FIDheader = mrs_io.read_FID(os.path.join(datafolder,f'dataset{ID}_WS.txt'))
FIDW,_ = mrs_io.read_FID(os.path.join(datafolder,f'dataset{ID}_nWS.txt'))
basis,names,Bheader = mrs_io.read_basis(basisfolder)
MRSArgs = {'header':FIDheader,'basis':basis,'names':names,'basis_hdr':Bheader[0],'H2O':FIDW}
mrs = MRS(FID=FID,**MRSArgs)
FID, FIDheader = mrs_io.read_FID(str(datafolder / f'dataset{ID}_WS.txt'))
FIDW, _ = mrs_io.read_FID(str(datafolder / f'dataset{ID}_nWS.txt'))
basis, names, Bheader = mrs_io.read_basis(basisfolder)
MRSArgs = {'header': FIDheader,
'basis': basis,
'names': names,
'basis_hdr': Bheader[0],
'H2O': FIDW}
mrs = MRS(FID=FID, **MRSArgs)
# Check orientation and rescale for extra robustness
mrs.processForFitting()
return mrs
def dMRS(mouse='mouse1'):
def dMRS(mouse='mouse1', path='/Users/saad/Desktop/Spectroscopy/'):
"""
Load Diffusion MRS data from hard-coded location on disk
Args:
......@@ -52,34 +56,47 @@ def dMRS(mouse='mouse1'):
from fsl_mrs.utils.preproc.align import phase_freq_align
from fsl_mrs.utils.preproc.shifting import shiftToRef
import numpy as np
dataPath = '/Users/saad/Desktop/Spectroscopy/DMRS/WT_High_b'
basispath = '/Users/saad/Desktop/Spectroscopy/DMRS/basis_STE_LASER_8_25_50_LacZ.BASIS'
basis,names,header = mrs_io.read_basis(basispath)
path = Path(path)
dataPath = path / 'DMRS/WT_High_b'
basispath = path / 'DMRS/basis_STE_LASER_8_25_50_LacZ.BASIS'
basis, names, header = mrs_io.read_basis(str(basispath))
centralFrequency = 500.30
bandwidth = 5000
currentdir = dataPath / mouse
currentdir = os.path.join(dataPath,mouse)
fidList = []
blist = [20,3000,6000,10000,20000,30000,50000]
blist = [20, 3000, 6000, 10000, 20000, 30000, 50000]
for b in blist:
file = os.path.join(currentdir,'high_b_'+str(b)+'.mat')
file = currentdir / f'high_b_{str(b)}.mat'
tmp = loadmat(file)
fid = np.squeeze(tmp['soustraction'].conj())
fid,_,_ = phaseCorrect(fid,bandwidth,centralFrequency,ppmlim=(2.8,3.2),shift=True)
fidList.append(fid)
fid = np.squeeze(tmp['soustraction'].conj())
fid, _, _ = phaseCorrect(fid,
bandwidth,
centralFrequency,
ppmlim=(2.8, 3.2),
shift=True)
fidList.append(fid)
# Align and shift to Cr reference.
alignedFids,phiOut,epsOut = phase_freq_align(fidList,bandwidth,centralFrequency,ppmlim=(0.2,4.2),niter=2)
alignedFids, _, _ = phase_freq_align(fidList,
bandwidth,
centralFrequency,
ppmlim=(0.2, 4.2),
niter=2)
mrsList = []
for fid,b in zip(alignedFids,blist):
fid,_ = shiftToRef(fid,3.027,bandwidth,centralFrequency,ppmlim=(2.9,3.1))
mrs = MRS(FID=fid,cf=centralFrequency,bw=bandwidth,basis=basis,names=names,basis_hdr=header[0],nucleus='1H')
for fid, b in zip(alignedFids, blist):
fid, _ = shiftToRef(fid, 3.027, bandwidth, centralFrequency, ppmlim=(2.9, 3.1))
mrs = MRS(FID=fid,
cf=centralFrequency,
bw=bandwidth,
basis=basis,
names=names,
basis_hdr=header[0],
nucleus='1H')
mrs.check_FID(repair=True)
mrs.check_Basis(repair=True)
mrs.ignore(['Gly'])
......@@ -92,10 +109,10 @@ def dMRS(mouse='mouse1'):
mrs.basis *= mrsList[0].scaling['basis']
blist = [b / 1e3 for b in blist]
return mrsList,blist
return mrsList, blist
def dMRS_SNR(avg=1):
def dMRS_SNR(avg=1, path='/Users/saad/Desktop/Spectroscopy/'):
"""
Load DMRS data with different numbers of averages
Args:
......@@ -108,33 +125,35 @@ def dMRS_SNR(avg=1):
"""
from fsl_mrs.utils import mrs_io
basispath = '/Users/saad/Desktop/Spectroscopy/DMRS/basis_STE_LASER_8_25_50_LacZ.BASIS'
basis,names,Bheader = mrs_io.read_basis(basispath)
# Bheader['ResonantNucleus'] = '1H'
centralFrequency = 500.30
bandwidth = 5000
path = Path(path)
basispath = path / 'DMRS/basis_STE_LASER_8_25_50_LacZ.BASIS'
basis, names, Bheader = mrs_io.read_basis(str(basispath))
Bheader[0]['ResonantNucleus'] = '1H'
FIDpath = f'/Users/saad/Desktop/Spectroscopy/DMRS/WT_multi/{avg:03}_avg'
bvals = [20,3020,6000,10000,20000,30000,50000]
FIDpath = path / f'DMRS/WT_multi/{avg:03}_avg'
bvals = [20, 3020, 6000, 10000, 20000, 30000, 50000]
MRSlist = []
for b in bvals:
FID,FIDheader = mrs_io.read_FID(FIDpath+f'/b_{b:05}.nii.gz')
MRSArgs = {'header': FIDheader, 'basis': basis, 'names': names, 'basis_hdr': Bheader[0]}
FID, FIDheader = mrs_io.read_FID(str(FIDpath / f'b_{b:05}.nii.gz'))
MRSArgs = {'header': FIDheader,
'basis': basis,
'names': names,
'basis_hdr': Bheader[0]}
mrs = MRS(FID=FID, **MRSArgs)
MRSlist.append(mrs)
MRSlist[0].rescaleForFitting()
for i,mrs in enumerate(MRSlist):
if i>0:
for i, mrs in enumerate(MRSlist):
if i > 0:
mrs.FID *= MRSlist[0].scaling['FID']
mrs.basis *= MRSlist[0].scaling['basis']
bvals = [b / 1e3 for b in bvals]
return MRSlist,bvals
return MRSlist, bvals
def FMRS(smooth=False):
def FMRS(smooth=False, path='/Users/saad/Desktop/Spectroscopy/'):
"""
Load Functional MRS data from hard-coded location on disk
Args:
......@@ -144,30 +163,35 @@ def FMRS(smooth=False):
mrs Object list
list (time variable)
"""
import glob
from numpy import repeat
folder = '/Users/saad/Desktop/Spectroscopy/FMRS/Jacob_data'
filelist = glob.glob(os.path.join(folder, 'RAWFORMAT', 'run_???.RAW'))
path = Path(path)
folder = path / 'FMRS/Jacob_data'
FIDlist = []
for file in filelist:
for file in (folder / 'RAWFORMAT').glob('run_???.RAW'):
FID, FIDheader = mrs_io.read_FID(file)
FIDlist.append(FID)
basisfile = os.path.join(folder, '7T_slaser36ms_2013_oxford_tdcslb1_ivan.BASIS')
basis, names, basisheader = mrs_io.read_basis(basisfile)
basisfile = folder / '7T_slaser36ms_2013_oxford_tdcslb1_ivan.BASIS'
basis, names, basisheader = mrs_io.read_basis(str(basisfile))
# # Resample basis
from fsl_mrs.utils import misc
basis = misc.ts_to_ts(basis, basisheader[0]['dwelltime'], FIDheader['dwelltime'], FID.shape[0])
basis = misc.ts_to_ts(basis,
basisheader[0]['dwelltime'],
FIDheader['dwelltime'],
FID.shape[0])
if smooth:
sFIDlist = misc.smooth_FIDs(FIDlist, window=5)
else:
sFIDlist = FIDlist
MRSargs = {'names': names, 'basis': basis, 'basis_hdr': basisheader[0], 'bw': FIDheader['bandwidth'],
MRSargs = {'names': names,
'basis': basis,
'basis_hdr': basisheader[0],
'bw': FIDheader['bandwidth'],
'cf': FIDheader['centralFrequency']}
mrsList = []
......@@ -178,9 +202,10 @@ def FMRS(smooth=False):
# Stimulus variable
stim = repeat([0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0], 16, axis=0).flatten()
return mrsList,stim
return mrsList, stim
def MPRESS(noise=1):
def MPRESS(noise=1, path='/Users/saad/Desktop/Spectroscopy/'):
"""
Load MegaPress edited MRS data from hard-coded location on disk
Args:
......@@ -192,21 +217,31 @@ def MPRESS(noise=1):
"""
from fsl_mrs.utils import mrs_io
from fsl_mrs.utils.synthetic.synthetic_from_basis import syntheticFromBasisFile
mpress_on = '/Users/saad/Desktop/Spectroscopy/mpress_basis/ON'
mpress_off = '/Users/saad/Desktop/Spectroscopy/mpress_basis/OFF'
basis,names,basis_hdr = mrs_io.read_basis(mpress_on)
FIDs,header,conc = syntheticFromBasisFile(mpress_on,noisecovariance=[[noise]])
mrs1 = MRS(FID=FIDs,header=header,basis=basis,basis_hdr=basis_hdr[0],names=names)
path = Path(path)