Commit a652450f authored by William Clarke's avatar William Clarke
Browse files

Merge branch 'synthetic_models' into 'master'

Synthetic models

See merge request wclarke/fsl_mrs!1
parents 768dca61 74fe7958
......@@ -169,8 +169,8 @@ class MRS(object):
(cf_MHz > sevent_range[0] and cf_MHz < sevent_range[1]) or \
(cf_MHz > ninefourt_range[0] and cf_MHz < ninefourt_range[1]) or \
(cf_MHz > elevensevent_range[0] and cf_MHz < elevensevent_range[1]):
print(f'Identified as {key} nucleus data.'
f' Esitmated field: {cf_MHz/GYRO_MAG_RATIO[key]} T.')
#print(f'Identified as {key} nucleus data.'
# f' Esitmated field: {cf_MHz/GYRO_MAG_RATIO[key]} T.')
return key
raise ValueError(f'Unidentified nucleus,'
......
from fsl_mrs.utils import baseline
from fsl_mrs.core import MRS
import numpy as np
def test_regressor_creation():
# Create dummy mrs
mrs = MRS(FID=np.zeros((1000,)), cf=100, bw=2000, nucleus='1H')
baseline_order = 3
ppmlim = (-1, 1)
B = baseline.prepare_baseline_regressor(mrs, baseline_order, ppmlim)
# Need some better tests...
assert B.shape == (1000, 8)
assert np.sum(B[:, 0]) == 100
assert np.sum(B[:, 1]) == 100j
from fsl_mrs.utils import synthetic as syn
from fsl_mrs.utils.misc import FIDToSpec
from fsl_mrs.utils import mrs_io
from fsl_mrs.core import MRS
import numpy as np
from pathlib import Path
testsPath = Path(__file__).parent
basis_path = testsPath / 'testdata/fsl_mrs/steam_basis'
dynamic_model_path = testsPath / 'testdata/dynamic/fmrs_sin_model.py'
def test_noisecov():
# Create positive semi-definite noise covariance
inputnoisecov = np.random.random((2,2))
inputnoisecov= np.dot(inputnoisecov,inputnoisecov.T)
inputnoisecov = np.random.random((2, 2))
inputnoisecov = np.dot(inputnoisecov, inputnoisecov.T)
testFID,hdr = syn.syntheticFID(coilamps=[1.0,1.0],
coilphase=[0.0,0.0],
noisecovariance=inputnoisecov,
amplitude=[0.0,0.0],
points= 32768)
testFID, hdr = syn.syntheticFID(coilamps=[1.0, 1.0],
coilphase=[0.0, 0.0],
noisecovariance=inputnoisecov,
amplitude=[0.0, 0.0],
points=32768)
outcov = np.cov(np.asarray(testFID))
# Noise cov is for both real and imag, so multiply by 2
assert np.isclose(outcov,2*inputnoisecov,atol=1E-1).all()
assert np.allclose(outcov, 2 * inputnoisecov, atol=1E-1)
def test_syntheticFID():
testFID,hdr = syn.syntheticFID(noisecovariance=[[0.0]],points=16384)
testFID, hdr = syn.syntheticFID(noisecovariance=[[0.0]], points=16384)
# Check FID is sum of lorentzian lineshapes
# anlytical solution
T2 = 1/(hdr['inputopts']['damping'][0])
T2 = 1 / (hdr['inputopts']['damping'][0])
M0 = hdr['inputopts']['amplitude'][0]
f0 = hdr['inputopts']['centralfrequency']*hdr['inputopts']['chemicalshift'][0]
f1 = hdr['inputopts']['centralfrequency']*hdr['inputopts']['chemicalshift'][1]
f0 = hdr['inputopts']['centralfrequency'] * hdr['inputopts']['chemicalshift'][0]
f1 = hdr['inputopts']['centralfrequency'] * hdr['inputopts']['chemicalshift'][1]
f = hdr['faxis']
spec = (M0*T2)/(1+4*np.pi**2*(f0-f)**2*T2**2) +1j*(2*np.pi*M0*(f0-f)*T2**2)/(1+4*np.pi**2*(f0-f)**2*T2**2)
spec += (M0*T2)/(1+4*np.pi**2*(f1-f)**2*T2**2) +1j*(2*np.pi*M0*(f1-f)*T2**2)/(1+4*np.pi**2*(f1-f)**2*T2**2)
spec = (M0 * T2) \
/ (1 + 4 * np.pi**2 * (f0 - f)**2 * T2**2) \
+ 1j * (2 * np.pi * M0 * (f0 - f) * T2**2) \
/ (1 + 4 * np.pi**2 * (f0 - f)**2 * T2**2)
spec += (M0 * T2) \
/ (1 + 4 * np.pi**2 * (f1 - f)**2 * T2**2) \
+ 1j * (2 * np.pi * M0 * (f1 - f) * T2**2) \
/ (1 + 4 * np.pi**2 * (f1 - f)**2 * T2**2)
# Can't quite get the scaling right here.
testSpec = FIDToSpec(testFID[0])
spec /= np.max(np.abs(spec))
testSpec /= np.max(np.abs(testSpec))
assert np.isclose(spec,FIDToSpec(testFID[0]),atol = 1E-2,rtol = 1E0).all()
\ No newline at end of file
assert np.allclose(spec, FIDToSpec(testFID[0]), atol=1E-2, rtol=1E0)
def test_syntheticFromBasis():
fid, header, _ = syn.syntheticFromBasisFile(str(basis_path),
ignore=['Scyllo'],
baseline=[0.0, 0.0],
concentrations={'Mac': 2.0},
coilamps=[1.0, 1.0],
coilphase=[0.0, np.pi],
noisecovariance=[[0.1, 0.0], [0.0, 0.1]])
assert fid.shape == (2048, 2)
def test_syntheticFromBasis_baseline():
fid, header, _ = syn.syntheticFromBasisFile(str(basis_path),
baseline=[0.0, 0.0],
concentrations={'Mac': 2.0},
noisecovariance=[[0.0]])
mrs = MRS(FID=fid, header=header)
mrs.conj_FID()
fid, header, _ = syn.syntheticFromBasisFile(str(basis_path),
baseline=[1.0, 1.0],
concentrations={'Mac': 2.0},
noisecovariance=[[0.0]])
mrs2 = MRS(FID=fid, header=header)
mrs2.conj_FID()
assert np.allclose(mrs2.get_spec(), mrs.get_spec() + np.complex(1.0, -1.0))
def test_synthetic_spectra_from_model():
_, names, _ = mrs_io.read_basis(str(basis_path))
time_var = np.arange(0, 10)
period = 10.0
time_var = time_var / period
camp = [0] * len(names)
camp[names.index('NAA')] = 2
defined_vals = {'c_0': 'conc',
'c_amp': camp,
'gamma': (20, 0),
'sigma': (20, 0),
'b_intercept': [1, 1, 1, 1, 1, 1],
'b_slope': [0.1, 0.1, 0.1, 0.1, 0.1, 0.1]}
p_noise = {'eps': (0, [0.01])}
p_rel_noise = {'conc': (0, [0.05])}
mrs_list, _, _ = syn.synthetic_spectra_from_model(
str(dynamic_model_path),
time_var,
str(basis_path),
ignore=None,
metab_groups=['Mac'],
ind_scaling=['Mac'],
baseline_order=2,
concentrations={'Mac': 15},
defined_vals=defined_vals,
bandwidth=6000,
points=2048,
baseline_ppm=(0, 5),
param_noise=p_noise,
param_rel_noise=p_rel_noise,
coilamps=[1.0, 1.0],
coilphase=[0.0, np.pi],
noisecovariance=[[0.1, 0.0], [0.0, 0.1]])
assert len(mrs_list) == 10
assert len(mrs_list[0]) == 2
assert mrs_list[0][0].numBasis == len(names)
assert mrs_list[0][0].numPoints == 2048
This source diff could not be displayed because it is stored in LFS. You can view the blob instead.
#!/usr/bin/env python
# baseline.py - Functions associated with the baseline description
#
# Author: Saad Jbabdi <saad@fmrib.ox.ac.uk>
# William Clarke <william.clarke@ndcn.ox.ac.uk>
#
# Copyright (C) 2019 University of Oxford
# SHBASECOPYRIGHT
import numpy as np
from fsl_mrs.utils.misc import regress_out
def prepare_baseline_regressor(mrs, baseline_order, ppmlim):
"""
Complex baseline is polynomial
Parameters:
-----------
mrs : MRS object
baseline_order : degree of polynomial (>=1)
ppmlim : interval over which baseline is non-zero
Returns:
--------
2D numpy array
"""
first, last = mrs.ppmlim_to_range(ppmlim)
B = []
x = np.zeros(mrs.numPoints, np.complex)
x[first:last] = np.linspace(-1, 1, last - first)
for i in range(baseline_order + 1):
regressor = x**i
if i > 0:
regressor = regress_out(regressor, B, keep_mean=False)
B.append(regressor.flatten())
B.append(1j * regressor.flatten())
B = np.asarray(B).T
tmp = B.copy()
B = 0 * B
B[first:last, :] = tmp[first:last, :].copy()
return B
from fsl_mrs.utils.dynamic.variable_mapping import VariableMapping
# variable_mapping.py - Class responsible for variable mapping
#
# Author: Saad Jbabdi <saad@fmrib.ox.ac.uk>
# William Clarke <william.clarke@ndcn.ox.ac.uk>
#
# Copyright (C) 2019 University of Oxford
# SHBASECOPYRIGHT
import numpy as np
from scipy.optimize import minimize
from functools import partial
class VariableMapping(object):
def __init__(self,
param_names,
param_sizes,
time_variable,
config_file):
"""
Variable Mapping Class Constructor
Mapping betwee free and mapped:
Mapped = TxN matrix
Mapped[i,j] = float or 1D-array of floats with size param_sizes[j]
Parameters
----------
param_names : list
param_sizes : list
time_variale : array-like
config_file : string
"""
self.time_variable = np.asarray(time_variable)
self.ntimes = self.time_variable.shape[0]
self.mapped_names = param_names
self.mapped_nparams = len(self.mapped_names)
self.mapped_sizes = param_sizes
from runpy import run_path
settings = run_path(config_file)
self.Parameters = settings['Parameters']
for name in self.mapped_names:
if name not in self.Parameters:
self.Parameters[name] = 'fixed'
self.fcns = {}
for key in settings:
if callable(settings[key]):
self.fcns[key] = settings[key]
if 'Bounds' in settings:
self.defined_bounds = settings['Bounds']
self.Bounds = self.create_constraints(settings['Bounds'])
else:
self.defined_bounds = None
self.Bounds = self.create_constraints(None)
self.nfree = self.calc_nfree()
def __str__(self):
OUT = '-----------------------\n'
OUT += 'Variable Mapping Object\n'
OUT += '-----------------------\n'
OUT += f'Number of Mapped param groups = {len(self.mapped_names)}\n'
OUT += f'Number of Mapped params = {sum(self.mapped_sizes)}\n'
OUT += f'Number of Free params = {self.nfree}\n'
OUT += f'Number of params if all indep = {sum(self.mapped_sizes)*self.ntimes}\n'
OUT += 'Dynamic functions\n'
for param_name in self.mapped_names:
beh = self.Parameters[param_name]
OUT += f'{param_name} \t {beh}\n'
return OUT
def __repr__(self) -> str:
return str(self)
def calc_nfree(self):
"""
Calculate number of free parameters based on mapped behaviour
Returns
-------
int
"""
N = 0
for index, param in enumerate(self.mapped_names):
beh = self.Parameters[param]
if (beh == 'fixed'):
N += self.mapped_sizes[index]
elif (beh == 'variable'):
N += self.ntimes * self.mapped_sizes[index]
else:
if 'dynamic' in beh:
N += len(beh['params']) * self.mapped_sizes[index]
return N
def create_constraints(self, bounds):
"""
Create list of constraints to be used in optimization
Parameters:
-----------
bounds : dict {param:bounds}
Returns
-------
list
"""
if bounds is None:
return [(None, None)] * self.calc_nfree()
if not isinstance(bounds, dict):
raise(Exception('Input should either be a dict or None'))
b = [] # list of bounds
for index, name in enumerate(self.mapped_names):
psize = self.mapped_sizes[index]
if (self.Parameters[name] == 'fixed'):
# check if there are bound on this param
if name in bounds:
for s in range(psize):
b.append(bounds[name])
else:
for s in range(psize):
b.append((None, None))
elif (self.Parameters[name] == 'variable'):
for t in range(self.ntimes):
for s in range(psize):
if name in bounds:
b.append(bounds[name])
else:
b.append((None, None))
else:
if 'dynamic' in self.Parameters[name]:
pnames = self.Parameters[name]['params']
for s in range(psize):
for p in pnames:
if p in bounds:
b.append(bounds[p])
else:
b.append((None, None))
return b
def mapped_from_list(self, p):
"""
Converts list of params into Mapped by repeating over time
Parameters
----------
p : list
Returns
-------
2D array
"""
if isinstance(p, list):
p = np.asarray(p)
if (p.ndim == 1):
p = np.repeat(p[None, :], self.ntimes, 0)
return p
def create_free_names(self):
"""
create list of names for free params
Returns
-------
list of strings
"""
names = []
for index, param in enumerate(self.mapped_names):
beh = self.Parameters[param]
if (beh == 'fixed'):
name = [f'{param}_{x}' for x in range(self.mapped_sizes[index])]
names.extend(name)
elif (beh == 'variable'):
name = [f'{param}_{x}_t{t}' for x in range(self.mapped_sizes[index]) for t in range(self.ntimes)]
names.extend(name)
else:
if 'dynamic' in beh:
dyn_name = self.Parameters[param]['params']
name = [f'{param}_{y}_{x}' for x in range(self.mapped_sizes[index]) for y in dyn_name]
names.extend(name)
return names
def free_to_mapped(self, p):
"""
Convert free into mapped params over time
fixed params get copied over time domain
variable params are indep over time
dynamic params are mapped using dyn model
Parameters
----------
p : 1D array
Returns
-------
2D array (time X params)
"""
# Check input
if (p.size != self.nfree):
raise(Exception('Input free params does not have expected number of entries.'
f' Found {p.size}, expected {self.nfree}'))
# Mapped params is time X nparams (each param is an array of params)
mapped_params = np.empty((self.ntimes, self.mapped_nparams), dtype=object)
counter = 0
for index, name in enumerate(self.mapped_names):
nmapped = self.mapped_sizes[index]
if (self.Parameters[name] == 'fixed'): # repeat param over time
for t in range(self.ntimes):
mapped_params[t, index] = p[counter:counter + nmapped]
counter += nmapped
elif (self.Parameters[name] == 'variable'): # copy one param for each time point
for t in range(self.ntimes):
mapped_params[t, index] = p[counter + t * nmapped:counter + t * nmapped + nmapped]
counter += nmapped
else:
if 'dynamic' in self.Parameters[name]:
# Generate time courses
func_name = self.Parameters[name]['dynamic']
nfree = len(self.Parameters[name]['params'])
mapped = np.zeros((self.ntimes, nmapped))
for i in range(nmapped):
params = p[counter:counter + nfree]
mapped[:, i] = self.fcns[func_name](params, self.time_variable)
counter += nfree
for t in range(self.ntimes):
mapped_params[t, index] = mapped[t, :]
else:
raise(Exception("Unknown Parameter type - should be one of 'fixed', 'variable', {'dynamic'}"))
return mapped_params
def print_free(self, x):
"""
Print free params and their names
"""
print(dict(zip(self.create_free_names(), x)))
def check_bounds(self, x, tol=1e-10):
"""
Check that bounds apply and return corrected x
"""
if self.Bounds is None:
return x
for i, b in enumerate(self.Bounds):
LB = b[0] if b[0] is not None else -np.inf
UB = b[1] if b[1] is not None else np.inf
if (x[i] < LB):
x[i] = LB + tol
if (x[i] > UB):
x[i] = UB - tol
return x
# This function may 'invert' the dynamic mapping
# if the input params are from a single timepoint it assumes constant
def mapped_to_free(self, p):
"""
Convert mapped params to free (e.g. to initialise the free params)
fixed and variable params are simply copied
dynamic params are converted by inverting the dyn model with Scipy optimize
Parameters
----------
p : 2D array (time X params)
Returns
-------
1D array
"""
# Check input
p = self.mapped_from_list(p)
if (p.shape != (self.ntimes, self.mapped_nparams)):
raise(Exception(f'Input mapped params does not have expected number of entries.'
f' Found {p.shape}, expected {(self.ntimes,self.mapped_nparams)}'))
free_params = np.empty(self.nfree)
counter = 0
for index, name in enumerate(self.mapped_names):
psize = self.mapped_sizes[index]
if (self.Parameters[name] == 'fixed'):
free_params[counter:counter + psize] = p[0, index]
counter += psize
elif (self.Parameters[name] == 'variable'):
for t in range(self.ntimes):
free_params[counter:counter + psize] = p[t, index]
counter += psize
else:
if 'dynamic' in self.Parameters[name]:
func_name = self.Parameters[name]['dynamic']
time_var = self.time_variable
func = partial(self.fcns[func_name], t=time_var)
nfree = len(self.Parameters[name]['params'])
pp = np.stack(p[:, index][:], axis=0)
for ppp in range(pp.shape[1]):
def loss(x):
pred = func(x)
return np.mean((pp[:, ppp] - pred)**2)
bounds = self.Bounds[counter:counter + nfree]
vals = minimize(loss,
np.zeros(len(self.Parameters[name]['params'])),
method='TNC', bounds=bounds).x
free_params[counter:counter + nfree] = vals
counter += nfree
else:
raise(Exception("Unknown Parameter type - should be one of 'fixed', 'variable', {'dynamic'}"))
return free_params
#!/usr/bin/env python
# Get example dataset
# Load example datasets
#
# Author: Saad Jbabdi <saad@fmrib.ox.ac.uk>
# William Clarke <will.clarke@ndcn.ox.ac.uk>
# William Clarke <william.clarke@ndcn.ox.ac.uk>
#
# Copyright (C) 2019 University of Oxford
# Copyright (C) 2019 University of Oxford
# SHBASECOPYRIGHT
import os
from fsl_mrs.utils import mrs_io,misc
from fsl_mrs.utils import mrs_io, misc
from fsl_mrs.core import MRS
from pathlib import Path