Skip to content
Snippets Groups Projects
Commit ece0bd63 authored by Paul McCarthy's avatar Paul McCarthy :mountain_bicyclist:
Browse files

Including a copy of dcmstack, which will be removed when it is brought up to

date.
parent 8dc1a740
No related branches found
No related tags found
No related merge requests found
**********************
Copyright and Licenses
**********************
dcmstack
=======
The dcmstack package, including all examples, code snippets and attached
documentation is covered by the MIT license.
::
The MIT License
Copyright (c) 2011-2012 Brendan Moloney <moloney@ohsu.edu>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
Package for stacking DICOM images into multi dimensional volumes, extracting
the DICOM meta data, converting the result to Nifti files with the meta
data stored in a header extension, and work with these extended Nifti files.
"""
from .info import __version__
from .dcmstack import *
This diff is collapsed.
This diff is collapsed.
"""
Command line interface to dcmstack.
@author: moloney
"""
from __future__ import print_function
import os, sys, argparse, string
from glob import glob
import dicom
from . import dcmstack
from .dcmstack import (parse_and_group, stack_group, DicomOrdering,
default_group_keys)
from .dcmmeta import NiftiWrapper
from . import extract
from .info import __version__
# Text shown by argparse at the top of the generated --help output.
prog_descrip = """Stack DICOM files from each source directory into 2D to 5D
volumes, optionally extracting meta data.
"""

# Shown at the bottom of --help; warns users about PHI in extracted meta data.
prog_epilog = """IT IS YOUR RESPONSIBILITY TO KNOW IF THERE IS PRIVATE HEALTH
INFORMATION IN THE METADATA EXTRACTED BY THIS PROGRAM."""
def parse_tags(opt_str):
    '''Parse a comma separated list of "group_elem" hex pairs into a list of
    dicom.tag.Tag objects.

    Raises
    ------
    ValueError : A tag string is not made of exactly two '_' separated tokens.
    '''
    tags = []
    for tag_str in opt_str.split(','):
        tokens = tag_str.split('_')
        if len(tokens) != 2:
            raise ValueError('Invalid str format for tags')
        #Both tokens are hexadecimal (e.g. "0x29_0x1010")
        group = int(tokens[0].strip(), 16)
        elem = int(tokens[1].strip(), 16)
        tags.append(dicom.tag.Tag(group, elem))
    return tags
def sanitize_path_comp(path_comp):
    '''Replace any character that is not safe in a path component with an
    underscore.

    Only ASCII letters, digits, and the characters '-', '_', '.' are kept.

    Parameters
    ----------
    path_comp : str
        The candidate path component (e.g. a filename built from DICOM tags).

    Returns
    -------
    A string safe to use as a single path component.
    '''
    #Bug fix: string.letters only exists on Python 2 (and is locale
    #dependent there); string.ascii_letters is portable and deterministic
    safe_chars = string.ascii_letters + string.digits + '-_.'
    return ''.join(char if char in safe_chars else '_'
                   for char in path_comp)
def _make_ordering(var, order_path):
    '''Build a DicomOrdering for DICOM keyword `var`, optionally constrained
    to the absolute ordering listed (one value per line) in the file at
    `order_path`. Returns None when `var` is not given.'''
    if not var:
        return None
    if order_path:
        #Close the ordering file promptly even if parsing fails
        with open(order_path) as order_file:
            abs_order = [line.strip() for line in order_file.readlines()]
        return DicomOrdering(var, abs_order, True)
    return DicomOrdering(var)

def main(argv=sys.argv):
    '''Entry point for the dcmstack command line interface.

    Parses `argv`, stacks the DICOM files found in each source directory,
    and writes the resulting Nifti files (optionally extracting/embedding
    meta data). Returns 0 on success.
    '''
    #Handle command line options
    arg_parser = argparse.ArgumentParser(description=prog_descrip,
                                         epilog=prog_epilog)
    arg_parser.add_argument('src_dirs', nargs='*', help=('The source '
                            'directories containing DICOM files.'))

    input_opt = arg_parser.add_argument_group('Input options')
    input_opt.add_argument('--force-read', action='store_true', default=False,
                           help=('Try reading all files as DICOM, even if they '
                                 'are missing the preamble.'))
    input_opt.add_argument('--file-ext', default='.dcm', help=('Only try reading '
                           'files with the given extension. Default: '
                           '%(default)s'))
    input_opt.add_argument('--allow-dummies', action='store_true', default=False,
                           help=('Allow DICOM files that are missing pixel '
                                 'data, filling that slice of the output nifti with '
                                 'the maximum representable value.'))

    output_opt = arg_parser.add_argument_group('Output options')
    output_opt.add_argument('--dest-dir', default=None,
                            help=('Destination directory, defaults to the '
                                  'source directory.'))
    output_opt.add_argument('-o', '--output-name', default=None,
                            help=('Python format string determining the output '
                                  'filenames based on DICOM tags.'))
    output_opt.add_argument('--output-ext', default='.nii.gz',
                            help=('The extension for the output file type. '
                                  'Default: %(default)s'))
    output_opt.add_argument('-d', '--dump-meta', default=False,
                            action='store_true', help=('Dump the extracted '
                            'meta data into a JSON file with the same base '
                            'name as the generated Nifti'))
    output_opt.add_argument('--embed-meta', default=False, action='store_true',
                            help=('Embed the extracted meta data into a Nifti '
                                  'header extension (in JSON format).'))

    stack_opt = arg_parser.add_argument_group('Stacking Options')
    stack_opt.add_argument('-g', '--group-by', default=None,
                           help=("Comma separated list of meta data keys to "
                                 "group input files into stacks with."))
    stack_opt.add_argument('--voxel-order', default='LAS',
                           help=('Order the voxels so the spatial indices '
                                 'start from these directions in patient space. '
                                 'The directions in patient space should be given '
                                 'as a three character code: (l)eft, (r)ight, '
                                 '(a)nterior, (p)osterior, (s)uperior, (i)nferior. '
                                 'Passing an empty string will disable '
                                 'reorientation. '
                                 'Default: %(default)s'))
    stack_opt.add_argument('-t', '--time-var', default=None,
                           help=('The DICOM element keyword to use for '
                                 'ordering the stack along the time dimension.'))
    stack_opt.add_argument('--vector-var', default=None,
                           help=('The DICOM element keyword to use for '
                                 'ordering the stack along the vector dimension.'))
    stack_opt.add_argument('--time-order', default=None,
                           help=('Provide a text file with the desired order '
                                 'for the values (one per line) of the attribute '
                                 'used as the time variable. This option is rarely '
                                 'needed.'))
    stack_opt.add_argument('--vector-order', default=None,
                           help=('Provide a text file with the desired order '
                                 'for the values (one per line) of the attribute '
                                 'used as the vector variable. This option is rarely '
                                 'needed.'))

    meta_opt = arg_parser.add_argument_group('Meta Extraction and Filtering '
                                             'Options')
    meta_opt.add_argument('-l', '--list-translators', default=False,
                          action='store_true', help=('List enabled translators '
                          'and exit'))
    meta_opt.add_argument('--disable-translator', default=None,
                          help=('Disable the translators for the provided '
                                'tags. Tags should be given in the format '
                                '"0x0_0x0". More than one can be given in a comma '
                                'separated list. If the word "all" is provided, all '
                                'translators will be disabled.'))
    meta_opt.add_argument('--extract-private', default=False,
                          action='store_true',
                          help=('Extract meta data from private elements, even '
                                'if there is no translator. If the value for the '
                                'element contains non-ascii bytes it will still be '
                                'ignored. The extracted meta data may still be '
                                'filtered out by the regular expressions.'))
    meta_opt.add_argument('-i', '--include-regex', action='append',
                          help=('Include any meta data where the key matches '
                                'the provided regular expression. This will override '
                                'any exclude expressions. Applies to all meta data.'))
    meta_opt.add_argument('-e', '--exclude-regex', action='append',
                          help=('Exclude any meta data where the key matches '
                                'the provided regular expression. This will '
                                'supplement the default exclude expressions. Applies '
                                'to all meta data.'))
    meta_opt.add_argument('--default-regexes', default=False,
                          action='store_true',
                          help=('Print the list of default include and exclude '
                                'regular expressions and exit.'))

    gen_opt = arg_parser.add_argument_group('General Options')
    gen_opt.add_argument('-v', '--verbose', default=False, action='store_true',
                         help=('Print additional information.'))
    gen_opt.add_argument('--strict', default=False, action='store_true',
                         help=('Fail on the first exception instead of '
                               'showing a warning.'))
    gen_opt.add_argument('--version', default=False, action='store_true',
                         help=('Show the version and exit.'))

    args = arg_parser.parse_args(argv[1:])

    if args.version:
        print(__version__)
        return 0

    #Check if we are just listing the translators
    if args.list_translators:
        for translator in extract.default_translators:
            print('%s -> %s' % (translator.tag, translator.name))
        return 0

    #Check if we are just listing the default exclude regular expressions
    if args.default_regexes:
        print('Default exclude regular expressions:')
        for regex in dcmstack.default_key_excl_res:
            print('\t' + regex)
        print('Default include regular expressions:')
        for regex in dcmstack.default_key_incl_res:
            print('\t' + regex)
        return 0

    #Check if we are generating meta data
    gen_meta = args.embed_meta or args.dump_meta

    if gen_meta:
        #Start with the module defaults
        ignore_rules = extract.default_ignore_rules
        translators = extract.default_translators

        #Disable translators if requested
        if args.disable_translator:
            if args.disable_translator.lower() == 'all':
                translators = tuple()
            else:
                try:
                    disable_tags = parse_tags(args.disable_translator)
                except ValueError:
                    #Bug fix: only catch the parse failure instead of a bare
                    #'except' that would also swallow SystemExit etc.
                    arg_parser.error('Invalid tag format to --disable-translator.')
                new_translators = []
                for translator in translators:
                    if not translator.tag in disable_tags:
                        new_translators.append(translator)
                translators = new_translators

        #Include non-translated private elements if requested
        if args.extract_private:
            ignore_rules = (extract.ignore_pixel_data,
                            extract.ignore_overlay_data,
                            extract.ignore_color_lut_data)

        extractor = extract.MetaExtractor(ignore_rules, translators)
    else:
        extractor = extract.minimal_extractor

    #Add include/exclude regexes to meta filter
    include_regexes = dcmstack.default_key_incl_res
    if args.include_regex:
        include_regexes += args.include_regex
    exclude_regexes = dcmstack.default_key_excl_res
    if args.exclude_regex:
        exclude_regexes += args.exclude_regex
    meta_filter = dcmstack.make_key_regex_filter(exclude_regexes,
                                                 include_regexes)

    #Figure out time and vector ordering
    time_order = _make_ordering(args.time_var, args.time_order)
    vector_order = _make_ordering(args.vector_var, args.vector_order)

    if len(args.src_dirs) == 0:
        arg_parser.error('No source directories were provided.')

    #Handle group-by option
    if not args.group_by is None:
        group_by = args.group_by.split(',')
    else:
        group_by = default_group_keys

    #Handle each source directory individually
    for src_dir in args.src_dirs:
        if not os.path.isdir(src_dir):
            print('%s is not a directory, skipping' % src_dir, file=sys.stderr)
            #Bug fix: actually skip non-directory arguments instead of
            #falling through and trying to process them anyway
            continue

        if args.verbose:
            print("Processing source directory %s" % src_dir)

        #Build a list of paths to source files
        glob_str = os.path.join(src_dir, '*')
        if args.file_ext:
            glob_str += args.file_ext
        src_paths = glob(glob_str)

        if args.verbose:
            print("Found %d source files in the directory" % len(src_paths))

        #Group the files in this directory
        groups = parse_and_group(src_paths,
                                 group_by,
                                 extractor,
                                 args.force_read,
                                 not args.strict,
                                )

        if args.verbose:
            print("Found %d groups of DICOM images" % len(groups))

        if len(groups) == 0:
            print("No DICOM files found in %s" % src_dir)

        out_idx = 0
        generated_outs = set()
        #Bug fix: use items() instead of Python 2 only iteritems()
        for key, group in groups.items():
            stack = stack_group(group,
                                warn_on_except=not args.strict,
                                time_order=time_order,
                                vector_order=vector_order,
                                allow_dummies=args.allow_dummies,
                                meta_filter=meta_filter)
            meta = group[0][1]

            #Build an appropriate output format string if none was specified
            if args.output_name is None:
                out_fmt = []
                if 'SeriesNumber' in meta:
                    out_fmt.append('%(SeriesNumber)03d')
                if 'ProtocolName' in meta:
                    out_fmt.append('%(ProtocolName)s')
                elif 'SeriesDescription' in meta:
                    out_fmt.append('%(SeriesDescription)s')
                else:
                    out_fmt.append('series')
                out_fmt = '-'.join(out_fmt)
            else:
                out_fmt = args.output_name

            #Get the output filename from the format string, make sure the
            #result is unique for this source directory
            out_fn = sanitize_path_comp(out_fmt % meta)
            if out_fn in generated_outs:
                out_fn += '-%03d' % out_idx
            generated_outs.add(out_fn)
            out_idx += 1
            out_fn = out_fn + args.output_ext

            if args.dest_dir:
                out_path = os.path.join(args.dest_dir, out_fn)
            else:
                out_path = os.path.join(src_dir, out_fn)

            if args.verbose:
                print("Writing out stack to path %s" % out_path)

            nii = stack.to_nifti(args.voxel_order, gen_meta)

            if args.dump_meta:
                nii_wrp = NiftiWrapper(nii)
                path_tokens = out_path.split('.')
                if path_tokens[-1] == 'gz':
                    path_tokens = path_tokens[:-1]
                if path_tokens[-1] == 'nii':
                    path_tokens = path_tokens[:-1]
                meta_path = '.'.join(path_tokens + ['json'])
                #Use a context manager so the file is closed on error too
                with open(meta_path, 'w') as out_file:
                    out_file.write(nii_wrp.meta_ext.to_json())

                if not args.embed_meta:
                    nii_wrp.remove_extension()

                del nii_wrp

            nii.to_filename(out_path)

            #Free the (potentially large) per-stack objects before the next
            #iteration to keep peak memory usage down
            del key
            del group
            del stack
            del meta
            del nii
        del groups

    return 0
#Allow running this module as a script: delegate to main() and use its
#return value as the process exit status
if __name__ == '__main__':
    sys.exit(main())
"""
Extract meta data from a DICOM data set.
"""
import struct, warnings
from collections import namedtuple, defaultdict
import dicom
from dicom.datadict import keyword_for_tag
from nibabel.nicom import csareader
from .dcmstack import DicomStack
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
try:
import chardet
have_chardet = True
except ImportError:
have_chardet = False
pass
#This is needed to allow extraction on files with invalid values (e.g. too
#long of a decimal string)
dicom.config.enforce_valid_values = False
# Python 2 / 3 compatibility: ``unicode`` only exists on Python 2.
# Bug fix: the previous check used ``sys.version_info`` without importing
# ``sys`` in this module, raising NameError at import time.
try:
    unicode_str = unicode          # Python 2
except NameError:
    unicode_str = str              # Python 3
def is_ascii(in_str):
    '''Return true if the given string is valid ASCII.'''
    #Only printable ASCII (space through tilde) counts as text here
    return all(' ' <= c <= '~' for c in in_str)
def ignore_private(elem):
    '''Ignore rule for `MetaExtractor` to skip private DICOM elements (odd
    group number).'''
    return elem.tag.group % 2 == 1
def ignore_pixel_data(elem):
    '''Ignore rule for `MetaExtractor` to skip the PixelData element.'''
    pixel_data_tag = dicom.tag.Tag(0x7fe0, 0x10)
    return elem.tag == pixel_data_tag
def ignore_overlay_data(elem):
    '''Ignore rule for `MetaExtractor` to skip overlay data elements
    (repeating groups 0x60xx, element 0x3000).'''
    in_overlay_group = (elem.tag.group & 0xff00) == 0x6000
    return in_overlay_group and elem.tag.elem == 0x3000
def ignore_color_lut_data(elem):
    '''Ignore rule for `MetaExtractor` to skip color lookup table data
    elements in group 0x28.'''
    lut_elems = (0x1201, 0x1202, 0x1203, 0x1221, 0x1222, 0x1223)
    return elem.tag.group == 0x28 and elem.tag.elem in lut_elems
default_ignore_rules = (ignore_private,
ignore_pixel_data,
ignore_overlay_data,
ignore_color_lut_data)
'''The default tuple of ignore rules for `MetaExtractor`.'''
Translator = namedtuple('Translator', ['name',
'tag',
'priv_creator',
'trans_func']
)
'''A namedtuple for storing the four elements of a translator: a name, the
dicom.tag.Tag that can be translated, the private creator string (optional), and
the function which takes the DICOM element and returns a dictionary.'''
def simplify_csa_dict(csa_dict):
    '''Simplify the result of nibabel.nicom.csareader.

    Parameters
    ----------
    csa_dict : dict
        The result from nibabel.nicom.csareader

    Returns
    -------
    result : OrderedDict
        Result where the keys come from the 'tags' sub dictionary of
        `csa_dict`. The values come from the 'items' within that tags sub
        sub dictionary. If items has only one element it will be unpacked
        from the list.
    '''
    if csa_dict is None:
        return None

    simplified = OrderedDict()
    for tag, tag_dict in csa_dict['tags'].items():
        items = tag_dict['items']
        if not items:
            #Tags with no items are dropped entirely
            continue
        simplified[tag] = items[0] if len(items) == 1 else items
    return simplified
def csa_image_trans_func(elem):
    '''Function for translating the CSA image sub header.'''
    csa_dict = csareader.read(elem.value)
    return simplify_csa_dict(csa_dict)
csa_image_trans = Translator('CsaImage',
dicom.tag.Tag(0x29, 0x1010),
'SIEMENS CSA HEADER',
csa_image_trans_func)
'''Translator for the CSA image sub header.'''
class PhoenixParseError(Exception):
    '''Exception indicating an error parsing a line from the Phoenix
    Protocol.'''

    def __init__(self, line):
        '''Create the exception, remembering the offending `line`.'''
        self.line = line

    def __str__(self):
        return 'Unable to parse phoenix protocol line: %s' % self.line
def _parse_phoenix_line(line, str_delim='""'):
delim_len = len(str_delim)
#Handle most comments (not always when string literal involved)
comment_idx = line.find('#')
if comment_idx != -1:
#Check if the pound sign is in a string literal
if line[:comment_idx].count(str_delim) == 1:
if line[comment_idx:].find(str_delim) == -1:
raise PhoenixParseError(line)
else:
line = line[:comment_idx]
#Allow empty lines
if line.strip() == '':
return None
#Find the first equals sign and use that to split key/value
equals_idx = line.find('=')
if equals_idx == -1:
raise PhoenixParseError(line)
key = line[:equals_idx].strip()
val_str = line[equals_idx + 1:].strip()
#If there is a string literal, pull that out
if val_str.startswith(str_delim):
end_quote = val_str[delim_len:].find(str_delim) + delim_len
if end_quote == -1:
raise PhoenixParseError(line)
elif not end_quote == len(val_str) - delim_len:
#Make sure remainder is just comment
if not val_str[end_quote+delim_len:].strip().startswith('#'):
raise PhoenixParseError(line)
return (key, val_str[2:end_quote])
else: #Otherwise try to convert to an int or float
val = None
try:
val = int(val_str)
except ValueError:
pass
else:
return (key, val)
try:
val = int(val_str, 16)
except ValueError:
pass
else:
return (key, val)
try:
val = float(val_str)
except ValueError:
pass
else:
return (key, val)
raise PhoenixParseError(line)
def parse_phoenix_prot(prot_key, prot_val):
    '''Parse the MrPheonixProtocol string.

    Parameters
    ----------
    prot_key : str
        The CSA series sub header key the protocol came from. Must be
        'MrPhoenixProtocol' or 'MrProtocol'; this selects the string
        literal delimiter used inside the ASCCONV section.

    prot_val : str
        The protocol string from the CSA Series sub header.

    Returns
    -------
    prot_dict : OrderedDict
        Meta data pulled from the ASCCONV section.

    Raises
    ------
    PhoenixParseError : A line of the ASCCONV section could not be parsed.

    ValueError : `prot_key` is not one of the recognized protocol keys.
    '''
    if prot_key == 'MrPhoenixProtocol':
        str_delim = '""'
    elif prot_key == 'MrProtocol':
        str_delim = '"'
    else:
        raise ValueError('Unknown protocol key: %s' % prot_key)
    #Isolate the lines between the ASCCONV BEGIN/END markers (the [1:-1]
    #drops the marker lines themselves)
    ascconv_start = prot_val.find('### ASCCONV BEGIN ')
    ascconv_end = prot_val.find('### ASCCONV END ###')
    ascconv = prot_val[ascconv_start:ascconv_end].split('\n')[1:-1]
    result = OrderedDict()
    for line in ascconv:
        parse_result = _parse_phoenix_line(line, str_delim)
        if parse_result:
            result[parse_result[0]] = parse_result[1]
    return result
def csa_series_trans_func(elem):
    '''Function for parsing the CSA series sub header.'''
    csa_dict = simplify_csa_dict(csareader.read(elem.value))

    #If there is a phoenix protocol, parse it and dump it into the csa_dict
    phx_src = None
    for candidate in ('MrPhoenixProtocol', 'MrProtocol'):
        if candidate in csa_dict:
            phx_src = candidate
            break

    if phx_src is not None:
        #Replace the raw protocol string with its parsed key/value pairs,
        #all namespaced under 'MrPhoenixProtocol.'
        phoenix_dict = parse_phoenix_prot(phx_src, csa_dict.pop(phx_src))
        for key, val in phoenix_dict.items():
            csa_dict['%s.%s' % ('MrPhoenixProtocol', key)] = val

    return csa_dict
csa_series_trans = Translator('CsaSeries',
dicom.tag.Tag(0x29, 0x1020),
'SIEMENS CSA HEADER',
csa_series_trans_func)
'''Translator for parsing the CSA series sub header.'''
default_translators = (csa_image_trans,
csa_series_trans,
)
'''Default translators for MetaExtractor.'''
def tag_to_str(tag):
    '''Convert a DICOM tag to a string representation using the group and
    element hex values separated by an underscore.'''
    return '{:#X}_{:#X}'.format(tag.group, tag.elem)
unpack_vr_map = {'SL' : 'i',
'UL' : 'I',
'FL' : 'f',
'FD' : 'd',
'SS' : 'h',
'US' : 'H',
'US or SS' : 'H',
}
'''Dictionary mapping value representations to corresponding format strings for
the struct.unpack function.'''
def tm_to_seconds(time_str):
    '''Convert a DICOM time value (value representation of 'TM') to the
    number of seconds past midnight.

    Parameters
    ----------
    time_str : str
        The DICOM time value string

    Returns
    -------
    A floating point representing the number of seconds past midnight
    '''
    #Allow ACR/NEMA style format by removing any colon chars
    digits = time_str.replace(':', '')

    #Only the hours portion is required; minutes and (fractional) seconds
    #are added when present
    total = 3600 * int(digits[:2])
    if len(digits) > 2:
        total += 60 * int(digits[2:4])
    if len(digits) > 4:
        total += float(digits[4:])

    return float(total)
def get_text(byte_str):
    '''If the given byte string contains text data return it as unicode,
    otherwise return None.

    If the 'chardet' package is installed, this will be used to detect the
    text encoding. Otherwise the input will only be decoded if it is ASCII.
    '''
    if not have_chardet:
        #Without chardet we can only safely handle ASCII input
        if is_ascii(byte_str):
            return byte_str.decode('ascii')
        return None

    match = chardet.detect(byte_str)
    if match['encoding'] is None:
        return None
    return byte_str.decode(match['encoding'])
default_conversions = {'DS' : float,
'IS' : int,
'AT' : str,
'OW' : get_text,
'OB' : get_text,
'OW or OB' : get_text,
'OB or OW' : get_text,
'UN' : get_text,
'PN' : unicode_str,
'UI' : unicode_str,
}
class MetaExtractor(object):
    '''Callable object for extracting meta data from a dicom dataset.
    Initialize with a set of ignore rules, translators, and type
    conversions.

    Parameters
    ----------
    ignore_rules : sequence
        A sequence of callables, each of which should take a DICOM element
        and return True if it should be ignored. If None the module
        default is used.

    translators : sequence
        A sequence of `Translator` objects each of which can convert a
        DICOM element into a dictionary. Overrides any ignore rules. If
        None the module default is used.

    conversions : dict
        Mapping of DICOM value representation (VR) strings to callables
        that perform some conversion on the value

    warn_on_trans_except : bool
        Convert any exceptions from translators into warnings.
    '''

    def __init__(self, ignore_rules=None, translators=None, conversions=None,
                 warn_on_trans_except=True):
        if ignore_rules is None:
            self.ignore_rules = default_ignore_rules
        else:
            self.ignore_rules = ignore_rules
        if translators is None:
            self.translators = default_translators
        else:
            self.translators = translators
        if conversions is None:
            self.conversions = default_conversions
        else:
            self.conversions = conversions
        self.warn_on_trans_except = warn_on_trans_except

    def _get_elem_key(self, elem):
        '''Get the key for any non-translated elements.'''
        #Use standard DICOM keywords if possible
        key = keyword_for_tag(elem.tag)

        #For private tags we take elem.name and convert to camel case
        if key == '':
            key = elem.name
            if key.startswith('[') and key.endswith(']'):
                key = key[1:-1]
            tokens = [token[0].upper() + token[1:]
                      for token in key.split()]
            key = ''.join(tokens)

        return key

    def _get_elem_value(self, elem):
        '''Get the value for any non-translated elements'''
        #If the VR is implicit, we may need to unpack the values from a byte
        #string. This may require us to make an assumption about whether the
        #value is signed or not, but this is unavoidable.
        if elem.VR in unpack_vr_map and isinstance(elem.value, str):
            #Bug fix: use floor division; on Python 3 true division yields a
            #float, which breaks the struct format string built below
            n_vals = len(elem.value) // struct.calcsize(unpack_vr_map[elem.VR])
            if n_vals != elem.VM:
                warnings.warn("The element's VM and the number of values do "
                              "not match.")
            if n_vals == 1:
                value = struct.unpack(unpack_vr_map[elem.VR], elem.value)[0]
            else:
                value = list(struct.unpack(unpack_vr_map[elem.VR]*n_vals,
                                           elem.value)
                            )
        else:
            #Otherwise, just take a copy if the value is a list
            n_vals = elem.VM
            if n_vals > 1:
                value = elem.value[:]
            else:
                value = elem.value

        #Handle any conversions
        if elem.VR in self.conversions:
            if n_vals == 1:
                value = self.conversions[elem.VR](value)
            else:
                value = [self.conversions[elem.VR](val) for val in value]

        return value

    def __call__(self, dcm):
        '''Extract the meta data from a DICOM dataset.

        Parameters
        ----------
        dcm : dicom.dataset.Dataset
            The DICOM dataset to extract the meta data from.

        Returns
        -------
        meta : dict
            A dictionary of extracted meta data.

        Notes
        -----
        Non-private tags use the DICOM keywords as keys. Translators have
        their name, followed by a dot, prepended to the keys of any meta
        elements they produce. Values are unchanged, except when the value
        representation is 'DS' or 'IS' (decimal/integer strings) they are
        converted to float and int types.
        '''
        standard_meta = []
        trans_meta_dicts = OrderedDict()

        #Make dict to track which tags map to which translators
        trans_map = {}

        # Convert text elements to unicode
        dcm.decode()

        for elem in dcm:
            #Skip elements whose value is an empty/whitespace-only string
            if isinstance(elem.value, str) and elem.value.strip() == '':
                continue

            #Get the name for non-translated elements
            name = self._get_elem_key(elem)

            #If it is a private creator element, setup any corresponding
            #translators
            if elem.name == "Private Creator":
                for translator in self.translators:
                    if translator.priv_creator == elem.value:
                        #Map the translator's element offset into the block
                        #reserved by this private creator element
                        new_elem = ((translator.tag.elem & 0xff) |
                                    (elem.tag.elem * 16**2))
                        new_tag = dicom.tag.Tag(elem.tag.group, new_elem)
                        if new_tag in trans_map:
                            raise ValueError('More than one translator '
                                             'for tag: %s' % new_tag)
                        trans_map[new_tag] = translator

            #If there is a translator for this element, use it
            if elem.tag in trans_map:
                try:
                    meta = trans_map[elem.tag].trans_func(elem)
                except Exception as e:
                    if self.warn_on_trans_except:
                        warnings.warn("Exception from translator %s: %s" %
                                      (trans_map[elem.tag].name,
                                       repr(str(e))))
                    else:
                        raise
                else:
                    if meta:
                        trans_meta_dicts[trans_map[elem.tag].name] = meta
            #Otherwise see if we are supposed to ignore the element
            elif any(rule(elem) for rule in self.ignore_rules):
                continue
            #Handle elements that are sequences with recursion
            elif isinstance(elem.value, dicom.sequence.Sequence):
                value = []
                for val in elem.value:
                    value.append(self(val))
                #Skip sequences where every item produced nothing
                if all(x is None for x in value):
                    continue
                standard_meta.append((name, value, elem.tag))
            #Otherwise just make sure the value is unpacked
            else:
                value = self._get_elem_value(elem)
                if value is None:
                    continue
                standard_meta.append((name, value, elem.tag))

        #Handle name collisions by appending the tag to duplicated keys
        name_counts = defaultdict(int)
        for elem in standard_meta:
            name_counts[elem[0]] += 1
        result = OrderedDict()
        for name, value, tag in standard_meta:
            if name_counts[name] > 1:
                name = name + '_' + tag_to_str(tag)
            result[name] = value

        #Inject translator results
        for trans_name, meta in trans_meta_dicts.items():
            for name, value in meta.items():
                name = '%s.%s' % (trans_name, name)
                result[name] = value

        return result
def minimal_extractor(dcm):
    '''Meta data extractor that just extracts the minimal set of keys needed
    by DicomStack objects.
    '''
    result = {}
    for key in DicomStack.minimal_keys:
        try:
            value = dcm.__getattr__(key)
        except AttributeError:
            #Keys missing from the dataset are simply omitted
            continue
        result[key] = value
    return result
default_extractor = MetaExtractor()
'''The default `MetaExtractor`.'''
""" Information for setup.py that we may also want to access in dcmstack. Can
not import dcmstack.
"""
# NOTE: this module must stay importable on its own (no package-relative
# imports) -- setup.py imports it before dependencies are installed.
import sys

# Individual version components, combined into `__version__` below
_version_major = 0
_version_minor = 7
_version_micro = 0
_version_extra = 'dev'

# PEP 396 style version string, e.g. '0.7.0dev'
__version__ = "%s.%s.%s%s" % (_version_major,
                              _version_minor,
                              _version_micro,
                              _version_extra)

# Trove classifiers describing the project for package indexes
CLASSIFIERS = ["Development Status :: 3 - Alpha",
               "Environment :: Console",
               "Intended Audience :: Science/Research",
               "License :: OSI Approved :: MIT License",
               "Operating System :: OS Independent",
               "Programming Language :: Python",
               "Topic :: Scientific/Engineering"]

description = 'Stack DICOM images into volumes and convert to Nifti'

# Hard dependencies
install_requires = ['pydicom >= 0.9.7',
                    'nibabel >= 2.0.0',
                   ]

# Add version specific dependencies
if sys.version_info < (2, 6):
    raise Exception("must use python 2.6 or greater")
elif sys.version_info < (2, 7):
    # collections.OrderedDict only exists from 2.7; pull in the backport
    install_requires.append('ordereddict')

# Extra requirements for building documentation and testing
extras_requires = {'doc': ["sphinx", "numpydoc"],
                   'test': ["nose"],
                  }

# Upper case names mirror the keyword arguments expected by setup.py
NAME = 'dcmstack'
AUTHOR = "Brendan Moloney"
AUTHOR_EMAIL = "moloney@ohsu.edu"
MAINTAINER = "Brendan Moloney"
MAINTAINER_EMAIL = "moloney@ohsu.edu"
DESCRIPTION = description
LICENSE = "MIT license"
CLASSIFIERS = CLASSIFIERS  # no-op rebind; kept so the name appears with the rest
PLATFORMS = "OS Independent"
ISRELEASE = _version_extra == ''  # True only when the extra tag is empty
VERSION = __version__
INSTALL_REQUIRES = install_requires
EXTRAS_REQUIRES = extras_requires
PROVIDES = ["dcmstack"]
\ No newline at end of file
"""
Command line interface for nitool.
@author: moloney
"""
from __future__ import print_function
import os, sys, argparse
import nibabel as nb
from .dcmmeta import NiftiWrapper, DcmMetaExtension, MissingExtensionError
prog_descrip = """Work with extended Nifti files created by dcmstack"""
def main(argv=sys.argv):
    '''Entry point for the nitool command line interface.

    Builds the subcommand parsers, dispatches to the selected subcommand's
    function, and returns its result (0 on success).
    '''
    #Setup the top level parser
    arg_parser = argparse.ArgumentParser(description=prog_descrip)
    sub_parsers = arg_parser.add_subparsers(title="Subcommands")
    #Bug fix for Python 3: subcommands are optional by default there, and a
    #missing subcommand would raise AttributeError on 'args.func' below.
    #Python 2 argparse already treated subcommands as required.
    sub_parsers.required = True
    sub_parsers.dest = 'subcommand'

    #Split command
    split_help = ("Split src_nii file along a dimension. Defaults to the slice "
                  "dimension if 3D, otherwise the last dimension.")
    split_parser = sub_parsers.add_parser('split', help=split_help)
    split_parser.add_argument('src_nii', nargs=1)
    split_parser.add_argument('-d', '--dimension', default=None, type=int,
                              help=("The dimension to split along. Must be in "
                                    "the range [0, 5)"))
    split_parser.add_argument('-o', '--output-format', default=None,
                              help=("Format string used to create the output "
                                    "file names. Default is to prepend the index "
                                    "number to the src_nii filename."))
    split_parser.set_defaults(func=split)

    #Merge Command
    merge_help = ("Merge the provided Nifti files along a dimension. Defaults "
                  "to slice, then time, and then vector.")
    merge_parser = sub_parsers.add_parser('merge', help=merge_help)
    merge_parser.add_argument('output', nargs=1)
    merge_parser.add_argument('src_niis', nargs='+')
    merge_parser.add_argument('-d', '--dimension', default=None, type=int,
                              help=("The dimension to merge along. Must be "
                                    "in the range [0, 5)"))
    merge_parser.add_argument('-s', '--sort', default=None,
                              help=("Sort the source files using the provided "
                                    "meta data key before merging"))
    merge_parser.add_argument('-c', '--clear-slices', action='store_true',
                              help="Clear all per slice meta data")
    merge_parser.set_defaults(func=merge)

    #Dump Command
    dump_help = "Dump the JSON meta data extension from the provided Nifti."
    dump_parser = sub_parsers.add_parser('dump', help=dump_help)
    dump_parser.add_argument('src_nii', nargs=1)
    dump_parser.add_argument('dest_json', nargs='?',
                             type=argparse.FileType('w'),
                             default=sys.stdout)
    dump_parser.add_argument('-m', '--make-empty', default=False,
                             action='store_true',
                             help="Make an empty extension if none exists")
    dump_parser.add_argument('-r', '--remove', default=False,
                             action='store_true',
                             help="Remove the extension from the Nifti file")
    dump_parser.set_defaults(func=dump)

    #Embed Command
    embed_help = "Embed a JSON extension into the Nifti file."
    embed_parser = sub_parsers.add_parser('embed', help=embed_help)
    embed_parser.add_argument('src_json', nargs='?', type=argparse.FileType('r'),
                              default=sys.stdin)
    embed_parser.add_argument('dest_nii', nargs=1)
    embed_parser.add_argument('-f', '--force-overwrite', action='store_true',
                              help="Overwrite any existing dcmmeta extension")
    embed_parser.set_defaults(func=embed)

    #Lookup command
    lookup_help = "Lookup the value for the given meta data key."
    lookup_parser = sub_parsers.add_parser('lookup', help=lookup_help)
    lookup_parser.add_argument('key', nargs=1)
    lookup_parser.add_argument('src_nii', nargs=1)
    lookup_parser.add_argument('-i', '--index',
                               help=("Use the given voxel index. The index "
                                     "must be provided as a comma separated list of "
                                     "integers (one for each dimension)."))
    lookup_parser.set_defaults(func=lookup)

    #Inject command
    inject_help = "Inject meta data into the JSON extension."
    inject_parser = sub_parsers.add_parser('inject', help=inject_help)
    inject_parser.add_argument('dest_nii', nargs=1)
    inject_parser.add_argument('classification', nargs=2)
    inject_parser.add_argument('key', nargs=1)
    inject_parser.add_argument('values', nargs='+')
    inject_parser.add_argument('-f', '--force-overwrite',
                               action='store_true',
                               help=("Overwrite any existing values "
                                     "for the key"))
    inject_parser.add_argument('-t', '--type', default=None,
                               help="Interpret the value as this type instead "
                                    "of trying to determine the type automatically")
    inject_parser.set_defaults(func=inject)

    #Parse the arguments and call the appropriate function
    args = arg_parser.parse_args(argv[1:])
    return args.func(args)
def split(args):
    '''Implement the 'split' subcommand: write one Nifti file per index
    along the split dimension.'''
    src_path = args.src_nii[0]
    src_fn = os.path.basename(src_path)
    src_dir = os.path.dirname(src_path)

    src_nii = nb.load(src_path)
    try:
        src_wrp = NiftiWrapper(src_nii)
    except MissingExtensionError:
        print("No dcmmeta extension found, making empty one...")
        src_wrp = NiftiWrapper(src_nii, make_empty=True)

    #Rename the loop variable so it no longer shadows this function's name
    for sub_idx, sub_wrp in enumerate(src_wrp.split(args.dimension)):
        if args.output_format:
            const_meta = sub_wrp.meta_ext.get_class_dict(('global', 'const'))
            out_name = args.output_format % const_meta
        else:
            out_name = os.path.join(src_dir, '%03d-%s' % (sub_idx, src_fn))
        nb.save(sub_wrp, out_name)
    return 0
def make_key_func(meta_key, index=None):
    '''Build a sort key function that looks up `meta_key` (optionally at a
    voxel `index`) on an extended Nifti object.

    Raises
    ------
    ValueError : The key is not present in the object's meta data.
    '''
    def key_func(src_nii):
        result = src_nii.get_meta(meta_key, index)
        if result is None:
            #Bug fix: the '%' formatting must happen inside the ValueError
            #constructor; previously it was applied to the whole raise
            #expression and produced a TypeError instead of the intended
            #ValueError
            raise ValueError('Key not found: %s' % meta_key)
        return result
    return key_func
def merge(args):
    '''Implement the 'merge' subcommand: combine the source Nifti files
    along one dimension, optionally sorting them by a meta data key.'''
    src_wrps = []
    for src_path in args.src_niis:
        src_nii = nb.load(src_path)
        try:
            wrp = NiftiWrapper(src_nii)
        except MissingExtensionError:
            print("No dcmmeta extension found, making empty one...")
            wrp = NiftiWrapper(src_nii, make_empty=True)
        src_wrps.append(wrp)

    if args.sort:
        src_wrps.sort(key=make_key_func(args.sort))

    result_wrp = NiftiWrapper.from_sequence(src_wrps, args.dimension)

    if args.clear_slices:
        result_wrp.meta_ext.clear_slice_meta()

    #The output name may be a format string filled from the constant meta
    const_meta = result_wrp.meta_ext.get_class_dict(('global', 'const'))
    result_wrp.to_filename(args.output[0] % const_meta)
    return 0
def dump(args):
    '''Implement the 'dump' subcommand: write the JSON meta data extension
    to `args.dest_json`, optionally stripping it from the Nifti file.'''
    src_nii = nb.load(args.src_nii[0])
    src_wrp = NiftiWrapper(src_nii, args.make_empty)

    args.dest_json.write(src_wrp.meta_ext.to_json())
    args.dest_json.write('\n')

    if args.remove:
        src_wrp.remove_extension()
        src_wrp.to_filename(args.src_nii[0])
    return 0
def check_overwrite():
    '''Interactively ask whether an existing DcmMeta extension should be
    overwritten. Returns True for 'y', False for 'n'; keeps prompting for
    any other answer.'''
    #Bug fix: prefer raw_input on Python 2 -- the Python 2 'input' builtin
    #would eval() whatever the user types
    try:
        read_input = raw_input
    except NameError:
        read_input = input
    usr_input = ''
    while usr_input not in ('y', 'n'):
        usr_input = read_input('Existing DcmMeta extension found, overwrite? '
                               '[y/n]').lower()
    return usr_input == 'y'
def embed(args):
    '''Implement the 'embed' subcommand: insert a JSON meta data extension
    into the destination Nifti's header.'''
    dest_nii = nb.load(args.dest_nii[0])
    hdr = dest_nii.get_header()
    try:
        src_wrp = NiftiWrapper(dest_nii, False)
    except MissingExtensionError:
        pass
    else:
        #An extension already exists; confirm before replacing it unless
        #--force-overwrite was given
        if not (args.force_overwrite or check_overwrite()):
            return
        src_wrp.remove_extension()

    new_ext = DcmMetaExtension.from_json(args.src_json.read())
    hdr.extensions.append(new_ext)
    nb.save(dest_nii, args.dest_nii[0])
    return 0
def lookup(args):
    '''Implement the 'lookup' subcommand: print the value for a meta data
    key, optionally at a specific voxel index.'''
    src_wrp = NiftiWrapper.from_filename(args.src_nii[0])

    index = None
    if args.index:
        #The index option is a comma separated list of integers
        index = tuple(int(tok.strip()) for tok in args.index.split(','))

    meta = src_wrp.get_meta(args.key[0], index)
    if meta is not None:
        print(meta)
    return 0
def convert_values(values, type_str=None):
    '''Convert a list of strings to typed values.

    If `type_str` is None the first of int/float that converts every value
    is used (the strings are returned unchanged if neither does); otherwise
    it must be one of 'str', 'int' or 'float'. A single-element list is
    unpacked and returned as a scalar.

    Raises
    ------
    ValueError : `type_str` is not a recognized type name, or a value does
        not convert to the requested type.
    '''
    #Map allowed type names to constructors instead of calling eval() on a
    #user-supplied string
    known_types = {'str': str, 'int': int, 'float': float}
    if type_str is None:
        for conv_type in (int, float):
            try:
                values = [conv_type(val) for val in values]
            except ValueError:
                pass
            else:
                break
    else:
        if type_str not in known_types:
            raise ValueError("Unrecognized type: %s" % type_str)
        conv_type = known_types[type_str]
        values = [conv_type(val) for val in values]
    if len(values) == 1:
        return values[0]
    return values
def inject(args):
    '''Implement the 'inject' subcommand: add meta data values under the
    given classification and key, writing the Nifti back out.'''
    dest_nii = nb.load(args.dest_nii[0])
    dest_wrp = NiftiWrapper(dest_nii, make_empty=True)
    meta_ext = dest_wrp.meta_ext

    classification = tuple(args.classification)
    if classification not in meta_ext.get_valid_classes():
        print("Invalid classification: %s" % (classification,))
        return 1

    #The number of values must match the classification's multiplicity
    n_vals = len(args.values)
    mult = meta_ext.get_multiplicity(classification)
    if n_vals != mult:
        print(("Invalid number of values for classification. Expected "
               "%d but got %d") % (mult, n_vals))
        return 1

    key = args.key[0]
    if key in meta_ext.get_keys():
        if not args.force_overwrite:
            print("Key already exists, must pass --force-overwrite")
            return 1
        #Drop the existing entry before inserting the new one
        curr_class = meta_ext.get_classification(key)
        del meta_ext.get_class_dict(curr_class)[key]

    class_dict = meta_ext.get_class_dict(classification)
    class_dict[key] = convert_values(args.values, args.type)
    nb.save(dest_nii, args.dest_nii[0])
    return 0
#Allow running this module as a script: delegate to main() and use its
#return value as the process exit status
if __name__ == '__main__':
    sys.exit(main())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment