diff --git a/fsl/data/dcmstack/COPYING b/fsl/data/dcmstack/COPYING deleted file mode 100644 index 7374d5d0503922ef8878a0f2e57f9750c7314f15..0000000000000000000000000000000000000000 --- a/fsl/data/dcmstack/COPYING +++ /dev/null @@ -1,35 +0,0 @@ -********************** -Copyright and Licenses -********************** - -dcmstack -======= - -The dcmstack package, including all examples, code snippets and attached -documentation is covered by the MIT license. - -:: - - The MIT License - - Copyright (c) 2011-2012 Brendan Moloney <moloney@ohsu.edu> - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. - - diff --git a/fsl/data/dcmstack/__init__.py b/fsl/data/dcmstack/__init__.py deleted file mode 100644 index 2663c2259617e0b3d8416001ca8d6f2d85dcae48..0000000000000000000000000000000000000000 --- a/fsl/data/dcmstack/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -""" -Package for stacking DICOM images into multi dimensional volumes, extracting -the DICOM meta data, converting the result to Nifti files with the meta -data stored in a header extension, and work with these extended Nifti files. -""" -from .info import __version__ -from .dcmstack import * diff --git a/fsl/data/dcmstack/dcmmeta.py b/fsl/data/dcmstack/dcmmeta.py deleted file mode 100644 index 9049d7d7c923dcb73d242dcf9cc3f43c1d2e515b..0000000000000000000000000000000000000000 --- a/fsl/data/dcmstack/dcmmeta.py +++ /dev/null @@ -1,1804 +0,0 @@ -""" -DcmMeta header extension and NiftiWrapper for working with extended Niftis. -""" -from __future__ import print_function - -import sys -import json, warnings -from copy import deepcopy -import numpy as np -import nibabel as nb -from nibabel.nifti1 import Nifti1Extension -from nibabel.spatialimages import HeaderDataError - -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict - -with warnings.catch_warnings(): - warnings.simplefilter('ignore') - from nibabel.nicom.dicomwrappers import wrapper_from_data - -dcm_meta_ecode = 0 - -_meta_version = 0.6 - -_req_base_keys_map= {0.5 : set(('dcmmeta_affine', - 'dcmmeta_slice_dim', - 'dcmmeta_shape', - 'dcmmeta_version', - 'global', - ) - ), - 0.6 : set(('dcmmeta_affine', - 'dcmmeta_reorient_transform', - 'dcmmeta_slice_dim', - 'dcmmeta_shape', - 'dcmmeta_version', - 'global', - ) - ), - } -'''Minimum required keys in the base dictionaty to be considered valid''' - -def is_constant(sequence, period=None): - '''Returns true if all elements in (each period of) the sequence are equal. - - Parameters - ---------- - sequence : sequence - The sequence of elements to check. - - period : int - If not None then each subsequence of that length is checked. - ''' - if period is None: - return all(val == sequence[0] for val in sequence) - else: - if period <= 1: - raise ValueError('The period must be greater than one') - seq_len = len(sequence) - if seq_len % period != 0: - raise ValueError('The sequence length is not evenly divisible by ' - 'the period length.') - - for period_idx in range(seq_len / period): - start_idx = period_idx * period - end_idx = start_idx + period - if not all(val == sequence[start_idx] - for val in sequence[start_idx:end_idx]): - return False - - return True - -def is_repeating(sequence, period): - '''Returns true if the elements in the sequence repeat with the given - period. - - Parameters - ---------- - sequence : sequence - The sequence of elements to check. - - period : int - The period over which the elements should repeat. - ''' - seq_len = len(sequence) - if period <= 1 or period >= seq_len: - raise ValueError('The period must be greater than one and less than ' - 'the length of the sequence') - if seq_len % period != 0: - raise ValueError('The sequence length is not evenly divisible by the ' - 'period length.') - - for period_idx in range(1, seq_len / period): - start_idx = period_idx * period - end_idx = start_idx + period - if sequence[start_idx:end_idx] != sequence[:period]: - return False - - return True - -class InvalidExtensionError(Exception): - def __init__(self, msg): - '''Exception denoting than a DcmMetaExtension is invalid.''' - self.msg = msg - - def __str__(self): - return 'The extension is not valid: %s' % self.msg - - -class DcmMetaExtension(Nifti1Extension): - '''Nifti extension for storing a summary of the meta data from the source - DICOM files. - ''' - - @property - def reorient_transform(self): - '''The transformation due to reorientation of the data array. Can be - used to update directional DICOM meta data (after converting to RAS if - needed) into the same space as the affine.''' - if self.version < 0.6: - return None - if self._content['dcmmeta_reorient_transform'] is None: - return None - return np.array(self._content['dcmmeta_reorient_transform']) - - @reorient_transform.setter - def reorient_transform(self, value): - if not value is None and value.shape != (4, 4): - raise ValueError("The reorient_transform must be none or (4,4) " - "array") - if value is None: - self._content['dcmmeta_reorient_transform'] = None - else: - self._content['dcmmeta_reorient_transform'] = value.tolist() - - @property - def affine(self): - '''The affine associated with the meta data. If this differs from the - image affine, the per-slice meta data will not be used. ''' - return np.array(self._content['dcmmeta_affine']) - - @affine.setter - def affine(self, value): - if value.shape != (4, 4): - raise ValueError("Invalid shape for affine") - self._content['dcmmeta_affine'] = value.tolist() - - @property - def slice_dim(self): - '''The index of the slice dimension associated with the per-slice - meta data.''' - return self._content['dcmmeta_slice_dim'] - - @slice_dim.setter - def slice_dim(self, value): - if not value is None and not (0 <= value < 3): - raise ValueError("The slice dimension must be between zero and " - "two") - self._content['dcmmeta_slice_dim'] = value - - @property - def shape(self): - '''The shape of the data associated with the meta data. Defines the - number of values for the meta data classifications.''' - return tuple(self._content['dcmmeta_shape']) - - @shape.setter - def shape(self, value): - if not (3 <= len(value) < 6): - raise ValueError("The shape must have a length between three and " - "six") - self._content['dcmmeta_shape'][:] = value - - @property - def version(self): - '''The version of the meta data extension.''' - return self._content['dcmmeta_version'] - - @version.setter - def version(self, value): - '''Set the version of the meta data extension.''' - self._content['dcmmeta_version'] = value - - @property - def slice_normal(self): - '''The slice normal associated with the per-slice meta data.''' - slice_dim = self.slice_dim - if slice_dim is None: - return None - return np.array(self.affine[slice_dim][:3]) - - @property - def n_slices(self): - '''The number of slices associated with the per-slice meta data.''' - slice_dim = self.slice_dim - if slice_dim is None: - return None - return self.shape[slice_dim] - - classifications = (('global', 'const'), - ('global', 'slices'), - ('time', 'samples'), - ('time', 'slices'), - ('vector', 'samples'), - ('vector', 'slices'), - ) - '''The classifications used to separate meta data based on if and how the - values repeat. Each class is a tuple with a base class and a sub class.''' - - def get_valid_classes(self): - '''Return the meta data classifications that are valid for this - extension. - - Returns - ------- - valid_classes : tuple - The classifications that are valid for this extension (based on its - shape). - - ''' - shape = self.shape - n_dims = len(shape) - if n_dims == 3: - return self.classifications[:2] - elif n_dims == 4: - return self.classifications[:4] - elif n_dims == 5: - if shape[3] != 1: - return self.classifications - else: - return self.classifications[:2] + self.classifications[-2:] - else: - raise ValueError("There must be 3 to 5 dimensions.") - - def get_multiplicity(self, classification): - '''Get the number of meta data values for all meta data of the provided - classification. - - Parameters - ---------- - classification : tuple - The meta data classification. - - Returns - ------- - multiplicity : int - The number of values for any meta data of the provided - `classification`. - ''' - if not classification in self.get_valid_classes(): - raise ValueError("Invalid classification: %s" % classification) - - base, sub = classification - shape = self.shape - n_vals = 1 - if sub == 'slices': - n_vals = self.n_slices - if n_vals is None: - return 0 - if base == 'vector': - n_vals *= shape[3] - elif base == 'global': - for dim_size in shape[3:]: - n_vals *= dim_size - elif sub == 'samples': - if base == 'time': - n_vals = shape[3] - if len(shape) == 5: - n_vals *= shape[4] - elif base == 'vector': - n_vals = shape[4] - - return n_vals - - def check_valid(self): - '''Check if the extension is valid. - - Raises - ------ - InvalidExtensionError - The extension is missing required meta data or classifications, or - some element(s) have the wrong number of values for their - classification. - ''' - #Check for the required base keys in the json data - if not _req_base_keys_map[self.version] <= set(self._content): - raise InvalidExtensionError('Missing one or more required keys') - - #Check the orientation/shape/version - if self.affine.shape != (4, 4): - raise InvalidExtensionError('Affine has incorrect shape') - slice_dim = self.slice_dim - if slice_dim is not None: - if not (0 <= slice_dim < 3): - raise InvalidExtensionError('Slice dimension is not valid') - if not (3 <= len(self.shape) < 6): - raise InvalidExtensionError('Shape is not valid') - - #Check all required meta dictionaries, make sure values have correct - #multiplicity - valid_classes = self.get_valid_classes() - for classes in valid_classes: - if not classes[0] in self._content: - raise InvalidExtensionError('Missing required base ' - 'classification %s' % classes[0]) - if not classes[1] in self._content[classes[0]]: - raise InvalidExtensionError(('Missing required sub ' - 'classification %s in base ' - 'classification %s') % classes) - cls_meta = self.get_class_dict(classes) - cls_mult = self.get_multiplicity(classes) - if cls_mult == 0 and len(cls_meta) != 0: - raise InvalidExtensionError('Slice dim is None but per-slice ' - 'meta data is present') - elif cls_mult > 1: - for key, vals in cls_meta.iteritems(): - n_vals = len(vals) - if n_vals != cls_mult: - msg = (('Incorrect number of values for key %s with ' - 'classification %s, expected %d found %d') % - (key, classes, cls_mult, n_vals) - ) - raise InvalidExtensionError(msg) - - #Check that all keys are uniquely classified - for classes in valid_classes: - for other_classes in valid_classes: - if classes == other_classes: - continue - intersect = (set(self.get_class_dict(classes)) & - set(self.get_class_dict(other_classes)) - ) - if len(intersect) != 0: - raise InvalidExtensionError("One or more keys have " - "multiple classifications") - - def get_keys(self): - '''Get a list of all the meta data keys that are available.''' - keys = [] - for base_class, sub_class in self.get_valid_classes(): - keys += self._content[base_class][sub_class].keys() - return keys - - def get_classification(self, key): - '''Get the classification for the given `key`. - - Parameters - ---------- - key : str - The meta data key. - - Returns - ------- - classification : tuple or None - The classification tuple for the provided key or None if the key is - not found. - - ''' - for base_class, sub_class in self.get_valid_classes(): - if key in self._content[base_class][sub_class]: - return (base_class, sub_class) - - return None - - def get_class_dict(self, classification): - '''Get the dictionary for the given classification. - - Parameters - ---------- - classification : tuple - The meta data classification. - - Returns - ------- - meta_dict : dict - The dictionary for the provided classification. - ''' - base, sub = classification - return self._content[base][sub] - - def get_values(self, key): - '''Get all values for the provided key. - - Parameters - ---------- - key : str - The meta data key. - - Returns - ------- - values - The value or values for the given key. The number of values - returned depends on the classification (see 'get_multiplicity'). - ''' - classification = self.get_classification(key) - if classification is None: - return None - return self.get_class_dict(classification)[key] - - def get_values_and_class(self, key): - '''Get the values and the classification for the provided key. - - Parameters - ---------- - key : str - The meta data key. - - Returns - ------- - vals_and_class : tuple - None for both the value and classification if the key is not found. - - ''' - classification = self.get_classification(key) - if classification is None: - return (None, None) - return (self.get_class_dict(classification)[key], classification) - - def filter_meta(self, filter_func): - '''Filter the meta data. - - Parameters - ---------- - filter_func : callable - Must take a key and values as parameters and return True if they - should be filtered out. - - ''' - for classes in self.get_valid_classes(): - filtered = [] - curr_dict = self.get_class_dict(classes) - for key, values in curr_dict.iteritems(): - if filter_func(key, values): - filtered.append(key) - for key in filtered: - del curr_dict[key] - - def clear_slice_meta(self): - '''Clear all meta data that is per slice.''' - for base_class, sub_class in self.get_valid_classes(): - if sub_class == 'slices': - self.get_class_dict((base_class, sub_class)).clear() - - def get_subset(self, dim, idx): - '''Get a DcmMetaExtension containing a subset of the meta data. - - Parameters - ---------- - dim : int - The dimension we are taking the subset along. - - idx : int - The position on the dimension `dim` for the subset. - - Returns - ------- - result : DcmMetaExtension - A new DcmMetaExtension corresponding to the subset. - - ''' - if not 0 <= dim < 5: - raise ValueError("The argument 'dim' must be in the range [0, 5).") - - shape = self.shape - valid_classes = self.get_valid_classes() - - #Make an empty extension for the result - result_shape = list(shape) - result_shape[dim] = 1 - while result_shape[-1] == 1 and len(result_shape) > 3: - result_shape = result_shape[:-1] - result = self.make_empty(result_shape, - self.affine, - self.reorient_transform, - self.slice_dim - ) - - for src_class in valid_classes: - #Constants remain constant - if src_class == ('global', 'const'): - for key, val in self.get_class_dict(src_class).iteritems(): - result.get_class_dict(src_class)[key] = deepcopy(val) - continue - - if dim == self.slice_dim: - if src_class[1] != 'slices': - for key, vals in self.get_class_dict(src_class).iteritems(): - result.get_class_dict(src_class)[key] = deepcopy(vals) - else: - result._copy_slice(self, src_class, idx) - elif dim < 3: - for key, vals in self.get_class_dict(src_class).iteritems(): - result.get_class_dict(src_class)[key] = deepcopy(vals) - elif dim == 3: - result._copy_sample(self, src_class, 'time', idx) - else: - result._copy_sample(self, src_class, 'vector', idx) - - return result - - def to_json(self): - '''Return the extension encoded as a JSON string.''' - self.check_valid() - return self._mangle(self._content) - - @classmethod - def from_json(klass, json_str): - '''Create an extension from the JSON string representation.''' - result = klass(dcm_meta_ecode, json_str) - result.check_valid() - return result - - @classmethod - def make_empty(klass, shape, affine, reorient_transform=None, - slice_dim=None): - '''Make an empty DcmMetaExtension. - - Parameters - ---------- - shape : tuple - The shape of the data associated with this extension. - - affine : array - The RAS affine for the data associated with this extension. - - reorient_transform : array - The transformation matrix representing any reorientation of the - data array. - - slice_dim : int - The index of the slice dimension for the data associated with this - extension - - Returns - ------- - result : DcmMetaExtension - An empty DcmMetaExtension with the required values set to the - given arguments. - - ''' - result = klass(dcm_meta_ecode, '{}') - result._content['global'] = OrderedDict() - result._content['global']['const'] = OrderedDict() - result._content['global']['slices'] = OrderedDict() - - if len(shape) > 3 and shape[3] != 1: - result._content['time'] = OrderedDict() - result._content['time']['samples'] = OrderedDict() - result._content['time']['slices'] = OrderedDict() - - if len(shape) > 4: - result._content['vector'] = OrderedDict() - result._content['vector']['samples'] = OrderedDict() - result._content['vector']['slices'] = OrderedDict() - - result._content['dcmmeta_shape'] = [] - result.shape = shape - result.affine = affine - result.reorient_transform = reorient_transform - result.slice_dim = slice_dim - result.version = _meta_version - - return result - - @classmethod - def from_runtime_repr(klass, runtime_repr): - '''Create an extension from the Python runtime representation (nested - dictionaries). - ''' - result = klass(dcm_meta_ecode, '{}') - result._content = runtime_repr - result.check_valid() - return result - - @classmethod - def from_sequence(klass, seq, dim, affine=None, slice_dim=None): - '''Create an extension from a sequence of extensions. - - Parameters - ---------- - seq : sequence - The sequence of DcmMetaExtension objects. - - dim : int - The dimension to merge the extensions along. - - affine : array - The affine to use in the resulting extension. If None, the affine - from the first extension in `seq` will be used. - - slice_dim : int - The slice dimension to use in the resulting extension. If None, the - slice dimension from the first extension in `seq` will be used. - - Returns - ------- - result : DcmMetaExtension - The result of merging the extensions in `seq` along the dimension - `dim`. - ''' - if not 0 <= dim < 5: - raise ValueError("The argument 'dim' must be in the range [0, 5).") - - n_inputs = len(seq) - first_input = seq[0] - input_shape = first_input.shape - - if len(input_shape) > dim and input_shape[dim] != 1: - raise ValueError("The dim must be singular or not exist for the " - "inputs.") - - output_shape = list(input_shape) - while len(output_shape) <= dim: - output_shape.append(1) - output_shape[dim] = n_inputs - - if affine is None: - affine = first_input.affine - if slice_dim is None: - slice_dim = first_input.slice_dim - - result = klass.make_empty(output_shape, - affine, - None, - slice_dim) - - #Need to initialize the result with the first extension in 'seq' - result_slc_norm = result.slice_normal - first_slc_norm = first_input.slice_normal - use_slices = (not result_slc_norm is None and - not first_slc_norm is None and - np.allclose(result_slc_norm, first_slc_norm)) - for classes in first_input.get_valid_classes(): - if classes[1] == 'slices' and not use_slices: - continue - result._content[classes[0]][classes[1]] = \ - deepcopy(first_input.get_class_dict(classes)) - - #Adjust the shape to what the extension actually contains - shape = list(result.shape) - shape[dim] = 1 - result.shape = shape - - #Initialize reorient transform - reorient_transform = first_input.reorient_transform - - #Add the other extensions, updating the shape as we go - for input_ext in seq[1:]: - #If the affines or reorient_transforms don't match, we set the - #reorient_transform to None as we can not reliably use it to update - #directional meta data - if ((reorient_transform is None or - input_ext.reorient_transform is None) or - not (np.allclose(input_ext.affine, affine) or - np.allclose(input_ext.reorient_transform, - reorient_transform) - ) - ): - reorient_transform = None - result._insert(dim, input_ext) - shape[dim] += 1 - result.shape = shape - - #Set the reorient transform - result.reorient_transform = reorient_transform - - #Try simplifying any keys in global slices - for key in result.get_class_dict(('global', 'slices')).keys(): - result._simplify(key) - - return result - - def __str__(self): - return self._mangle(self._content) - - def __eq__(self, other): - if not np.allclose(self.affine, other.affine): - return False - if self.shape != other.shape: - return False - if self.slice_dim != other.slice_dim: - return False - if self.version != other.version: - return False - for classes in self.get_valid_classes(): - if (dict(self.get_class_dict(classes)) != - dict(other.get_class_dict(classes))): - return False - - return True - - def _unmangle(self, value): - '''Go from extension data to runtime representation.''' - #Its not possible to preserve order while loading with python 2.6 - kwargs = {} - if sys.version_info >= (2, 7): - kwargs['object_pairs_hook'] = OrderedDict - return json.loads(value, **kwargs) - - def _mangle(self, value): - '''Go from runtime representation to extension data.''' - return json.dumps(value, indent=4) - - _const_tests = {('global', 'slices') : (('global', 'const'), - ('vector', 'samples'), - ('time', 'samples') - ), - ('vector', 'slices') : (('global', 'const'), - ('time', 'samples') - ), - ('time', 'slices') : (('global', 'const'), - ), - ('time', 'samples') : (('global', 'const'), - ('vector', 'samples'), - ), - ('vector', 'samples') : (('global', 'const'),) - } - '''Classification mapping showing possible reductions in multiplicity for - values that are constant with some period.''' - - def _get_const_period(self, src_cls, dest_cls): - '''Get the period over which we test for const-ness with for the - given classification change.''' - if dest_cls == ('global', 'const'): - return None - elif src_cls == ('global', 'slices'): - return self.get_multiplicity(src_cls) / self.get_multiplicity(dest_cls) - elif src_cls == ('vector', 'slices'): #implies dest_cls == ('time', 'samples'): - return self.n_slices - elif src_cls == ('time', 'samples'): #implies dest_cls == ('vector', 'samples') - return self.shape[3] - assert False #Should take one of the above branches - - _repeat_tests = {('global', 'slices') : (('time', 'slices'), - ('vector', 'slices') - ), - ('vector', 'slices') : (('time', 'slices'),), - } - '''Classification mapping showing possible reductions in multiplicity for - values that are repeating with some period.''' - - def _simplify(self, key): - '''Try to simplify (reduce the multiplicity) of a single meta data - element by changing its classification. Return True if the - classification is changed, otherwise False. - - Looks for values that are constant or repeating with some pattern. - Constant elements with a value of None will be deleted. - ''' - values, curr_class = self.get_values_and_class(key) - - #If the class is global const then just delete it if the value is None - if curr_class == ('global', 'const'): - if values is None: - del self.get_class_dict(curr_class)[key] - return True - return False - - #Test if the values are constant with some period - dests = self._const_tests[curr_class] - for dest_cls in dests: - if dest_cls[0] in self._content: - period = self._get_const_period(curr_class, dest_cls) - #If the period is one, the two classifications have the - #same multiplicity so we are dealing with a degenerate - #case (i.e. single slice data). Just change the - #classification to the "simpler" one in this case - if period == 1 or is_constant(values, period): - if period is None: - self.get_class_dict(dest_cls)[key] = \ - values[0] - else: - self.get_class_dict(dest_cls)[key] = \ - values[::period] - break - else: #Otherwise test if values are repeating with some period - if curr_class in self._repeat_tests: - for dest_cls in self._repeat_tests[curr_class]: - if dest_cls[0] in self._content: - dest_mult = self.get_multiplicity(dest_cls) - if is_repeating(values, dest_mult): - self.get_class_dict(dest_cls)[key] = \ - values[:dest_mult] - break - else: #Can't simplify - return False - else: - return False - - del self.get_class_dict(curr_class)[key] - return True - - _preserving_changes = {None : (('global', 'const'), - ('vector', 'samples'), - ('time', 'samples'), - ('time', 'slices'), - ('vector', 'slices'), - ('global', 'slices'), - ), - ('global', 'const') : (('vector', 'samples'), - ('time', 'samples'), - ('time', 'slices'), - ('vector', 'slices'), - ('global', 'slices'), - ), - ('vector', 'samples') : (('time', 'samples'), - ('global', 'slices'), - ), - ('time', 'samples') : (('global', 'slices'), - ), - ('time', 'slices') : (('vector', 'slices'), - ('global', 'slices'), - ), - ('vector', 'slices') : (('global', 'slices'), - ), - ('global', 'slices') : tuple(), - } - '''Classification mapping showing allowed changes when increasing the - multiplicity.''' - - def _get_changed_class(self, key, new_class, slice_dim=None): - '''Get an array of values corresponding to a single meta data - element with its classification changed by increasing its - multiplicity. This will preserve all the meta data and allow easier - merging of values with different classifications.''' - values, curr_class = self.get_values_and_class(key) - if curr_class == new_class: - return values - - if not new_class in self._preserving_changes[curr_class]: - raise ValueError("Classification change would lose data.") - - if curr_class is None: - curr_mult = 1 - per_slice = False - else: - curr_mult = self.get_multiplicity(curr_class) - per_slice = curr_class[1] == 'slices' - if new_class in self.get_valid_classes(): - new_mult = self.get_multiplicity(new_class) - #Only way we get 0 for mult is if slice dim is undefined - if new_mult == 0: - new_mult = self.shape[slice_dim] - else: - new_mult = 1 - mult_fact = new_mult / curr_mult - if curr_mult == 1: - values = [values] - - - if per_slice: - result = values * mult_fact - else: - result = [] - for value in values: - result.extend([deepcopy(value)] * mult_fact) - - if new_class == ('global', 'const'): - result = result[0] - - return result - - - def _change_class(self, key, new_class): - '''Change the classification of the meta data element in place. See - _get_changed_class.''' - values, curr_class = self.get_values_and_class(key) - if curr_class == new_class: - return - - self.get_class_dict(new_class)[key] = self._get_changed_class(key, - new_class) - - if not curr_class is None: - del self.get_class_dict(curr_class)[key] - - - - def _copy_slice(self, other, src_class, idx): - '''Get a copy of the meta data from the 'other' instance with - classification 'src_class', corresponding to the slice with index - 'idx'.''' - if src_class[0] == 'global': - for classes in (('time', 'samples'), - ('vector', 'samples'), - ('global', 'const')): - if classes in self.get_valid_classes(): - dest_class = classes - break - elif src_class[0] == 'vector': - for classes in (('time', 'samples'), - ('global', 'const')): - if classes in self.get_valid_classes(): - dest_class = classes - break - else: - dest_class = ('global', 'const') - - src_dict = other.get_class_dict(src_class) - dest_dict = self.get_class_dict(dest_class) - dest_mult = self.get_multiplicity(dest_class) - stride = other.n_slices - for key, vals in src_dict.iteritems(): - subset_vals = vals[idx::stride] - - if len(subset_vals) < dest_mult: - full_vals = [] - for val_idx in xrange(dest_mult / len(subset_vals)): - full_vals += deepcopy(subset_vals) - subset_vals = full_vals - if len(subset_vals) == 1: - subset_vals = subset_vals[0] - dest_dict[key] = deepcopy(subset_vals) - self._simplify(key) - - def _global_slice_subset(self, key, sample_base, idx): - '''Get a subset of the meta data values with the classificaion - ('global', 'slices') corresponding to a single sample along the - time or vector dimension (as specified by 'sample_base' and 'idx'). - ''' - n_slices = self.n_slices - shape = self.shape - src_dict = self.get_class_dict(('global', 'slices')) - if sample_base == 'vector': - slices_per_vec = n_slices * shape[3] - start_idx = idx * slices_per_vec - end_idx = start_idx + slices_per_vec - return src_dict[key][start_idx:end_idx] - else: - if not ('vector', 'samples') in self.get_valid_classes(): - start_idx = idx * n_slices - end_idx = start_idx + n_slices - return src_dict[key][start_idx:end_idx] - else: - result = [] - slices_per_vec = n_slices * shape[3] - for vec_idx in xrange(shape[4]): - start_idx = (vec_idx * slices_per_vec) + (idx * n_slices) - end_idx = start_idx + n_slices - result.extend(src_dict[key][start_idx:end_idx]) - return result - - def _copy_sample(self, other, src_class, sample_base, idx): - '''Get a copy of meta data from 'other' instance with classification - 'src_class', corresponding to one sample along the time or vector - dimension.''' - assert src_class != ('global', 'const') - src_dict = other.get_class_dict(src_class) - if src_class[1] == 'samples': - #If we are indexing on the same dim as the src_class we need to - #change the classification - if src_class[0] == sample_base: - #Time samples may become vector samples, otherwise const - best_dest = None - for dest_cls in (('vector', 'samples'), - ('global', 'const')): - if (dest_cls != src_class and - dest_cls in self.get_valid_classes() - ): - best_dest = dest_cls - break - - dest_mult = self.get_multiplicity(dest_cls) - if dest_mult == 1: - for key, vals in src_dict.iteritems(): - self.get_class_dict(dest_cls)[key] = \ - deepcopy(vals[idx]) - else: #We must be doing time samples -> vector samples - stride = other.shape[3] - for key, vals in src_dict.iteritems(): - self.get_class_dict(dest_cls)[key] = \ - deepcopy(vals[idx::stride]) - for key in src_dict.keys(): - self._simplify(key) - - else: #Otherwise classification does not change - #The multiplicity will change for time samples if splitting - #vector dimension - if src_class == ('time', 'samples'): - dest_mult = self.get_multiplicity(src_class) - start_idx = idx * dest_mult - end_idx = start_idx + dest_mult - for key, vals in src_dict.iteritems(): - self.get_class_dict(src_class)[key] = \ - deepcopy(vals[start_idx:end_idx]) - self._simplify(key) - else: #Otherwise multiplicity is unchanged - for key, vals in src_dict.iteritems(): - self.get_class_dict(src_class)[key] = deepcopy(vals) - else: #The src_class is per slice - if src_class[0] == sample_base: - best_dest = None - for dest_class in self._preserving_changes[src_class]: - if dest_class in self.get_valid_classes(): - best_dest = dest_class - break - for key, vals in src_dict.iteritems(): - self.get_class_dict(best_dest)[key] = deepcopy(vals) - elif src_class[0] != 'global': - if sample_base == 'time': - #Take a subset of vector slices - n_slices = self.n_slices - start_idx = idx * n_slices - end_idx = start_idx + n_slices - for key, vals in src_dict.iteritems(): - self.get_class_dict(src_class)[key] = \ - deepcopy(vals[start_idx:end_idx]) - self._simplify(key) - else: - #Time slices are unchanged - for key, vals in src_dict.iteritems(): - self.get_class_dict(src_class)[key] = deepcopy(vals) - else: - #Take a subset of global slices - for key, vals in src_dict.iteritems(): - subset_vals = \ - other._global_slice_subset(key, sample_base, idx) - self.get_class_dict(src_class)[key] = deepcopy(subset_vals) - self._simplify(key) - - def _insert(self, dim, other): - self_slc_norm = self.slice_normal - other_slc_norm = other.slice_normal - - #If we are not using slice meta data, temporarily remove it from the - #other dcmmeta object - use_slices = (not self_slc_norm is None and - not other_slc_norm is None and - np.allclose(self_slc_norm, other_slc_norm)) - other_slc_meta = {} - if not use_slices: - for classes in other.get_valid_classes(): - if classes[1] == 'slices': - other_slc_meta[classes] = other.get_class_dict(classes) - other._content[classes[0]][classes[1]] = {} - missing_keys = list(set(self.get_keys()) - set(other.get_keys())) - for other_classes in other.get_valid_classes(): - other_keys = other.get_class_dict(other_classes).keys() - - #Treat missing keys as if they were in global const and have a value - #of None - if other_classes == ('global', 'const'): - other_keys += missing_keys - - #When possible, reclassify our meta data so it matches the other - #classification - for key in other_keys: - local_classes = self.get_classification(key) - if local_classes != other_classes: - local_allow = self._preserving_changes[local_classes] - other_allow = self._preserving_changes[other_classes] - - if other_classes in local_allow: - self._change_class(key, other_classes) - elif not local_classes in other_allow: - best_dest = None - for dest_class in local_allow: - if (dest_class[0] in self._content and - dest_class in other_allow): - best_dest = dest_class - break - self._change_class(key, best_dest) - - #Insert new meta data and further reclassify as necessary - for key in other_keys: - if dim == self.slice_dim: - self._insert_slice(key, other) - elif dim < 3: - self._insert_non_slice(key, other) - elif dim == 3: - self._insert_sample(key, other, 'time') - elif dim == 4: - self._insert_sample(key, other, 'vector') - - #Restore per slice meta if needed - if not use_slices: - for classes in other.get_valid_classes(): - if classes[1] == 'slices': - other._content[classes[0]][classes[1]] = \ - other_slc_meta[classes] - - def _insert_slice(self, key, other): - local_vals, classes = self.get_values_and_class(key) - other_vals = other._get_changed_class(key, classes, self.slice_dim) - - - #Handle some common / simple insertions with special cases - if classes == ('global', 'const'): - if local_vals != other_vals: - for dest_base in ('time', 'vector', 'global'): - if dest_base in self._content: - self._change_class(key, (dest_base, 'slices')) - other_vals = other._get_changed_class(key, - (dest_base, - 'slices'), - self.slice_dim - ) - self.get_values(key).extend(other_vals) - break - elif classes == ('time', 'slices'): - local_vals.extend(other_vals) - else: - #Default to putting in global slices and simplifying later - if classes != ('global', 'slices'): - self._change_class(key, ('global', 'slices')) - local_vals = self.get_class_dict(('global', 'slices'))[key] - other_vals = other._get_changed_class(key, - ('global', 'slices'), - self.slice_dim) - - #Need to interleave slices from different volumes - n_slices = self.n_slices - other_n_slices = other.n_slices - shape = self.shape - n_vols = 1 - for dim_size in shape[3:]: - n_vols *= dim_size - - intlv = [] - loc_start = 0 - oth_start = 0 - for vol_idx in xrange(n_vols): - intlv += local_vals[loc_start:loc_start + n_slices] - intlv += other_vals[oth_start:oth_start + other_n_slices] - loc_start += n_slices - oth_start += other_n_slices - - self.get_class_dict(('global', 'slices'))[key] = intlv - - def _insert_non_slice(self, key, other): - local_vals, classes = self.get_values_and_class(key) - other_vals = other._get_changed_class(key, classes, self.slice_dim) - - if local_vals != other_vals: - del self.get_class_dict(classes)[key] - - def _insert_sample(self, key, other, sample_base): - local_vals, classes = self.get_values_and_class(key) - other_vals = other._get_changed_class(key, classes, self.slice_dim) - - if classes == ('global', 'const'): - if local_vals != other_vals: - self._change_class(key, (sample_base, 'samples')) - local_vals = self.get_values(key) - other_vals = other._get_changed_class(key, - (sample_base, 'samples'), - self.slice_dim - ) - local_vals.extend(other_vals) - elif classes == (sample_base, 'samples'): - local_vals.extend(other_vals) - else: - if classes != ('global', 'slices'): - self._change_class(key, ('global', 'slices')) - local_vals = self.get_values(key) - other_vals = other._get_changed_class(key, - ('global', 'slices'), - self.slice_dim) - - shape = self.shape - n_dims = len(shape) - if sample_base == 'time' and n_dims == 5: - #Need to interleave values from the time points in each vector - #component - n_slices = self.n_slices - slices_per_vec = n_slices * shape[3] - oth_slc_per_vec = n_slices * other.shape[3] - - intlv = [] - loc_start = 0 - oth_start = 0 - for vec_idx in xrange(shape[4]): - intlv += local_vals[loc_start:loc_start+slices_per_vec] - intlv += other_vals[oth_start:oth_start+oth_slc_per_vec] - loc_start += slices_per_vec - oth_start += oth_slc_per_vec - - self.get_class_dict(('global', 'slices'))[key] = intlv - else: - local_vals.extend(other_vals) - -#Add our extension to nibabel -nb.nifti1.extension_codes.add_codes(((dcm_meta_ecode, - "dcmmeta", - DcmMetaExtension),) - ) - -class MissingExtensionError(Exception): - '''Exception denoting that there is no DcmMetaExtension in the Nifti header. - ''' - def __str__(self): - return 'No dcmmeta extension found.' - -def patch_dcm_ds_is(dcm): - '''Convert all elements in `dcm` with VR of 'DS' or 'IS' to floats and ints. - This is a hackish work around for the backwards incompatibility of pydicom - 0.9.7 and should not be needed once nibabel is updated. - ''' - for elem in dcm: - if elem.VM == 1: - if elem.VR in ('DS', 'IS'): - if elem.value == '': - continue - if elem.VR == 'DS': - elem.VR = 'FD' - elem.value = float(elem.value) - else: - elem.VR = 'SL' - elem.value = int(elem.value) - else: - if elem.VR in ('DS', 'IS'): - if elem.value == '': - continue - if elem.VR == 'DS': - elem.VR = 'FD' - elem.value = [float(val) for val in elem.value] - else: - elem.VR = 'SL' - elem.value = [int(val) for val in elem.value] - - -class NiftiWrapper(object): - '''Wraps a Nifti1Image object containing a DcmMeta header extension. - Provides access to the meta data and the ability to split or merge the - data array while updating the meta data. - - Parameters - ---------- - nii_img : nibabel.nifti1.Nifti1Image - The Nifti1Image to wrap. - - make_empty : bool - If True an empty DcmMetaExtension will be created if none is found. - - Raises - ------ - MissingExtensionError - No valid DcmMetaExtension was found. - - ValueError - More than one valid DcmMetaExtension was found. - ''' - - def __init__(self, nii_img, make_empty=False): - self.nii_img = nii_img - hdr = nii_img.get_header() - self.meta_ext = None - for extension in hdr.extensions: - if extension.get_code() == dcm_meta_ecode: - try: - extension.check_valid() - except InvalidExtensionError as e: - print("Found candidate extension, but invalid: %s" % e) - else: - if not self.meta_ext is None: - raise ValueError('More than one valid DcmMeta ' - 'extension found.') - self.meta_ext = extension - if not self.meta_ext: - if make_empty: - slice_dim = hdr.get_dim_info()[2] - self.meta_ext = \ - DcmMetaExtension.make_empty(self.nii_img.shape, - hdr.get_best_affine(), - None, - slice_dim) - hdr.extensions.append(self.meta_ext) - else: - raise MissingExtensionError - self.meta_ext.check_valid() - - def __getitem__(self, key): - '''Get the value for the given meta data key. Only considers meta data - that is globally constant. To access varying meta data you must use the - method 'get_meta'.''' - return self.meta_ext.get_class_dict(('global', 'const'))[key] - - def meta_valid(self, classification): - '''Return true if the meta data with the given classification appears - to be valid for the wrapped Nifti image. Considers the shape and - orientation of the image and the meta data extension.''' - if classification == ('global', 'const'): - return True - - img_shape = self.nii_img.get_shape() - meta_shape = self.meta_ext.shape - if classification == ('vector', 'samples'): - return meta_shape[4:] == img_shape[4:] - if classification == ('time', 'samples'): - return meta_shape[3:] == img_shape[3:] - - hdr = self.nii_img.get_header() - if self.meta_ext.n_slices != hdr.get_n_slices(): - return False - - slice_dim = hdr.get_dim_info()[2] - slice_dir = self.nii_img.get_affine()[slice_dim, :3] - slices_aligned = np.allclose(slice_dir, - self.meta_ext.slice_normal, - atol=1e-6) - - if classification == ('time', 'slices'): - return slices_aligned - if classification == ('vector', 'slices'): - return meta_shape[3] == img_shape[3] and slices_aligned - if classification == ('global', 'slices'): - return meta_shape[3:] == img_shape[3:] and slices_aligned - - def get_meta(self, key, index=None, default=None): - '''Return the meta data value for the provided `key`. - - Parameters - ---------- - key : str - The meta data key. - - index : tuple - The voxel index we are interested in. - - default - This will be returned if the meta data for `key` is not found. - - Returns - ------- - value - The meta data value for the given `key` (and optionally `index`) - - Notes - ----- - The per-sample and per-slice meta data will only be considered if the - `samples_valid` and `slices_valid` methods return True (respectively), - and an `index` is specified. - ''' - #Get the value(s) and classification for the key - values, classes = self.meta_ext.get_values_and_class(key) - if classes is None: - return default - - #Check if the value is constant - if classes == ('global', 'const'): - return values - - #Check if the classification is valid - if not self.meta_valid(classes): - return default - - #If an index is provided check the varying values - if not index is None: - #Test if the index is valid - shape = self.nii_img.get_shape() - if len(index) != len(shape): - raise IndexError('Incorrect number of indices.') - for dim, ind_val in enumerate(index): - if not 0 <= ind_val < shape[dim]: - raise IndexError('Index is out of bounds.') - - #First try per time/vector sample values - if classes == ('time', 'samples'): - return values[index[3]] - if classes == ('vector', 'samples'): - return values[index[4]] - - #Finally, if aligned, try per-slice values - slice_dim = self.nii_img.get_header().get_dim_info()[2] - n_slices = shape[slice_dim] - if classes == ('global', 'slices'): - val_idx = index[slice_dim] - for count, idx_val in enumerate(index[3:]): - val_idx += idx_val * n_slices - n_slices *= shape[count+3] - return values[val_idx] - elif classes == ('time', 'slices'): - val_idx = index[slice_dim] - return values[val_idx] - elif classes == ('vector', 'slices'): - val_idx = index[slice_dim] - val_idx += index[3]*n_slices - return values[val_idx] - - return default - - def remove_extension(self): - '''Remove the DcmMetaExtension from the header of nii_img. The - attribute `meta_ext` will still point to the extension.''' - hdr = self.nii_img.get_header() - target_idx = None - for idx, ext in enumerate(hdr.extensions): - if id(ext) == id(self.meta_ext): - target_idx = idx - break - else: - raise IndexError('Extension not found in header') - del hdr.extensions[target_idx] - # Nifti1Image.update_header will increase this if necessary - hdr['vox_offset'] = 0 - - def replace_extension(self, dcmmeta_ext): - '''Replace the DcmMetaExtension. - - Parameters - ---------- - dcmmeta_ext : DcmMetaExtension - The new DcmMetaExtension. - - ''' - self.remove_extension() - self.nii_img.get_header().extensions.append(dcmmeta_ext) - self.meta_ext = dcmmeta_ext - - def split(self, dim=None): - '''Generate splits of the array and meta data along the specified - dimension. - - Parameters - ---------- - dim : int - The dimension to split the voxel array along. If None it will - prefer the vector, then time, then slice dimensions. - - Returns - ------- - result - Generator which yields a NiftiWrapper result for each index - along `dim`. - - ''' - shape = self.nii_img.get_shape() - data = self.nii_img.get_data() - header = self.nii_img.get_header() - slice_dim = header.get_dim_info()[2] - - #If dim is None, choose the vector/time/slice dim in that order - if dim is None: - dim = len(shape) - 1 - if dim == 2: - if slice_dim is None: - raise ValueError("Slice dimension is not known") - dim = slice_dim - - #If we are splitting on a spatial dimension, we need to update the - #translation - trans_update = None - if dim < 3: - trans_update = header.get_best_affine()[:3, dim] - - split_hdr = header.copy() - slices = [slice(None)] * len(shape) - for idx in xrange(shape[dim]): - #Grab the split data, get rid of trailing singular dimensions - if dim >= 3 and dim == len(shape) - 1: - slices[dim] = idx - else: - slices[dim] = slice(idx, idx+1) - - split_data = data[slices].copy() - - #Update the translation in any affines if needed - if not trans_update is None and idx != 0: - qform = split_hdr.get_qform() - if not qform is None: - qform[:3, 3] += trans_update - split_hdr.set_qform(qform) - sform = split_hdr.get_sform() - if not sform is None: - sform[:3, 3] += trans_update - split_hdr.set_sform(sform) - - #Create the initial Nifti1Image object - split_nii = nb.Nifti1Image(split_data, - split_hdr.get_best_affine(), - header=split_hdr) - - #Replace the meta data with the appropriate subset - meta_dim = dim - if dim == slice_dim: - meta_dim = self.meta_ext.slice_dim - split_meta = self.meta_ext.get_subset(meta_dim, idx) - result = NiftiWrapper(split_nii) - result.replace_extension(split_meta) - - yield result - - def to_filename(self, out_path): - '''Write out the wrapped Nifti to a file - - Parameters - ---------- - out_path : str - The path to write out the file to - - Notes - ----- - Will check that the DcmMetaExtension is valid before writing the file. - ''' - self.meta_ext.check_valid() - self.nii_img.to_filename(out_path) - - @classmethod - def from_filename(klass, path): - '''Create a NiftiWrapper from a file. - - Parameters - ---------- - path : str - The path to the Nifti file to load. - ''' - return klass(nb.load(path)) - - @classmethod - def from_dicom_wrapper(klass, dcm_wrp, meta_dict=None): - '''Create a NiftiWrapper from a nibabel DicomWrapper. - - Parameters - ---------- - dcm_wrap : nicom.dicomwrappers.DicomWrapper - The dataset to convert into a NiftiWrapper. - - meta_dict : dict - An optional dictionary of meta data extracted from `dcm_data`. See - the `extract` module for generating this dict. - - ''' - data = dcm_wrp.get_data() - - #The Nifti patient space flips the x and y directions - affine = np.dot(np.diag([-1., -1., 1., 1.]), dcm_wrp.get_affine()) - - #Make 2D data 3D - if len(data.shape) == 2: - data = data.reshape(data.shape + (1,)) - - #Create the nifti image and set header data - nii_img = nb.nifti1.Nifti1Image(data, affine) - hdr = nii_img.get_header() - hdr.set_xyzt_units('mm', 'sec') - dim_info = {'freq' : None, - 'phase' : None, - 'slice' : 2 - } - if hasattr(dcm_wrp.dcm_data, 'InplanePhaseEncodingDirection'): - if dcm_wrp['InplanePhaseEncodingDirection'] == 'ROW': - dim_info['phase'] = 1 - dim_info['freq'] = 0 - else: - dim_info['phase'] = 0 - dim_info['freq'] = 1 - hdr.set_dim_info(**dim_info) - - #Embed the meta data extension - result = klass(nii_img, make_empty=True) - - result.meta_ext.reorient_transform = np.eye(4) - if meta_dict: - result.meta_ext.get_class_dict(('global', 'const')).update(meta_dict) - - return result - - @classmethod - def from_dicom(klass, dcm_data, meta_dict=None): - '''Create a NiftiWrapper from a single DICOM dataset. - - Parameters - ---------- - dcm_data : dicom.dataset.Dataset - The DICOM dataset to convert into a NiftiWrapper. - - meta_dict : dict - An optional dictionary of meta data extracted from `dcm_data`. See - the `extract` module for generating this dict. - - ''' - dcm_wrp = wrapper_from_data(dcm_data) - return klass.from_dicom_wrapper(dcm_wrp, meta_dict) - - @classmethod - def from_sequence(klass, seq, dim=None): - '''Create a NiftiWrapper by joining a sequence of NiftiWrapper objects - along the given dimension. - - Parameters - ---------- - seq : sequence - The sequence of NiftiWrapper objects. - - dim : int - The dimension to join the NiftiWrapper objects along. If None, - 2D inputs will become 3D, 3D inputs will become 4D, and 4D inputs - will become 5D. - - Returns - ------- - result : NiftiWrapper - The merged NiftiWrapper with updated meta data. - ''' - n_inputs = len(seq) - first_input = seq[0] - first_nii = first_input.nii_img - first_hdr = first_nii.get_header() - shape = first_nii.shape - affine = first_nii.get_affine().copy() - - #If dim is None, choose a sane default - if dim is None: - if len(shape) == 3: - singular_dim = None - for dim_idx, dim_size in enumerate(shape): - if dim_size == 1: - singular_dim = dim_idx - if singular_dim is None: - dim = 3 - else: - dim = singular_dim - if len(shape) == 4: - dim = 4 - else: - if not 0 <= dim < 5: - raise ValueError("The argument 'dim' must be in the range " - "[0, 5).") - if dim < len(shape) and shape[dim] != 1: - raise ValueError('The dimension must be singular or not exist') - - #Pull out the three axes vectors for validation of other input affines - axes = [] - for axis_idx in xrange(3): - axis_vec = affine[:3, axis_idx] - if axis_idx == dim: - axis_vec = axis_vec.copy() - axis_vec /= np.sqrt(np.dot(axis_vec, axis_vec)) - axes.append(axis_vec) - #Pull out the translation - trans = affine[:3, 3] - - #Determine the shape of the result data array and create it - result_shape = list(shape) - while dim >= len(result_shape): - result_shape.append(1) - result_shape[dim] = n_inputs - - result_dtype = max(input_wrp.nii_img.get_data().dtype - for input_wrp in seq) - result_data = np.empty(result_shape, dtype=result_dtype) - - #Start with the header info from the first input - hdr_info = {'qform' : first_hdr.get_qform(), - 'qform_code' : first_hdr['qform_code'], - 'sform' : first_hdr.get_sform(), - 'sform_code' : first_hdr['sform_code'], - 'dim_info' : list(first_hdr.get_dim_info()), - 'xyzt_units' : list(first_hdr.get_xyzt_units()), - } - - try: - hdr_info['slice_duration'] = first_hdr.get_slice_duration() - except HeaderDataError: - hdr_info['slice_duration'] = None - try: - hdr_info['intent'] = first_hdr.get_intent() - except HeaderDataError: - hdr_info['intent'] = None - try: - hdr_info['slice_times'] = first_hdr.get_slice_times() - except HeaderDataError: - hdr_info['slice_times'] = None - - #Fill the data array, check header consistency - data_slices = [slice(None)] * len(result_shape) - for dim_idx, dim_size in enumerate(result_shape): - if dim_size == 1: - data_slices[dim_idx] = 0 - last_trans = None #Keep track of the translation from last input - for input_idx in range(n_inputs): - - input_wrp = seq[input_idx] - input_nii = input_wrp.nii_img - input_aff = input_nii.get_affine() - input_hdr = input_nii.get_header() - - #Check that the affines match appropriately - for axis_idx, axis_vec in enumerate(axes): - in_vec = input_aff[:3, axis_idx] - - #If we are joining on this dimension - if axis_idx == dim: - #Allow scaling difference as it will be updated later - in_vec = in_vec.copy() - in_vec /= np.sqrt(np.dot(in_vec, in_vec)) - - in_trans = input_aff[:3, 3] - if not last_trans is None: - #Must be translated along the axis - trans_diff = in_trans - last_trans - if not np.allclose(trans_diff, 0.0): - trans_diff /= np.sqrt(np.dot(trans_diff, trans_diff)) - - if (np.allclose(trans_diff, 0.0) or - not np.allclose(np.dot(trans_diff, in_vec), - 1.0, - atol=1e-6) - ): - raise ValueError("Slices must be translated along the " - "normal direction") - - #Update reference to last translation - last_trans = in_trans - - #Check that axis vectors match - if not np.allclose(in_vec, axis_vec, atol=5e-4): - raise ValueError("Cannot join images with different " - "orientations.") - - - data_slices[dim] = input_idx - result_data[data_slices] = input_nii.get_data().squeeze() - - if input_idx != 0: - if (hdr_info['qform'] is None or - input_hdr.get_qform() is None or - not np.allclose(input_hdr.get_qform(), hdr_info['qform']) - ): - hdr_info['qform'] = None - if input_hdr['qform_code'] != hdr_info['qform_code']: - hdr_info['qform_code'] = None - if (hdr_info['sform'] is None or - input_hdr.get_sform() is None or - not np.allclose(input_hdr.get_sform(), hdr_info['sform']) - ): - hdr_info['sform'] = None - if input_hdr['sform_code'] != hdr_info['sform_code']: - hdr_info['sform_code'] = None - in_dim_info = list(input_hdr.get_dim_info()) - if in_dim_info != hdr_info['dim_info']: - for idx in xrange(3): - if in_dim_info[idx] != hdr_info['dim_info'][idx]: - hdr_info['dim_info'][idx] = None - in_xyzt_units = list(input_hdr.get_xyzt_units()) - if in_xyzt_units != hdr_info['xyzt_units']: - for idx in xrange(2): - if in_xyzt_units[idx] != hdr_info['xyzt_units'][idx]: - hdr_info['xyzt_units'][idx] = None - - try: - if input_hdr.get_slice_duration() != hdr_info['slice_duration']: - hdr_info['slice_duration'] = None - except HeaderDataError: - hdr_info['slice_duration'] = None - try: - if input_hdr.get_intent() != hdr_info['intent']: - hdr_info['intent'] = None - except HeaderDataError: - hdr_info['intent'] = None - try: - if input_hdr.get_slice_times() != hdr_info['slice_times']: - hdr_info['slice_times'] = None - except HeaderDataError: - hdr_info['slice_times'] = None - - #If we joined along a spatial dim, rescale the appropriate axis - scaled_dim_dir = None - if dim < 3: - scaled_dim_dir = seq[1].nii_img.get_affine()[:3, 3] - trans - affine[:3, dim] = scaled_dim_dir - - #Create the resulting Nifti and wrapper - result_nii = nb.Nifti1Image(result_data, affine) - result_hdr = result_nii.get_header() - - #Update the header with any info that is consistent across inputs - if hdr_info['qform'] is not None and hdr_info['qform_code'] is not None: - if not scaled_dim_dir is None: - hdr_info['qform'][:3, dim] = scaled_dim_dir - result_nii.set_qform(hdr_info['qform'], - int(hdr_info['qform_code']), - update_affine=True) - if hdr_info['sform'] is not None and hdr_info['sform_code'] is not None: - if not scaled_dim_dir is None: - hdr_info['sform'][:3, dim] = scaled_dim_dir - result_nii.set_sform(hdr_info['sform'], - int(hdr_info['sform_code']), - update_affine=True) - if hdr_info['dim_info'] is not None: - result_hdr.set_dim_info(*hdr_info['dim_info']) - slice_dim = hdr_info['dim_info'][2] - else: - slice_dim = None - if hdr_info['intent'] is not None: - result_hdr.set_intent(*hdr_info['intent']) - if hdr_info['xyzt_units'] is not None: - result_hdr.set_xyzt_units(*hdr_info['xyzt_units']) - if hdr_info['slice_duration'] is not None: - result_hdr.set_slice_duration(hdr_info['slice_duration']) - if hdr_info['slice_times'] is not None: - result_hdr.set_slice_times(hdr_info['slice_times']) - - #Create the meta data extension and insert it - seq_exts = [elem.meta_ext for elem in seq] - result_ext = DcmMetaExtension.from_sequence(seq_exts, - dim, - affine, - slice_dim) - result_hdr.extensions.append(result_ext) - - return NiftiWrapper(result_nii) diff --git a/fsl/data/dcmstack/dcmstack.py b/fsl/data/dcmstack/dcmstack.py deleted file mode 100644 index 0e20a6d3a1036448f5f7749767ad14358b3cd7f0..0000000000000000000000000000000000000000 --- a/fsl/data/dcmstack/dcmstack.py +++ /dev/null @@ -1,1136 +0,0 @@ -""" -Stack DICOM datasets into volumes. The contents of this module are imported -into the package namespace. -""" -from __future__ import division -import warnings, re, dicom -from copy import deepcopy -from collections import MutableSequence -import six -import nibabel as nb -from nibabel.nifti1 import Nifti1Extensions -from nibabel.spatialimages import HeaderDataError -from nibabel.orientations import (io_orientation, - apply_orientation, - axcodes2ornt, - inv_ornt_aff) -import numpy as np -from .dcmmeta import DcmMetaExtension, NiftiWrapper - -with warnings.catch_warnings(): - warnings.simplefilter('ignore') - from nibabel.nicom.dicomwrappers import wrapper_from_data - -def make_key_regex_filter(exclude_res, force_include_res=None): - '''Make a meta data filter using regular expressions. - - Parameters - ---------- - exclude_res : sequence - Sequence of regular expression strings. Any meta data where the key - matches one of these expressions will be excluded, unless it matches - one of the `force_include_res`. - force_include_res : sequence - Sequence of regular expression strings. Any meta data where the key - matches one of these expressions will be included. - - Returns - ------- - A callable which can be passed to `DicomStack` as the `meta_filter`. - ''' - exclude_re = re.compile('|'.join(['(?:' + regex + ')' - for regex in exclude_res]) - ) - include_re = None - if force_include_res: - include_re = re.compile('|'.join(['(?:' + regex + ')' - for regex in force_include_res]) - ) - - def key_regex_filter(key, value): - return (exclude_re.search(key) and - not (include_re and include_re.search(key))) - return key_regex_filter - -default_key_excl_res = ['Patient', - 'Physician', - 'Operator', - 'Date', - 'Birth', - 'Address', - 'Institution', - 'Station', - 'SiteName', - 'Age', - 'Comment', - 'Phone', - 'Telephone', - 'Insurance', - 'Religious', - 'Language', - 'Military', - 'MedicalRecord', - 'Ethnic', - 'Occupation', - 'Unknown', - 'PrivateTagData', - 'UID', - 'StudyDescription', - 'DeviceSerialNumber', - 'ReferencedImageSequence', - 'RequestedProcedureDescription', - 'PerformedProcedureStepDescription', - 'PerformedProcedureStepID', - ] -'''A list of regexes passed to `make_key_regex_filter` as `exclude_res` to -create the `default_meta_filter`.''' - -default_key_incl_res = ['ImageOrientationPatient', - 'ImagePositionPatient', - ] -'''A list of regexes passed to `make_key_regex_filter` as `force_include_res` -to create the `default_meta_filter`.''' - -default_meta_filter = make_key_regex_filter(default_key_excl_res, - default_key_incl_res) -'''Default meta_filter for `DicomStack`.''' - -def ornt_transform(start_ornt, end_ornt): - '''Return the orientation that transforms from `start_ornt` to `end_ornt`. - - Parameters - ---------- - start_ornt : (n,2) orientation array - Initial orientation. - - end_ornt : (n,2) orientation array - Final orientation. - - Returns - ------- - orientations : (p, 2) ndarray - The orientation that will transform the `start_ornt` to the `end_ornt`. - ''' - start_ornt = np.asarray(start_ornt) - end_ornt = np.asarray(end_ornt) - if start_ornt.shape != end_ornt.shape: - raise ValueError("The orientations must have the same shape") - if start_ornt.shape[1] != 2: - raise ValueError("Invalid shape for an orientation: %s" % - start_ornt.shape) - result = np.empty_like(start_ornt) - for end_in_idx, (end_out_idx, end_flip) in enumerate(end_ornt): - for start_in_idx, (start_out_idx, start_flip) in enumerate(start_ornt): - if end_out_idx == start_out_idx: - if start_flip == end_flip: - flip = 1 - else: - flip = -1 - result[start_in_idx, :] = [end_in_idx, flip] - break - else: - raise ValueError("Unable to find out axis %d in start_ornt" % - end_out_idx) - return result - - -def reorder_voxels(vox_array, affine, voxel_order): - '''Reorder the given voxel array and corresponding affine. - - Parameters - ---------- - vox_array : array - The array of voxel data - - affine : array - The affine for mapping voxel indices to Nifti patient space - - voxel_order : str - A three character code specifing the desired ending point for rows, - columns, and slices in terms of the orthogonal axes of patient space: - (l)eft, (r)ight, (a)nterior, (p)osterior, (s)uperior, and (i)nferior. - - Returns - ------- - out_vox : array - An updated view of vox_array. - - out_aff : array - A new array with the updated affine - - reorient_transform : array - The transform used to update the affine. - - ornt_trans : tuple - The orientation transform used to update the orientation. - - ''' - #Check if voxel_order is valid - voxel_order = voxel_order.upper() - if len(voxel_order) != 3: - raise ValueError('The voxel_order must contain three characters') - dcm_axes = ['LR', 'AP', 'SI'] - for char in voxel_order: - if not char in 'LRAPSI': - raise ValueError('The characters in voxel_order must be one ' - 'of: L,R,A,P,I,S') - for idx, axis in enumerate(dcm_axes): - if char in axis: - del dcm_axes[idx] - if len(dcm_axes) != 0: - raise ValueError('No character in voxel_order corresponding to ' - 'axes: %s' % dcm_axes) - - #Check the vox_array and affine have correct shape/size - if len(vox_array.shape) < 3: - raise ValueError('The vox_array must be at least three dimensional') - if affine.shape != (4, 4): - raise ValueError('The affine must be 4x4') - - #Pull the current index directions from the affine - orig_ornt = io_orientation(affine) - new_ornt = axcodes2ornt(voxel_order) - ornt_trans = ornt_transform(orig_ornt, new_ornt) - orig_shape = vox_array.shape - vox_array = apply_orientation(vox_array, ornt_trans) - aff_trans = inv_ornt_aff(ornt_trans, orig_shape) - affine = np.dot(affine, aff_trans) - - return (vox_array, affine, aff_trans, ornt_trans) - -def dcm_time_to_sec(time_str): - '''Convert a DICOM time value (value representation of 'TM') to the number - of seconds past midnight. - - Parameters - ---------- - time_str : str - The DICOM time value string - - Returns - ------- - A floating point representing the number of seconds past midnight - ''' - #Allow ACR/NEMA style format by removing any colon chars - time_str = time_str.replace(':', '') - - #Only the hours portion is required - result = int(time_str[:2]) * 3600 - - str_len = len(time_str) - if str_len > 2: - result += int(time_str[2:4]) * 60 - if str_len > 4: - result += float(time_str[4:]) - - return float(result) - -class IncongruentImageError(Exception): - def __init__(self, msg): - '''An exception denoting that a DICOM with incorrect size or orientation - was passed to `DicomStack.add_dcm`.''' - self.msg = msg - - def __str__(self): - return 'The image is not congruent to the existing stack: %s' % self.msg - -class ImageCollisionError(Exception): - '''An exception denoting that a DICOM which collides with one already in - the stack was passed to a `DicomStack.add_dcm`.''' - def __str__(self): - return 'The image collides with one already in the stack' - -class InvalidStackError(Exception): - def __init__(self, msg): - '''An exception denoting that a `DicomStack` is not currently valid''' - self.msg = msg - - def __str__(self): - return 'The DICOM stack is not valid: %s' % self.msg - -class DicomOrdering(object): - '''Object defining an ordering for a set of dicom datasets. Create a - DicomOrdering with the given DICOM element keyword. - - Parameters - ---------- - key : str - The DICOM keyword to use for ordering the datasets - - abs_ordering : sequence - A sequence specifying the absolute order for values corresponding - to the `key`. Instead of ordering by the value associated with the - `key`, the index of the value in this sequence will be used. - - abs_as_str : bool - If true, the values will be converted to strings before looking up - the index in `abs_ordering`. - ''' - - def __init__(self, key, abs_ordering=None, abs_as_str=False): - self.key = key - self.abs_ordering = abs_ordering - self.abs_as_str = abs_as_str - - def get_ordinate(self, ds): - '''Get the ordinate for the given DICOM data set. - - Parameters - ---------- - ds : dict like - The DICOM data set we want the ordinate of. Should allow - dict like access where DICOM keywords return the corresponing - value. - - Returns - ------- - An ordinate for the data set. If `abs_ordering` is None then this will - just be the value for the keyword `key`. Otherwise it will be an - integer. - ''' - try: - val = ds[self.key] - except KeyError: - return None - - if self.abs_ordering: - if self.abs_as_str: - val = str(val) - return self.abs_ordering.index(val) - - return val - -def _make_dummy(reference, meta, iop): - '''Make a "dummy" NiftiWrapper (no valid pixel data).''' - #Create the dummy data array filled with largest representable value - data = np.empty_like(reference.nii_img.get_data()) - data[...] = np.iinfo(np.int16).max - - #Create the nifti image and set header data - aff = reference.nii_img.get_affine().copy() - aff[:3, 3] = [iop[1], iop[0], iop[2]] - nii_img = nb.nifti1.Nifti1Image(data, aff) - hdr = nii_img.get_header() - hdr.set_xyzt_units('mm', 'sec') - dim_info = {'freq' : None, - 'phase' : None, - 'slice' : 2 - } - if 'InplanePhaseEncodingDirection' in meta: - if meta['InplanePhaseEncodingDirection'] == 'ROW': - dim_info['phase'] = 1 - dim_info['freq'] = 0 - else: - dim_info['phase'] = 0 - dim_info['freq'] = 1 - hdr.set_dim_info(**dim_info) - - #Embed the meta data extension - result = NiftiWrapper(nii_img, make_empty=True) - result.meta_ext.reorient_transform = np.diag([-1., -1., 1., 1.]) - result.meta_ext.get_class_dict(('global', 'const')).update(meta) - - return result - -default_group_keys = ('SeriesInstanceUID', - 'SeriesNumber', - 'ProtocolName', - 'ImageOrientationPatient') -'''Default keys for grouping DICOM files that belong in the same -multi-dimensional array together.''' - -class DicomStack(object): - '''Defines a method for stacking together DICOM data sets into a multi - dimensional volume. - - Tailored towards creating NiftiImage output, but can also just create numpy - arrays. Can summarize all of the meta data from the input DICOM data sets - into a Nifti header extension (see `dcmmeta.DcmMetaExtension`). - - Parameters - ---------- - time_order : str or DicomOrdering - The DICOM keyword or DicomOrdering object specifying how to order - the DICOM data sets along the time dimension. - - vector_order : str or DicomOrdering - The DICOM keyword or DicomOrdering object specifying how to order - the DICOM data sets along the vector dimension. - - allow_dummies : bool - If True then data sets without pixel data can be added to the stack. - The "dummy" voxels will have the maximum representable value for - the datatype. - - meta_filter : callable - A callable that takes a meta data key and value, and returns True if - that meta data element should be excluded from the DcmMeta extension. - - Notes - ----- - If both time_order and vector_order are None, the time_order will be - guessed based off the data sets. - ''' - - sort_guesses = ['EchoTime', - 'InversionTime', - 'RepetitionTime', - 'FlipAngle', - 'TriggerTime', - 'AcquisitionTime', - 'ContentTime', - 'AcquisitionNumber', - 'InstanceNumber', - ] - '''The meta data keywords used when trying to guess the sorting order. - Keys that come earlier in the list are given higher priority.''' - - minimal_keys = set(sort_guesses + - ['Rows', - 'Columns', - 'PixelSpacing', - 'ImageOrientationPatient', - 'InPlanePhaseEncodingDirection', - 'RepetitionTime', - 'AcquisitionTime' - ] + - list(default_group_keys) - ) - '''Set of minimal meta data keys that should be provided if they exist in - the source DICOM files.''' - - def __init__(self, time_order=None, vector_order=None, - allow_dummies=False, meta_filter=None): - if isinstance(time_order, str): - self._time_order = DicomOrdering(time_order) - else: - self._time_order = time_order - if isinstance(vector_order, str): - self._vector_order = DicomOrdering(vector_order) - else: - self._vector_order = vector_order - - if meta_filter is None: - self._meta_filter = default_meta_filter - else: - self._meta_filter = meta_filter - - self._allow_dummies = allow_dummies - - #Sets all the state variables to their defaults - self.clear() - - def _chk_equal(self, keys, meta1, meta2): - for key in keys: - if meta1[key] != meta2[key]: - raise IncongruentImageError("%s does not match" % key) - - def _chk_close(self, keys, meta1, meta2): - for key in keys: - if not np.allclose(meta1[key], meta2[key], atol=5e-5): - raise IncongruentImageError("%s is not close to matching" % - key) - - def _chk_congruent(self, meta): - is_dummy = not 'Rows' in meta or not 'Columns' in meta - if is_dummy and not self._allow_dummies: - raise IncongruentImageError('Missing Rows/Columns') - - if not self._ref_input is None: - self._chk_close(('PixelSpacing', - 'ImageOrientationPatient'), - meta, - self._ref_input - ) - if not is_dummy: - self._chk_equal(('Rows', 'Columns'), meta, self._ref_input) - elif len(self._dummies) != 0: - self._chk_close(('PixelSpacing', - 'ImageOrientationPatient'), - meta, - self._dummies[0][0] - ) - return is_dummy - - - def add_dcm(self, dcm, meta=None): - '''Add a pydicom dataset to the stack. - - Parameters - ---------- - dcm : dicom.dataset.Dataset - The data set being added to the stack - - meta : dict - The extracted meta data for the DICOM data set `dcm`. If None - extract.default_extractor will be used. - - Raises - ------ - IncongruentImageError - The provided `dcm` does not match the orientation or dimensions of - those already in the stack. - - ImageCollisionError - The provided `dcm` has the same slice location and time/vector - values. - - ''' - if meta is None: - from .extract import default_extractor - meta = default_extractor(dcm) - - dw = wrapper_from_data(dcm) - - is_dummy = self._chk_congruent(meta) - - self._phase_enc_dirs.add(meta.get('InPlanePhaseEncodingDirection')) - self._repetition_times.add(meta.get('RepetitionTime')) - - #Pull the info used for sorting - slice_pos = dw.slice_indicator - self._slice_pos_vals.add(slice_pos) - time_val = None - if self._time_order: - time_val = self._time_order.get_ordinate(meta) - self._time_vals.add(time_val) - vector_val = None - if self._vector_order: - vector_val = self._vector_order.get_ordinate(meta) - self._vector_vals.add(vector_val) - - #Create a tuple with the sorting values - sorting_tuple = (vector_val, time_val, slice_pos) - - #If a explicit order was specified, raise an exception if image - #collides with another already in the stack - if ((not self._time_order is None or - not self._vector_order is None) and - sorting_tuple in self._sorting_tuples - ): - raise ImageCollisionError() - self._sorting_tuples.add(sorting_tuple) - - #Create a NiftiWrapper for this input if possible - nii_wrp = None - if not is_dummy: - nii_wrp = NiftiWrapper.from_dicom_wrapper(dw, meta) - if self._ref_input is None: - #We don't have a reference input yet, use this one - self._ref_input = nii_wrp - #Convert any dummies that we have stashed previously - for dummy_meta, dummy_tuple, iop in self._dummies: - dummy_wrp = _make_dummy(self._ref_input, dummy_meta, iop) - self._files_info.append((dummy_wrp, dummy_tuple)) - else: - if self._ref_input is None: - #We don't have a reference input, so stash the dummy for now - self._dummies.append((meta, sorting_tuple, dcm.ImagePositionPatient)) - else: - #Convert dummy using the reference input - nii_wrp = _make_dummy(self._ref_input, meta, dcm.ImagePositionPatient) - - #If we made a NiftiWrapper add it to the stack - if not nii_wrp is None: - self._files_info.append((nii_wrp, sorting_tuple)) - - #Set the dirty flags - self._shape_dirty = True - self._meta_dirty = True - - def clear(self): - '''Remove any DICOM datasets from the stack.''' - self._slice_pos_vals = set() - self._time_vals = set() - self._vector_vals = set() - self._sorting_tuples = set() - - self._phase_enc_dirs = set() - self._repetition_times = set() - - self._dummies = [] - self._ref_input = None - - self._shape_dirty = True - self._shape = None - - self._meta_dirty = True - self._meta = None - - self._files_info = [] - - def _chk_order(self, slice_positions, files_per_vol, num_volumes, - num_time_points, num_vec_comps): - #Sort the files - self._files_info.sort(key=lambda x: x[1]) - if files_per_vol > 1: - for vol_idx in range(num_volumes): - start_slice = vol_idx * files_per_vol - end_slice = start_slice + files_per_vol - self._files_info[start_slice:end_slice] = \ - sorted(self._files_info[start_slice:end_slice], - key=lambda x: x[1][-1]) - - #Do a thorough check for correctness - for vec_idx in range(num_vec_comps): - file_idx = vec_idx*num_time_points*files_per_vol - curr_vec_val = self._files_info[file_idx][1][0] - for time_idx in range(num_time_points): - for slice_idx in range(files_per_vol): - file_idx = (vec_idx*num_time_points*files_per_vol + - time_idx*files_per_vol + slice_idx) - file_info = self._files_info[file_idx] - if file_info[1][0] != curr_vec_val: - raise InvalidStackError("Not enough images with the " + - "vector value of " + - str(curr_vec_val)) - if (file_info[1][2] != slice_positions[slice_idx]): - if (file_info[1][2] == slice_positions[slice_idx-1]): - error_msg = ["Duplicate slice position"] - else: - error_msg = ["Missing slice position"] - error_msg.append(" at slice index %d" % slice_idx) - if num_time_points > 1: - error_msg.append(' in time point %d' % time_idx) - if num_vec_comps > 1: - error_msg.append(' for vector component %s' % - str(curr_vec_val)) - raise InvalidStackError(''.join(error_msg)) - - - def get_shape(self): - '''Get the shape of the stack. - - Returns - ------- - A tuple of integers giving the size of the dimensions of the stack. - - Raises - ------ - InvalidStackError - The stack is incomplete or invalid. - ''' - #If the dirty flag is not set, return the cached value - if not self._shape_dirty: - return self._shape - - #We need at least one non-dummy file in the stack - if len(self._files_info) == 0: - raise InvalidStackError("No (non-dummy) files in the stack") - - #Figure out number of files and slices per volume - files_per_vol = len(self._slice_pos_vals) - slice_positions = sorted(list(self._slice_pos_vals)) - - #If more than one file per volume, check that slice spacing is equal - if files_per_vol > 1: - spacings = [] - for idx in range(files_per_vol - 1): - spacings.append(slice_positions[idx+1] - slice_positions[idx]) - spacings = np.array(spacings) - avg_spacing = np.mean(spacings) - if not np.allclose(avg_spacing, spacings, rtol=4e-2): - raise InvalidStackError("Slice spacings are not consistent") - - #Simple check for an incomplete stack - if len(self._files_info) % files_per_vol != 0: - raise InvalidStackError("Number of files is not an even multiple " - "of the number of unique slice positions.") - num_volumes = len(self._files_info) // files_per_vol - - #Figure out the number of vector components and time points - num_vec_comps = len(self._vector_vals) - if num_vec_comps > num_volumes: - raise InvalidStackError("Vector variable varies within volumes") - if num_volumes % num_vec_comps != 0: - raise InvalidStackError("Number of volumes not an even multiple " - "of the number of vector components.") - num_time_points = num_volumes // num_vec_comps - - #If both sort keys are None try to guess - if (num_volumes > 1 and self._time_order is None and - self._vector_order is None): - #Get a list of possible sort orders - possible_orders = [] - for key in self.sort_guesses: - vals = set([file_info[0].get_meta(key) - for file_info in self._files_info] - ) - if len(vals) == num_volumes or len(vals) == len(self._files_info): - possible_orders.append(key) - if len(possible_orders) == 0: - raise InvalidStackError("Unable to guess key for sorting the " - "fourth dimension") - - #Try out each possible sort order - for time_order in possible_orders: - #Update sorting tuples - for idx in range(len(self._files_info)): - nii_wrp, curr_tuple = self._files_info[idx] - self._files_info[idx] = (nii_wrp, - (curr_tuple[0], - nii_wrp[time_order], - curr_tuple[2] - ) - ) - - #Check the order - try: - self._chk_order(slice_positions, - files_per_vol, - num_volumes, - num_time_points, - num_vec_comps) - except InvalidStackError: - pass - else: - break - else: - raise InvalidStackError("Unable to guess key for sorting the " - "fourth dimension") - else: #If at least on sort key was specified, just check the order - self._chk_order(slice_positions, - files_per_vol, - num_volumes, - num_time_points, - num_vec_comps) - - #Stack appears to be valid, build the shape tuple - file_shape = self._files_info[0][0].nii_img.get_shape() - vol_shape = list(file_shape) - if files_per_vol > 1: - vol_shape[2] = files_per_vol - shape = vol_shape+ [num_time_points, num_vec_comps] - if shape[4] == 1: - shape = shape[:-1] - if shape[3] == 1: - shape = shape[:-1] - self._shape = tuple(shape) - - self._shape_dirty = False - return self._shape - - def get_data(self): - '''Get an array of the voxel values. - - Returns - ------- - A numpy array filled with values from the DICOM data sets' pixels. - - Raises - ------ - InvalidStackError - The stack is incomplete or invalid. - ''' - #Create a numpy array for storing the voxel data - stack_shape = self.get_shape() - stack_shape = tuple(list(stack_shape) + ((5 - len(stack_shape)) * [1])) - stack_dtype = self._files_info[0][0].nii_img.get_data_dtype() - #This is a hack to keep fslview happy, Shouldn't cause issues as the - #original data should be 12-bit and any scaling will result in float - #data - if stack_dtype == np.uint16: - stack_dtype = np.int16 - vox_array = np.empty(stack_shape, dtype=stack_dtype) - - #Fill the array with data - n_vols = 1 - if len(stack_shape) > 3: - n_vols *= stack_shape[3] - if len(stack_shape) > 4: - n_vols *= stack_shape[4] - files_per_vol = len(self._files_info) // n_vols - file_shape = self._files_info[0][0].nii_img.get_shape() - for vec_idx in range(stack_shape[4]): - for time_idx in range(stack_shape[3]): - if files_per_vol == 1 and file_shape[2] != 1: - file_idx = vec_idx*(stack_shape[3]) + time_idx - vox_array[:, :, :, time_idx, vec_idx] = \ - self._files_info[file_idx][0].nii_img.get_data() - else: - for slice_idx in range(files_per_vol): - file_idx = (vec_idx*(stack_shape[3]*stack_shape[2]) + - time_idx*(stack_shape[2]) + slice_idx) - vox_array[:, :, slice_idx, time_idx, vec_idx] = \ - self._files_info[file_idx][0].nii_img.get_data()[:, :, 0] - - #Trim unused time/vector dimensions - if stack_shape[4] == 1: - vox_array = vox_array[...,0] - if stack_shape[3] == 1: - vox_array = vox_array[...,0] - - return vox_array - - def get_affine(self): - '''Get the affine transform for mapping row/column/slice indices - to Nifti (RAS) patient space. - - Returns - ------- - A 4x4 numpy array containing the affine transform. - - Raises - ------ - InvalidStackError - The stack is incomplete or invalid. - ''' - #Figure out the number of three (or two) dimensional volumes - shape = self.get_shape() - n_vols = 1 - if len(shape) > 3: - n_vols *= shape[3] - if len(shape) > 4: - n_vols *= shape[4] - - #Figure out the number of files in each volume - files_per_vol = len(self._files_info) / n_vols - - #Pull the DICOM Patient Space affine from the first input - aff = self._files_info[0][0].nii_img.get_affine() - - #If there is more than one file per volume, we need to fix slice scaling - if files_per_vol > 1: - first_offset = aff[:3, 3] - second_offset = self._files_info[1][0].nii_img.get_affine()[:3, 3] - scaled_slc_dir = second_offset - first_offset - aff[:3, 2] = scaled_slc_dir - - return aff - - def to_nifti(self, voxel_order='LAS', embed_meta=False): - '''Returns a NiftiImage with the data and affine from the stack. - - Parameters - ---------- - voxel_order : str - A three character string repsenting the voxel order in patient - space (see the function `reorder_voxels`). Can be None or an empty - string to disable reorientation. - - embed_meta : bool - If true a dcmmeta.DcmMetaExtension will be embedded in the Nifti - header. - - Returns - ------- - A nibabel.nifti1.Nifti1Image created with the stack's data and affine. - ''' - #Get the voxel data and affine - data = self.get_data() - affine = self.get_affine() - - #Figure out the number of three (or two) dimensional volumes - n_vols = 1 - if len(data.shape) > 3: - n_vols *= data.shape[3] - if len(data.shape) > 4: - n_vols *= data.shape[4] - - files_per_vol = len(self._files_info) // n_vols - - #Reorder the voxel data if requested - permutation = [0, 1, 2] - slice_dim = 2 - reorient_transform = np.eye(4) - if voxel_order: - (data, - affine, - reorient_transform, - ornt_trans) = reorder_voxels(data, affine, voxel_order) - permutation, flips = zip(*ornt_trans) - permutation = [int(val) for val in permutation] - - #Reverse file order in each volume's files if we flipped slice order - #This will keep the slice times and meta data order correct - if files_per_vol > 1 and flips[slice_dim] == -1: - self._shape_dirty = True - for vol_idx in range(n_vols): - start = vol_idx * files_per_vol - stop = start + files_per_vol - self._files_info[start:stop] = [self._files_info[idx] - for idx in range(stop - 1, - start - 1, - -1) - ] - - #Update the slice dim - slice_dim = permutation[2] - - #Create the nifti image using the data array - nifti_image = nb.Nifti1Image(data, affine) - nifti_header = nifti_image.get_header() - - #Set the units and dimension info - nifti_header.set_xyzt_units('mm', 'msec') - if len(self._repetition_times) == 1 and not None in self._repetition_times: - nifti_header['pixdim'][4] = list(self._repetition_times)[0] - dim_info = {'freq' : None, 'phase' : None, 'slice' : slice_dim} - if len(self._phase_enc_dirs) == 1 and not None in self._phase_enc_dirs: - phase_dir = list(self._phase_enc_dirs)[0] - if phase_dir == 'ROW': - dim_info['phase'] = permutation[1] - dim_info['freq'] = permutation[0] - else: - dim_info['phase'] = permutation[0] - dim_info['freq'] = permutation[1] - nifti_header.set_dim_info(**dim_info) - n_slices = data.shape[slice_dim] - - #Set the slice timing header info - has_acq_time = (self._files_info[0][0].get_meta('AcquisitionTime') != - None) - if files_per_vol > 1 and has_acq_time: - #Pull out the relative slice times for the first volume - slice_times = np.array([dcm_time_to_sec(file_info[0]['AcquisitionTime']) - for file_info in self._files_info[:n_slices]] - ) - slice_times -= np.min(slice_times) - - #If there is more than one volume, check if times are consistent - is_consistent = True - for vol_idx in range(1, n_vols): - start_slice = vol_idx * n_slices - end_slice = start_slice + n_slices - slices_info = self._files_info[start_slice:end_slice] - vol_slc_times = \ - np.array([dcm_time_to_sec(file_info[0]['AcquisitionTime']) - for file_info in slices_info] - ) - vol_slc_times -= np.min(vol_slc_times) - if not np.allclose(slice_times, vol_slc_times): - is_consistent = False - break - - #If the times are consistent and not all zero, try setting the slice - #times (sets the slice duration and code if possible). - if is_consistent and not np.allclose(slice_times, 0.0): - try: - nifti_header.set_slice_times(slice_times) - except HeaderDataError: - pass - - #Embed the meta data extension if requested - if embed_meta: - #Build meta data for each volume if needed - vol_meta = [] - if files_per_vol > 1: - for vol_idx in range(n_vols): - start_slice = vol_idx * n_slices - end_slice = start_slice + n_slices - exts = [file_info[0].meta_ext - for file_info in self._files_info[start_slice:end_slice]] - meta = DcmMetaExtension.from_sequence(exts, 2) - vol_meta.append(meta) - else: - vol_meta = [file_info[0].meta_ext - for file_info in self._files_info] - - #Build meta data for each time point / vector component - if len(data.shape) == 5: - if data.shape[3] != 1: - vec_meta = [] - for vec_idx in range(data.shape[4]): - start_idx = vec_idx * data.shape[3] - end_idx = start_idx + data.shape[3] - meta = DcmMetaExtension.from_sequence(\ - vol_meta[start_idx:end_idx], 3) - vec_meta.append(meta) - else: - vec_meta = vol_meta - - meta_ext = DcmMetaExtension.from_sequence(vec_meta, 4) - elif len(data.shape) == 4: - meta_ext = DcmMetaExtension.from_sequence(vol_meta, 3) - else: - meta_ext = vol_meta[0] - if meta_ext is file_info[0].meta_ext: - meta_ext = deepcopy(meta_ext) - - meta_ext.shape = data.shape - meta_ext.slice_dim = slice_dim - meta_ext.affine = nifti_header.get_best_affine() - meta_ext.reorient_transform = reorient_transform - - #Filter and embed the meta data - meta_ext.filter_meta(self._meta_filter) - nifti_header.extensions = Nifti1Extensions([meta_ext]) - - nifti_image.update_header() - return nifti_image - - def to_nifti_wrapper(self, voxel_order=''): - '''Convienance method. Calls `to_nifti` and returns a `NiftiWrapper` - generated from the result. - ''' - return NiftiWrapper(self.to_nifti(voxel_order, True)) - -def parse_and_group(src_paths, group_by=default_group_keys, extractor=None, - force=False, warn_on_except=False, - close_tests=('ImageOrientationPatient',)): - '''Parse the given dicom files and group them together. Each group is - stored as a (list) value in a dict where the key is a tuple of values - corresponding to the keys in 'group_by' - - Parameters - ---------- - src_paths : sequence - A list of paths to the source DICOM files, or a list of open - pydicom FileDataset objects. - - group_by : tuple - Meta data keys to group data sets with. Any data set with the same - values for these keys will be grouped together. This tuple of values - will also be the key in the result dictionary. - - extractor : callable - Should take a dicom.dataset.Dataset and return a dictionary of the - extracted meta data. - - force : bool - Force reading source files even if they do not appear to be DICOM. - - warn_on_except : bool - Convert exceptions into warnings, possibly allowing some results to be - returned. - - close_tests : sequence - Any `group_by` key listed here is tested with `numpy.allclose` instead - of straight equality when determining group membership. - - Returns - ------- - groups : dict - A dict mapping tuples of values (corresponding to 'group_by') to groups - of data sets. Each element in the list is a tuple containing the dicom - object, the parsed meta data, and the filename. - ''' - if extractor is None: - from .extract import default_extractor - extractor = default_extractor - - are_paths = isinstance(src_paths[0], six.string_types) - - results = {} - close_elems = {} - for dcm_path in src_paths: - #Read the DICOM file - if are_paths: - try: - dcm = dicom.read_file(dcm_path, force=force) - except Exception as e: - if warn_on_except: - warnings.warn('Error reading file %s: %s' % (dcm_path, - str(e))) - continue - else: - raise - else: - dcm = dcm_path - dcm_path = getattr(dcm, 'filename', None) - - #Extract the meta data and group - meta = extractor(dcm) - key_list = [] # Values from group_by elems with equality testing - close_list = [] # Values from group_by elems with np.allclose testing - for grp_key in group_by: - key_elem = meta.get(grp_key) - if isinstance(key_elem, list): - key_elem = tuple(key_elem) - if grp_key in close_tests: - close_list.append(key_elem) - else: - key_list.append(key_elem) - - # Initially each key has multiple sub_results (corresponding to - # different values of the "close" keys) - key = tuple(key_list) - if not key in results: - results[key] = [(close_list, [(dcm, meta, dcm_path)])] - else: - # Look for a matching sub_result - for c_list, sub_res in results[key]: - for c_idx, c_val in enumerate(c_list): - if not np.allclose(c_val, close_list[c_idx], atol=5e-5): - break - else: - sub_res.append((dcm, meta, dcm_path)) - break - else: - # No match found, append another sub result - results[key].append((close_list, [(dcm, meta, dcm_path)])) - - # Unpack sub results, using the canonical value for the close keys - full_results = {} - for eq_key, sub_res_list in results.items(): - for close_key, sub_res in sub_res_list: - full_key = [] - eq_idx = 0 - close_idx = 0 - for grp_key in group_by: - if grp_key in close_tests: - val = close_key[close_idx] - close_idx += 1 - else: - val = eq_key[eq_idx] - eq_idx += 1 - if isinstance(val, MutableSequence): - val = tuple(val) - full_key.append(val) - full_key = tuple(full_key) - full_results[full_key] = sub_res - - return full_results - -def stack_group(group, warn_on_except=False, **stack_args): - result = DicomStack(**stack_args) - for dcm, meta, fn in group: - try: - result.add_dcm(dcm, meta) - except Exception as e: - if warn_on_except: - warnings.warn('Error adding file %s to stack: %s' % - (fn, str(e))) - else: - raise - return result - -def parse_and_stack(src_paths, group_by=default_group_keys, extractor=None, - force=False, warn_on_except=False, **stack_args): - '''Parse the given dicom files into a dictionary containing one or more - DicomStack objects. - - Parameters - ---------- - src_paths : sequence - A list of paths to the source DICOM files. - - group_by : tuple - Meta data keys to group data sets with. Any data set with the same - values for these keys will be grouped together. This tuple of values - will also be the key in the result dictionary. - - extractor : callable - Should take a dicom.dataset.Dataset and return a dictionary of the - extracted meta data. - - force : bool - Force reading source files even if they do not appear to be DICOM. - - warn_on_except : bool - Convert exceptions into warnings, possibly allowing some results to be - returned. - - stack_args : kwargs - Keyword arguments to pass to the DicomStack constructor. - ''' - results = parse_and_group(src_paths, - group_by, - extractor, - force, - warn_on_except) - - for key, group in results.items(): - results[key] = stack_group(group, warn_on_except, **stack_args) - - return results diff --git a/fsl/data/dcmstack/dcmstack_cli.py b/fsl/data/dcmstack/dcmstack_cli.py deleted file mode 100755 index 1c041a9cb706497eb9fa4d2ad3fed74b6583700f..0000000000000000000000000000000000000000 --- a/fsl/data/dcmstack/dcmstack_cli.py +++ /dev/null @@ -1,357 +0,0 @@ -""" -Command line interface to dcmstack. - -@author: moloney -""" -from __future__ import print_function - -import os, sys, argparse, string -from glob import glob -import dicom -from . import dcmstack -from .dcmstack import (parse_and_group, stack_group, DicomOrdering, - default_group_keys) -from .dcmmeta import NiftiWrapper -from . import extract -from .info import __version__ - - -prog_descrip = """Stack DICOM files from each source directory into 2D to 5D -volumes, optionally extracting meta data. -""" - -prog_epilog = """IT IS YOUR RESPONSIBILITY TO KNOW IF THERE IS PRIVATE HEALTH -INFORMATION IN THE METADATA EXTRACTED BY THIS PROGRAM.""" - -def parse_tags(opt_str): - tag_strs = opt_str.split(',') - tags = [] - for tag_str in tag_strs: - tokens = tag_str.split('_') - if len(tokens) != 2: - raise ValueError('Invalid str format for tags') - tags.append(dicom.tag.Tag(int(tokens[0].strip(), 16), - int(tokens[1].strip(), 16)) - ) - return tags - -def sanitize_path_comp(path_comp): - result = [] - for char in path_comp: - if not char in string.letters + string.digits + '-_.': - result.append('_') - else: - result.append(char) - return ''.join(result) - -def main(argv=sys.argv): - #Handle command line options - arg_parser = argparse.ArgumentParser(description=prog_descrip, - epilog=prog_epilog) - arg_parser.add_argument('src_dirs', nargs='*', help=('The source ' - 'directories containing DICOM files.')) - - input_opt = arg_parser.add_argument_group('Input options') - input_opt.add_argument('--force-read', action='store_true', default=False, - help=('Try reading all files as DICOM, even if they ' - 'are missing the preamble.')) - input_opt.add_argument('--file-ext', default='.dcm', help=('Only try reading ' - 'files with the given extension. Default: ' - '%(default)s')) - input_opt.add_argument('--allow-dummies', action='store_true', default=False, - help=('Allow DICOM files that are missing pixel ' - 'data, filling that slice of the output nifti with ' - 'the maximum representable value.')) - - output_opt = arg_parser.add_argument_group('Output options') - output_opt.add_argument('--dest-dir', default=None, - help=('Destination directory, defaults to the ' - 'source directory.')) - output_opt.add_argument('-o', '--output-name', default=None, - help=('Python format string determining the output ' - 'filenames based on DICOM tags.')) - output_opt.add_argument('--output-ext', default='.nii.gz', - help=('The extension for the output file type. ' - 'Default: %(default)s')) - output_opt.add_argument('-d', '--dump-meta', default=False, - action='store_true', help=('Dump the extracted ' - 'meta data into a JSON file with the same base ' - 'name as the generated Nifti')) - output_opt.add_argument('--embed-meta', default=False, action='store_true', - help=('Embed the extracted meta data into a Nifti ' - 'header extension (in JSON format).')) - - stack_opt = arg_parser.add_argument_group('Stacking Options') - stack_opt.add_argument('-g', '--group-by', default=None, - help=("Comma separated list of meta data keys to " - "group input files into stacks with.")) - stack_opt.add_argument('--voxel-order', default='LAS', - help=('Order the voxels so the spatial indices ' - 'start from these directions in patient space. ' - 'The directions in patient space should be given ' - 'as a three character code: (l)eft, (r)ight, ' - '(a)nterior, (p)osterior, (s)uperior, (i)nferior. ' - 'Passing an empty string will disable ' - 'reorientation. ' - 'Default: %(default)s')) - stack_opt.add_argument('-t', '--time-var', default=None, - help=('The DICOM element keyword to use for ' - 'ordering the stack along the time dimension.')) - stack_opt.add_argument('--vector-var', default=None, - help=('The DICOM element keyword to use for ' - 'ordering the stack along the vector dimension.')) - stack_opt.add_argument('--time-order', default=None, - help=('Provide a text file with the desired order ' - 'for the values (one per line) of the attribute ' - 'used as the time variable. This option is rarely ' - 'needed.')) - stack_opt.add_argument('--vector-order', default=None, - help=('Provide a text file with the desired order ' - 'for the values (one per line) of the attribute ' - 'used as the vector variable. This option is rarely ' - 'needed.')) - - meta_opt = arg_parser.add_argument_group('Meta Extraction and Filtering ' - 'Options') - meta_opt.add_argument('-l', '--list-translators', default=False, - action='store_true', help=('List enabled translators ' - 'and exit')) - meta_opt.add_argument('--disable-translator', default=None, - help=('Disable the translators for the provided ' - 'tags. Tags should be given in the format ' - '"0x0_0x0". More than one can be given in a comma ' - 'separated list. If the word "all" is provided, all ' - 'translators will be disabled.')) - meta_opt.add_argument('--extract-private', default=False, - action='store_true', - help=('Extract meta data from private elements, even ' - 'if there is no translator. If the value for the ' - 'element contains non-ascii bytes it will still be ' - 'ignored. The extracted meta data may still be ' - 'filtered out by the regular expressions.')) - meta_opt.add_argument('-i', '--include-regex', action='append', - help=('Include any meta data where the key matches ' - 'the provided regular expression. This will override ' - 'any exclude expressions. Applies to all meta data.')) - meta_opt.add_argument('-e', '--exclude-regex', action='append', - help=('Exclude any meta data where the key matches ' - 'the provided regular expression. This will ' - 'supplement the default exclude expressions. Applies ' - 'to all meta data.')) - meta_opt.add_argument('--default-regexes', default=False, - action='store_true', - help=('Print the list of default include and exclude ' - 'regular expressions and exit.')) - - gen_opt = arg_parser.add_argument_group('General Options') - gen_opt.add_argument('-v', '--verbose', default=False, action='store_true', - help=('Print additional information.')) - gen_opt.add_argument('--strict', default=False, action='store_true', - help=('Fail on the first exception instead of ' - 'showing a warning.')) - gen_opt.add_argument('--version', default=False, action='store_true', - help=('Show the version and exit.')) - - args = arg_parser.parse_args(argv[1:]) - - if args.version: - print(__version__) - return 0 - - #Check if we are just listing the translators - if args.list_translators: - for translator in extract.default_translators: - print('%s -> %s' % (translator.tag, translator.name)) - return 0 - - #Check if we are just listing the default exclude regular expressions - if args.default_regexes: - print('Default exclude regular expressions:') - for regex in dcmstack.default_key_excl_res: - print('\t' + regex) - print('Default include regular expressions:') - for regex in dcmstack.default_key_incl_res: - print('\t' + regex) - return 0 - - #Check if we are generating meta data - gen_meta = args.embed_meta or args.dump_meta - - if gen_meta: - #Start with the module defaults - ignore_rules = extract.default_ignore_rules - translators = extract.default_translators - - #Disable translators if requested - if args.disable_translator: - if args.disable_translator.lower() == 'all': - translators = tuple() - else: - try: - disable_tags = parse_tags(args.disable_translator) - except: - arg_parser.error('Invalid tag format to --disable-translator.') - new_translators = [] - for translator in translators: - if not translator.tag in disable_tags: - new_translators.append(translator) - translators = new_translators - - #Include non-translated private elements if requested - if args.extract_private: - ignore_rules = (extract.ignore_pixel_data, - extract.ignore_overlay_data, - extract.ignore_color_lut_data) - - extractor = extract.MetaExtractor(ignore_rules, translators) - - else: - extractor = extract.minimal_extractor - - #Add include/exclude regexes to meta filter - include_regexes = dcmstack.default_key_incl_res - if args.include_regex: - include_regexes += args.include_regex - exclude_regexes = dcmstack.default_key_excl_res - if args.exclude_regex: - exclude_regexes += args.exclude_regex - meta_filter = dcmstack.make_key_regex_filter(exclude_regexes, - include_regexes) - - #Figure out time and vector ordering - if args.time_var: - if args.time_order: - order_file = open(args.time_order) - abs_order = [line.strip() for line in order_file.readlines()] - order_file.close() - time_order = DicomOrdering(args.time_var, abs_order, True) - else: - time_order = DicomOrdering(args.time_var) - else: - time_order = None - if args.vector_var: - if args.vector_order: - order_file = open(args.vector_order) - abs_order = [line.strip() for line in order_file.readlines()] - order_file.close() - vector_order = DicomOrdering(args.vector_var, abs_order, True) - else: - vector_order = DicomOrdering(args.vector_var) - else: - vector_order = None - - if len(args.src_dirs) == 0: - arg_parser.error('No source directories were provided.') - - #Handle group-by option - if not args.group_by is None: - group_by = args.group_by.split(',') - else: - group_by = default_group_keys - - #Handle each source directory individually - for src_dir in args.src_dirs: - if not os.path.isdir(src_dir): - print('%s is not a directory, skipping' % src_dir, file=sys.stderr) - - if args.verbose: - print("Processing source directory %s" % src_dir) - - #Build a list of paths to source files - glob_str = os.path.join(src_dir, '*') - if args.file_ext: - glob_str += args.file_ext - src_paths = glob(glob_str) - - if args.verbose: - print("Found %d source files in the directory" % len(src_paths)) - - #Group the files in this directory - groups = parse_and_group(src_paths, - group_by, - extractor, - args.force_read, - not args.strict, - ) - - if args.verbose: - print("Found %d groups of DICOM images" % len(groups)) - - if len(groups) == 0: - print("No DICOM files found in %s" % src_dir) - - out_idx = 0 - generated_outs = set() - for key, group in groups.iteritems(): - stack = stack_group(group, - warn_on_except=not args.strict, - time_order=time_order, - vector_order=vector_order, - allow_dummies=args.allow_dummies, - meta_filter=meta_filter) - meta = group[0][1] - - #Build an appropriate output format string if none was specified - if args.output_name is None: - out_fmt = [] - if 'SeriesNumber' in meta: - out_fmt.append('%(SeriesNumber)03d') - if 'ProtocolName' in meta: - out_fmt.append('%(ProtocolName)s') - elif 'SeriesDescription' in meta: - out_fmt.append('%(SeriesDescription)s') - else: - out_fmt.append('series') - out_fmt = '-'.join(out_fmt) - else: - out_fmt = args.output_name - - #Get the output filename from the format string, make sure the - #result is unique for this source directory - out_fn = sanitize_path_comp(out_fmt % meta) - if out_fn in generated_outs: - out_fn += '-%03d' % out_idx - generated_outs.add(out_fn) - out_idx += 1 - out_fn = out_fn + args.output_ext - - if args.dest_dir: - out_path = os.path.join(args.dest_dir, out_fn) - else: - out_path = os.path.join(src_dir, out_fn) - - if args.verbose: - print("Writing out stack to path %s" % out_path) - - nii = stack.to_nifti(args.voxel_order, gen_meta) - - if args.dump_meta: - nii_wrp = NiftiWrapper(nii) - path_tokens = out_path.split('.') - if path_tokens[-1] == 'gz': - path_tokens = path_tokens[:-1] - if path_tokens[-1] == 'nii': - path_tokens = path_tokens[:-1] - meta_path = '.'.join(path_tokens + ['json']) - out_file = open(meta_path, 'w') - out_file.write(nii_wrp.meta_ext.to_json()) - out_file.close() - - if not args.embed_meta: - nii_wrp.remove_extension() - - del nii_wrp - - nii.to_filename(out_path) - - del key - del group - del stack - del meta - del nii - del groups - - return 0 - -if __name__ == '__main__': - sys.exit(main()) diff --git a/fsl/data/dcmstack/extract.py b/fsl/data/dcmstack/extract.py deleted file mode 100644 index 13f94abf00f5f08455d867b9cfeb5d5a618b7720..0000000000000000000000000000000000000000 --- a/fsl/data/dcmstack/extract.py +++ /dev/null @@ -1,521 +0,0 @@ -""" -Extract meta data from a DICOM data set. -""" -import sys -import struct, warnings -from collections import namedtuple, defaultdict -import dicom -from dicom.datadict import keyword_for_tag -from nibabel.nicom import csareader -from .dcmstack import DicomStack -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict -try: - import chardet - have_chardet = True -except ImportError: - have_chardet = False - pass - -#This is needed to allow extraction on files with invalid values (e.g. too -#long of a decimal string) -dicom.config.enforce_valid_values = False - -# Python 2 / 3 compatibility -unicode_str = unicode if sys.version_info[0] < 3 else str - -def is_ascii(in_str): - '''Return true if the given string is valid ASCII.''' - if all(' ' <= c <= '~' for c in in_str): - return True - return False - -def ignore_private(elem): - '''Ignore rule for `MetaExtractor` to skip private DICOM elements (odd - group number).''' - if elem.tag.group % 2 == 1: - return True - return False - -def ignore_pixel_data(elem): - return elem.tag == dicom.tag.Tag(0x7fe0, 0x10) - -def ignore_overlay_data(elem): - return elem.tag.group & 0xff00 == 0x6000 and elem.tag.elem == 0x3000 - -def ignore_color_lut_data(elem): - return (elem.tag.group == 0x28 and - elem.tag.elem in (0x1201, 0x1202, 0x1203, 0x1221, 0x1222, 0x1223)) - -default_ignore_rules = (ignore_private, - ignore_pixel_data, - ignore_overlay_data, - ignore_color_lut_data) -'''The default tuple of ignore rules for `MetaExtractor`.''' - - -Translator = namedtuple('Translator', ['name', - 'tag', - 'priv_creator', - 'trans_func'] - ) -'''A namedtuple for storing the four elements of a translator: a name, the -dicom.tag.Tag that can be translated, the private creator string (optional), and -the function which takes the DICOM element and returns a dictionary.''' - -def simplify_csa_dict(csa_dict): - '''Simplify the result of nibabel.nicom.csareader. - - Parameters - ---------- - csa_dict : dict - The result from nibabel.nicom.csareader - - Returns - ------- - result : OrderedDict - Result where the keys come from the 'tags' sub dictionary of `csa_dict`. - The values come from the 'items' within that tags sub sub dictionary. - If items has only one element it will be unpacked from the list. - ''' - if csa_dict is None: - return None - - result = OrderedDict() - for tag in csa_dict['tags']: - items = csa_dict['tags'][tag]['items'] - if len(items) == 0: - continue - elif len(items) == 1: - result[tag] = items[0] - else: - result[tag] = items - return result - -def csa_image_trans_func(elem): - '''Function for translating the CSA image sub header.''' - return simplify_csa_dict(csareader.read(elem.value)) - -csa_image_trans = Translator('CsaImage', - dicom.tag.Tag(0x29, 0x1010), - 'SIEMENS CSA HEADER', - csa_image_trans_func) -'''Translator for the CSA image sub header.''' - -class PhoenixParseError(Exception): - def __init__(self, line): - '''Exception indicating a error parsing a line from the Phoenix - Protocol. - ''' - self.line = line - - def __str__(self): - return 'Unable to parse phoenix protocol line: %s' % self.line - -def _parse_phoenix_line(line, str_delim='""'): - delim_len = len(str_delim) - #Handle most comments (not always when string literal involved) - comment_idx = line.find('#') - if comment_idx != -1: - #Check if the pound sign is in a string literal - if line[:comment_idx].count(str_delim) == 1: - if line[comment_idx:].find(str_delim) == -1: - raise PhoenixParseError(line) - else: - line = line[:comment_idx] - - #Allow empty lines - if line.strip() == '': - return None - - #Find the first equals sign and use that to split key/value - equals_idx = line.find('=') - if equals_idx == -1: - raise PhoenixParseError(line) - key = line[:equals_idx].strip() - val_str = line[equals_idx + 1:].strip() - - #If there is a string literal, pull that out - if val_str.startswith(str_delim): - end_quote = val_str[delim_len:].find(str_delim) + delim_len - if end_quote == -1: - raise PhoenixParseError(line) - elif not end_quote == len(val_str) - delim_len: - #Make sure remainder is just comment - if not val_str[end_quote+delim_len:].strip().startswith('#'): - raise PhoenixParseError(line) - - return (key, val_str[2:end_quote]) - - else: #Otherwise try to convert to an int or float - val = None - try: - val = int(val_str) - except ValueError: - pass - else: - return (key, val) - - try: - val = int(val_str, 16) - except ValueError: - pass - else: - return (key, val) - - try: - val = float(val_str) - except ValueError: - pass - else: - return (key, val) - - raise PhoenixParseError(line) - -def parse_phoenix_prot(prot_key, prot_val): - '''Parse the MrPheonixProtocol string. - - Parameters - ---------- - prot_str : str - The 'MrPheonixProtocol' string from the CSA Series sub header. - - Returns - ------- - prot_dict : OrderedDict - Meta data pulled from the ASCCONV section. - - Raises - ------ - PhoenixParseError : A line of the ASCCONV section could not be parsed. - ''' - if prot_key == 'MrPhoenixProtocol': - str_delim = '""' - elif prot_key == 'MrProtocol': - str_delim = '"' - else: - raise ValueError('Unknown protocol key: %s' % prot_key) - ascconv_start = prot_val.find('### ASCCONV BEGIN ') - ascconv_end = prot_val.find('### ASCCONV END ###') - ascconv = prot_val[ascconv_start:ascconv_end].split('\n')[1:-1] - - result = OrderedDict() - for line in ascconv: - parse_result = _parse_phoenix_line(line, str_delim) - if parse_result: - result[parse_result[0]] = parse_result[1] - - return result - -def csa_series_trans_func(elem): - '''Function for parsing the CSA series sub header.''' - csa_dict = simplify_csa_dict(csareader.read(elem.value)) - - #If there is a phoenix protocol, parse it and dump it into the csa_dict - phx_src = None - if 'MrPhoenixProtocol' in csa_dict: - phx_src = 'MrPhoenixProtocol' - elif 'MrProtocol' in csa_dict: - phx_src = 'MrProtocol' - - if not phx_src is None: - phoenix_dict = parse_phoenix_prot(phx_src, csa_dict[phx_src]) - del csa_dict[phx_src] - for key, val in phoenix_dict.items(): - new_key = '%s.%s' % ('MrPhoenixProtocol', key) - csa_dict[new_key] = val - - return csa_dict - -csa_series_trans = Translator('CsaSeries', - dicom.tag.Tag(0x29, 0x1020), - 'SIEMENS CSA HEADER', - csa_series_trans_func) -'''Translator for parsing the CSA series sub header.''' - - -default_translators = (csa_image_trans, - csa_series_trans, - ) -'''Default translators for MetaExtractor.''' - -def tag_to_str(tag): - '''Convert a DICOM tag to a string representation using the group and - element hex values seprated by an underscore.''' - return '%#X_%#X' % (tag.group, tag.elem) - -unpack_vr_map = {'SL' : 'i', - 'UL' : 'I', - 'FL' : 'f', - 'FD' : 'd', - 'SS' : 'h', - 'US' : 'H', - 'US or SS' : 'H', - } -'''Dictionary mapping value representations to corresponding format strings for -the struct.unpack function.''' - -def tm_to_seconds(time_str): - '''Convert a DICOM time value (value representation of 'TM') to the number - of seconds past midnight. - - Parameters - ---------- - time_str : str - The DICOM time value string - - Returns - ------- - A floating point representing the number of seconds past midnight - ''' - #Allow ACR/NEMA style format by removing any colon chars - time_str = time_str.replace(':', '') - - #Only the hours portion is required - result = int(time_str[:2]) * 3600 - - str_len = len(time_str) - if str_len > 2: - result += int(time_str[2:4]) * 60 - if str_len > 4: - result += float(time_str[4:]) - - return float(result) - -def get_text(byte_str): - '''If the given byte string contains text data return it as unicode, - otherwise return None. - - If the 'chardet' package is installed, this will be used to detect the - text encoding. Otherwise the input will only be decoded if it is ASCII. - ''' - if have_chardet: - match = chardet.detect(byte_str) - if match['encoding'] is None: - return None - else: - return byte_str.decode(match['encoding']) - else: - if not is_ascii(byte_str): - return None - else: - return byte_str.decode('ascii') - -default_conversions = {'DS' : float, - 'IS' : int, - 'AT' : str, - 'OW' : get_text, - 'OB' : get_text, - 'OW or OB' : get_text, - 'OB or OW' : get_text, - 'UN' : get_text, - 'PN' : unicode_str, - 'UI' : unicode_str, - } - -class MetaExtractor(object): - '''Callable object for extracting meta data from a dicom dataset. - Initialize with a set of ignore rules, translators, and type - conversions. - - Parameters - ---------- - ignore_rules : sequence - A sequence of callables, each of which should take a DICOM element - and return True if it should be ignored. If None the module - default is used. - - translators : sequence - A sequence of `Translator` objects each of which can convert a - DICOM element into a dictionary. Overrides any ignore rules. If - None the module default is used. - - conversions : dict - Mapping of DICOM value representation (VR) strings to callables - that perform some conversion on the value - - warn_on_trans_except : bool - Convert any exceptions from translators into warnings. - ''' - - def __init__(self, ignore_rules=None, translators=None, conversions=None, - warn_on_trans_except=True): - if ignore_rules is None: - self.ignore_rules = default_ignore_rules - else: - self.ignore_rules = ignore_rules - if translators is None: - self.translators = default_translators - else: - self.translators = translators - if conversions is None: - self.conversions = default_conversions - else: - self.conversions = conversions - self.warn_on_trans_except = warn_on_trans_except - - def _get_elem_key(self, elem): - '''Get the key for any non-translated elements.''' - #Use standard DICOM keywords if possible - key = keyword_for_tag(elem.tag) - - #For private tags we take elem.name and convert to camel case - if key == '': - key = elem.name - if key.startswith('[') and key.endswith(']'): - key = key[1:-1] - tokens = [token[0].upper() + token[1:] - for token in key.split()] - key = ''.join(tokens) - - return key - - def _get_elem_value(self, elem): - '''Get the value for any non-translated elements''' - #If the VR is implicit, we may need to unpack the values from a byte - #string. This may require us to make an assumption about whether the - #value is signed or not, but this is unavoidable. - if elem.VR in unpack_vr_map and isinstance(elem.value, str): - n_vals = len(elem.value)/struct.calcsize(unpack_vr_map[elem.VR]) - if n_vals != elem.VM: - warnings.warn("The element's VM and the number of values do " - "not match.") - if n_vals == 1: - value = struct.unpack(unpack_vr_map[elem.VR], elem.value)[0] - else: - value = list(struct.unpack(unpack_vr_map[elem.VR]*n_vals, - elem.value) - ) - else: - #Otherwise, just take a copy if the value is a list - n_vals = elem.VM - if n_vals > 1: - value = elem.value[:] - else: - value = elem.value - - #Handle any conversions - if elem.VR in self.conversions: - if n_vals == 1: - value = self.conversions[elem.VR](value) - else: - value = [self.conversions[elem.VR](val) for val in value] - - return value - - def __call__(self, dcm): - '''Extract the meta data from a DICOM dataset. - - Parameters - ---------- - dcm : dicom.dataset.Dataset - The DICOM dataset to extract the meta data from. - - Returns - ------- - meta : dict - A dictionary of extracted meta data. - - Notes - ----- - Non-private tags use the DICOM keywords as keys. Translators have their - name, followed by a dot, prepended to the keys of any meta elements - they produce. Values are unchanged, except when the value - representation is 'DS' or 'IS' (decimal/integer strings) they are - converted to float and int types. - ''' - standard_meta = [] - trans_meta_dicts = OrderedDict() - - #Make dict to track which tags map to which translators - trans_map = {} - - # Convert text elements to unicode - dcm.decode() - - for elem in dcm: - if isinstance(elem.value, str) and elem.value.strip() == '': - continue - - #Get the name for non-translated elements - name = self._get_elem_key(elem) - - #If it is a private creator element, setup any corresponding - #translators - if elem.name == "Private Creator": - for translator in self.translators: - if translator.priv_creator == elem.value: - new_elem = ((translator.tag.elem & 0xff) | - (elem.tag.elem * 16**2)) - new_tag = dicom.tag.Tag(elem.tag.group, new_elem) - if new_tag in trans_map: - raise ValueError('More than one translator ' - 'for tag: %s' % new_tag) - trans_map[new_tag] = translator - - #If there is a translator for this element, use it - if elem.tag in trans_map: - try: - meta = trans_map[elem.tag].trans_func(elem) - except Exception as e: - if self.warn_on_trans_except: - warnings.warn("Exception from translator %s: %s" % - (trans_map[elem.tag].name, - repr(str(e)))) - else: - raise - else: - if meta: - trans_meta_dicts[trans_map[elem.tag].name] = meta - #Otherwise see if we are supposed to ignore the element - elif any(rule(elem) for rule in self.ignore_rules): - continue - #Handle elements that are sequences with recursion - elif isinstance(elem.value, dicom.sequence.Sequence): - value = [] - for val in elem.value: - value.append(self(val)) - if all(x is None for x in value): - continue - standard_meta.append((name, value, elem.tag)) - #Otherwise just make sure the value is unpacked - else: - value = self._get_elem_value(elem) - if value is None: - continue - standard_meta.append((name, value, elem.tag)) - - #Handle name collisions - name_counts = defaultdict(int) - for elem in standard_meta: - name_counts[elem[0]] += 1 - result = OrderedDict() - for name, value, tag in standard_meta: - if name_counts[name] > 1: - name = name + '_' + tag_to_str(tag) - result[name] = value - - #Inject translator results - for trans_name, meta in trans_meta_dicts.items(): - for name, value in meta.items(): - name = '%s.%s' % (trans_name, name) - result[name] = value - - return result - -def minimal_extractor(dcm): - '''Meta data extractor that just extracts the minimal set of keys needed - by DicomStack objects. - ''' - result = {} - for key in DicomStack.minimal_keys: - try: - result[key] = dcm.__getattr__(key) - except AttributeError: - pass - return result - -default_extractor = MetaExtractor() -'''The default `MetaExtractor`.''' diff --git a/fsl/data/dcmstack/info.py b/fsl/data/dcmstack/info.py deleted file mode 100644 index a39ab07fd8bc45c93082be8d92f1865d9eab85a9..0000000000000000000000000000000000000000 --- a/fsl/data/dcmstack/info.py +++ /dev/null @@ -1,54 +0,0 @@ -""" Information for setup.py that we may also want to access in dcmstack. Can -not import dcmstack. -""" -import sys - -_version_major = 0 -_version_minor = 7 -_version_micro = 0 -_version_extra = 'dev' -__version__ = "%s.%s.%s%s" % (_version_major, - _version_minor, - _version_micro, - _version_extra) - -CLASSIFIERS = ["Development Status :: 3 - Alpha", - "Environment :: Console", - "Intended Audience :: Science/Research", - "License :: OSI Approved :: MIT License", - "Operating System :: OS Independent", - "Programming Language :: Python", - "Topic :: Scientific/Engineering"] - -description = 'Stack DICOM images into volumes and convert to Nifti' - -# Hard dependencies -install_requires = ['pydicom >= 0.9.7', - 'nibabel >= 2.0.0', - ] -# Add version specific dependencies -if sys.version_info < (2, 6): - raise Exception("must use python 2.6 or greater") -elif sys.version_info < (2, 7): - install_requires.append('ordereddict') - -# Extra requirements for building documentation and testing -extras_requires = {'doc': ["sphinx", "numpydoc"], - 'test': ["nose"], - } - - -NAME = 'dcmstack' -AUTHOR = "Brendan Moloney" -AUTHOR_EMAIL = "moloney@ohsu.edu" -MAINTAINER = "Brendan Moloney" -MAINTAINER_EMAIL = "moloney@ohsu.edu" -DESCRIPTION = description -LICENSE = "MIT license" -CLASSIFIERS = CLASSIFIERS -PLATFORMS = "OS Independent" -ISRELEASE = _version_extra == '' -VERSION = __version__ -INSTALL_REQUIRES = install_requires -EXTRAS_REQUIRES = extras_requires -PROVIDES = ["dcmstack"] \ No newline at end of file diff --git a/fsl/data/dcmstack/nitool_cli.py b/fsl/data/dcmstack/nitool_cli.py deleted file mode 100644 index 5e83da5192da62e3521819f4eadfa8b461e256cd..0000000000000000000000000000000000000000 --- a/fsl/data/dcmstack/nitool_cli.py +++ /dev/null @@ -1,251 +0,0 @@ -""" -Command line interface for nitool. - -@author: moloney -""" -from __future__ import print_function - -import os, sys, argparse -import nibabel as nb -from .dcmmeta import NiftiWrapper, DcmMetaExtension, MissingExtensionError - -prog_descrip = """Work with extended Nifti files created by dcmstack""" - -def main(argv=sys.argv): - #Setup the top level parser - arg_parser = argparse.ArgumentParser(description=prog_descrip) - sub_parsers = arg_parser.add_subparsers(title="Subcommands") - - #Split command - split_help = ("Split src_nii file along a dimension. Defaults to the slice " - "dimension if 3D, otherwise the last dimension.") - split_parser = sub_parsers.add_parser('split', help=split_help) - split_parser.add_argument('src_nii', nargs=1) - split_parser.add_argument('-d', '--dimension', default=None, type=int, - help=("The dimension to split along. Must be in " - "the range [0, 5)")) - split_parser.add_argument('-o', '--output-format', default=None, - help=("Format string used to create the output " - "file names. Default is to prepend the index " - "number to the src_nii filename.")) - split_parser.set_defaults(func=split) - - #Merge Command - merge_help = ("Merge the provided Nifti files along a dimension. Defaults " - "to slice, then time, and then vector.") - merge_parser = sub_parsers.add_parser('merge', help=merge_help) - merge_parser.add_argument('output', nargs=1) - merge_parser.add_argument('src_niis', nargs='+') - merge_parser.add_argument('-d', '--dimension', default=None, type=int, - help=("The dimension to merge along. Must be " - "in the range [0, 5)")) - merge_parser.add_argument('-s', '--sort', default=None, - help=("Sort the source files using the provided " - "meta data key before merging")) - merge_parser.add_argument('-c', '--clear-slices', action='store_true', - help="Clear all per slice meta data") - merge_parser.set_defaults(func=merge) - - #Dump Command - dump_help = "Dump the JSON meta data extension from the provided Nifti." - dump_parser = sub_parsers.add_parser('dump', help=dump_help) - dump_parser.add_argument('src_nii', nargs=1) - dump_parser.add_argument('dest_json', nargs='?', - type=argparse.FileType('w'), - default=sys.stdout) - dump_parser.add_argument('-m', '--make-empty', default=False, - action='store_true', - help="Make an empty extension if none exists") - dump_parser.add_argument('-r', '--remove', default=False, - action='store_true', - help="Remove the extension from the Nifti file") - dump_parser.set_defaults(func=dump) - - #Embed Command - embed_help = "Embed a JSON extension into the Nifti file." - embed_parser = sub_parsers.add_parser('embed', help=embed_help) - embed_parser.add_argument('src_json', nargs='?', type=argparse.FileType('r'), - default=sys.stdin) - embed_parser.add_argument('dest_nii', nargs=1) - embed_parser.add_argument('-f', '--force-overwrite', action='store_true', - help="Overwrite any existing dcmmeta extension") - embed_parser.set_defaults(func=embed) - - #Lookup command - lookup_help = "Lookup the value for the given meta data key." - lookup_parser = sub_parsers.add_parser('lookup', help=lookup_help) - lookup_parser.add_argument('key', nargs=1) - lookup_parser.add_argument('src_nii', nargs=1) - lookup_parser.add_argument('-i', '--index', - help=("Use the given voxel index. The index " - "must be provided as a comma separated list of " - "integers (one for each dimension).")) - lookup_parser.set_defaults(func=lookup) - - #Inject command - inject_help = "Inject meta data into the JSON extension." - inject_parser = sub_parsers.add_parser('inject', help=inject_help) - inject_parser.add_argument('dest_nii', nargs=1) - inject_parser.add_argument('classification', nargs=2) - inject_parser.add_argument('key', nargs=1) - inject_parser.add_argument('values', nargs='+') - inject_parser.add_argument('-f', '--force-overwrite', - action='store_true', - help=("Overwrite any existing values " - "for the key")) - inject_parser.add_argument('-t', '--type', default=None, - help="Interpret the value as this type instead " - "of trying to determine the type automatically") - inject_parser.set_defaults(func=inject) - - #Parse the arguments and call the appropriate function - args = arg_parser.parse_args(argv[1:]) - return args.func(args) - -def split(args): - src_path = args.src_nii[0] - src_fn = os.path.basename(src_path) - src_dir = os.path.dirname(src_path) - - src_nii = nb.load(src_path) - try: - src_wrp = NiftiWrapper(src_nii) - except MissingExtensionError: - print("No dcmmeta extension found, making empty one...") - src_wrp = NiftiWrapper(src_nii, make_empty=True) - for split_idx, split in enumerate(src_wrp.split(args.dimension)): - if args.output_format: - out_name = (args.output_format % - split.meta_ext.get_class_dict(('global', 'const')) - ) - else: - out_name = os.path.join(src_dir, '%03d-%s' % (split_idx, src_fn)) - nb.save(split, out_name) - return 0 - -def make_key_func(meta_key, index=None): - def key_func(src_nii): - result = src_nii.get_meta(meta_key, index) - if result is None: - raise ValueError('Key not found: %s' ) % meta_key - return result - - return key_func - -def merge(args): - src_wrps = [] - for src_path in args.src_niis: - src_nii = nb.load(src_path) - try: - src_wrp = NiftiWrapper(src_nii) - except MissingExtensionError: - print("No dcmmeta extension found, making empty one...") - src_wrp = NiftiWrapper(src_nii, make_empty=True) - src_wrps.append(src_wrp) - - if args.sort: - src_wrps.sort(key=make_key_func(args.sort)) - - result_wrp = NiftiWrapper.from_sequence(src_wrps, args.dimension) - - if args.clear_slices: - result_wrp.meta_ext.clear_slice_meta() - - out_name = (args.output[0] % - result_wrp.meta_ext.get_class_dict(('global', 'const'))) - result_wrp.to_filename(out_name) - return 0 - -def dump(args): - src_nii = nb.load(args.src_nii[0]) - src_wrp = NiftiWrapper(src_nii, args.make_empty) - meta_str = src_wrp.meta_ext.to_json() - args.dest_json.write(meta_str) - args.dest_json.write('\n') - - if args.remove: - src_wrp.remove_extension() - src_wrp.to_filename(args.src_nii[0]) - return 0 - -def check_overwrite(): - usr_input = '' - while not usr_input in ('y', 'n'): - usr_input = input('Existing DcmMeta extension found, overwrite? ' - '[y/n]').lower() - return usr_input == 'y' - -def embed(args): - dest_nii = nb.load(args.dest_nii[0]) - hdr = dest_nii.get_header() - try: - src_wrp = NiftiWrapper(dest_nii, False) - except MissingExtensionError: - pass - else: - if not args.force_overwrite: - if not check_overwrite(): - return - src_wrp.remove_extension() - - hdr.extensions.append(DcmMetaExtension.from_json(args.src_json.read())) - nb.save(dest_nii, args.dest_nii[0]) - return 0 - -def lookup(args): - src_wrp = NiftiWrapper.from_filename(args.src_nii[0]) - index = None - if args.index: - index = tuple(int(idx.strip()) for idx in args.index.split(',')) - meta = src_wrp.get_meta(args.key[0], index) - if not meta is None: - print(meta) - return 0 - -def convert_values(values, type_str=None): - if type_str is None: - for conv_type in (int, float): - try: - values = [conv_type(val) for val in values] - except ValueError: - pass - else: - break - else: - if type_str not in ('str', 'int', 'float'): - raise ValueError("Unrecognized type: %s" % type_str) - conv_type = eval(type_str) - values = [conv_type(val) for val in values] - if len(values) == 1: - return values[0] - return values - -def inject(args): - dest_nii = nb.load(args.dest_nii[0]) - dest_wrp = NiftiWrapper(dest_nii, make_empty=True) - classification = tuple(args.classification) - if not classification in dest_wrp.meta_ext.get_valid_classes(): - print("Invalid classification: %s" % (classification,)) - return 1 - n_vals = len(args.values) - mult = dest_wrp.meta_ext.get_multiplicity(classification) - if n_vals != mult: - print(("Invalid number of values for classification. Expected " - "%d but got %d") % (mult, n_vals)) - return 1 - key = args.key[0] - if key in dest_wrp.meta_ext.get_keys(): - if not args.force_overwrite: - print("Key already exists, must pass --force-overwrite") - return 1 - else: - curr_class = dest_wrp.meta_ext.get_classification(key) - curr_dict = dest_wrp.meta_ext.get_class_dict(curr_class) - del curr_dict[key] - class_dict = dest_wrp.meta_ext.get_class_dict(classification) - class_dict[key] = convert_values(args.values, args.type) - nb.save(dest_nii, args.dest_nii[0]) - return 0 - -if __name__ == '__main__': - sys.exit(main())