Commit 66786d0d authored by Paul McCarthy's avatar Paul McCarthy
Browse files

Reworked fsl.util.path functions so they don't use a 'replace'

dictionary. Intead, they use 'file groups', which is conceptually much
easier to understand. New functions splitExt and getFileGroup.

Also added imcp and immv functions, which perform the equivalent of the
FSL imcp/immv scripts (but are more general).
parent 1c4fc778
......@@ -751,8 +751,9 @@ EXTENSION_DESCRIPTIONS = ['Compressed NIFTI images',
"""Descriptions for each of the extensions in :data:`ALLOWED_EXTENSIONS`. """
REPLACEMENTS = {'.hdr' : ['.img'], '.hdr.gz' : ['.img.gz']}
"""Suffix replacements used by :func:`addExt` to resolve file path
FILE_GROUPS = [('.img', '.hdr'),
('.img.gz', '.hdr.gz')]
"""File suffix groups used by :func:`addExt` to resolve file path
ambiguities - see :func:`fsl.utils.path.addExt`.
......@@ -809,7 +810,7 @@ def addExt(prefix, mustExist=True):
def loadIndexedImageFile(filename):
......@@ -16,14 +16,21 @@ paths.
import os.path as op
import shutil
class PathError(Exception):
"""``Exception`` class raised by :func:`addExt` and :func:`getExt`. """
"""``Exception`` class raised by the functions defined in this module
when something goes wrong.
......@@ -74,7 +81,7 @@ def addExt(prefix,
"""Adds a file extension to the given file ``prefix``.
If ``mustExist`` is False, and the file does not already have a
......@@ -84,11 +91,11 @@ def addExt(prefix,
If ``mustExist`` is ``True`` (the default), the function checks to see
if any files exist that have the given prefix, and a supported file
extension. A :exc:`ValueError` is raised if:
extension. A :exc:`PathError` is raised if:
- No files exist with the given prefix and a supported extension.
- ``replace`` is ``None``, and more than one file exists with the
- ``fileGroups`` is ``None``, and more than one file exists with the
given prefix, and a supported extension.
Otherwise the full file name is returned.
......@@ -100,33 +107,12 @@ def addExt(prefix,
:arg mustExist: Whether the file must exist or not.
:arg defaultExt: Default file extension to use.
:arg replace: If multiple files exist with the same ``prefix`` and
supported extensions (e.g. ``file.hdr`` and
``file.img``), this dictionary can be used to resolve
ambiguities. It must have the structure::
suffix : [replacement, ...],
If files with ``suffix`` and one of the ``replacement``
suffixes exists, the ``suffix`` file will
be ignored, and replaced with the ``replacement`` file.
If multiple ``replacement`` files exist alongside the
``suffix`` file, a ``PathError`` is raised.
.. note:: The primary use-case of the ``replace`` parameter is to resolve
ambiguity with respect to NIFTI and ANALYSE75 image pairs. By
specifying ``replace={'.hdr' : ['.img'. '.img.gz'}``, the
``addExt`` function is able to figure out what you mean when you
wish to add an extension to ``file``, and ``file.hdr`` and
either ``file.img`` or ``file.img.gz`` (but not both) exist.
:arg fileGroups: Recognised file groups - see :func:`getFileGroup`.
if replace is None:
replace = {}
if fileGroups is None:
fileGroups = {}
if not mustExist:
......@@ -149,14 +135,14 @@ def addExt(prefix,
allPaths = [prefix + ext for ext in allowedExts]
exists = [op.isfile(e) for e in allPaths]
nexists = sum(exists)
allPaths = [p for p in allPaths if op.isfile(p)]
nexists = len(allPaths)
# Could not find any supported file
# with the specified prefix
if nexists == 0:
raise PathError('Could not find a supported file '
'with prefix {}'.format(prefix))
'with prefix "{}"'.format(prefix))
# Ambiguity! More than one supported
# file with the specified prefix.
......@@ -164,99 +150,260 @@ def addExt(prefix,
# Remove non-existent paths from the
# extended list, get all their
# suffixes, and potential replacements
allPaths = [allPaths[i] for i in range(len(allPaths)) if exists[i]]
suffixes = [getExt(e, allowedExts) for e in allPaths]
replacements = [replace.get(s) for s in suffixes]
hasReplace = [r is not None for r in replacements]
# If any replacement has been specified
# for any of the existing suffixes,
# see if we have a unique match for
# exactly one existing suffix, the
# one to be ignored/replaced.
if sum(hasReplace) == 1:
# Make sure there is exactly one potential
# replacement for this suffix. If there's
# more than one (e.g. file.hdr plus both
# file.img and file.img.gz) we can't resolve
# the ambiguity. In this case the code will
# fall through to the raise statement below.
toReplace = allPaths[hasReplace.index(True)]
replacements = replacements[hasReplace.index(True)]
replacements = [prefix + ext for ext in replacements]
replExists = [r in allPaths for r in replacements]
if sum(replExists) == 1:
replacedBy = replacements[replExists.index(True)]
allPaths[allPaths.index(toReplace)] = replacedBy
allPaths = list(set(allPaths))
exists = [True] * len(allPaths)
# There's more than one path match -
# we can't resolve the ambiguity
if len(allPaths) > 1:
# suffixes, and see if they match
# any file groups.
suffixes = [getExt(p, allowedExts) for p in allPaths]
groupMatches = [sorted(suffixes) == sorted(g) for g in fileGroups]
# Is there a match for a file suffix group?
# If not, multiple files with the specified
# prefix exist, and there is no way to
# resolve the ambiguity.
if sum(groupMatches) != 1:
raise PathError('More than one file with '
'prefix {}'.format(prefix))
'prefix "{}"'.format(prefix))
# Otherwise, we return a path
# to the file which matches the
# first suffix in the group.
groupIdx = groupMatches.index(True)
allPaths = [prefix + fileGroups[groupIdx][0]]
# Return the full file name of the
# supported file that was found
extIdx = exists.index(True)
return allPaths[extIdx]
return allPaths[0]
def removeExt(filename, allowedExts):
"""Removes the extension from the given file name. Returns the filename
unmodified if it does not have a supported extension.
def removeExt(filename, allowedExts=None):
"""Returns the base name of the given file name. See :func:`splitExt`. """
:arg filename: The file name to strip.
:arg allowedExts: A list of strings containing the allowed file
return splitExt(filename, allowedExts)[0]
# figure out the extension of the given file
extMatches = [filename.endswith(ext) for ext in allowedExts]
# the file does not have a supported extension
if not any(extMatches):
return filename
# figure out the length of the matched extension
extIdx = extMatches.index(True)
extLen = len(allowedExts[extIdx])
def getExt(filename, allowedExts=None):
"""Returns the extension of the given file name. See :func:`splitExt`. """
# and trim it from the file name
return filename[:-extLen]
return splitExt(filename, allowedExts)[1]
def getExt(filename, allowedExts=None):
"""Returns the extension from the given file name.
def splitExt(filename, allowedExts=None):
"""Returns the base name and the extension from the given file name.
If ``allowedExts`` is ``None``, this function is equivalent to using::
If ``allowedExts`` is provided, but the file does not end with an allowed
extension, a :exc:`PathError` is raised.
extension, a tuple containing ``(filename, '')`` is returned.
:arg filename: The file name to split.
:arg allowedExts: Allowed/recognised file extensions.
# If allowedExts is not specified,
# we just use op.splitext
if allowedExts is None:
return op.splitext(filename)[1]
return op.splitext(filename)
# Otherwise, try and find a suffix match
extMatches = [filename.endswith(ext) for ext in allowedExts]
# No match, assume there is no extension
if not any(extMatches):
raise PathError('{} does not end in a supported extension ({})'.format(
filename, ', '.join(allowedExts)))
return filename, ''
# Otherwise split the filename
# into its base and its extension
extIdx = extMatches.index(True)
return allowedExts[extIdx]
extLen = len(allowedExts[extIdx])
return filename[:-extLen], filename[-extLen:]
def getFileGroup(path, allowedExts=None, fileGroups=None, fullPaths=True):
"""If the given ``path`` is part of a ``fileGroup``, returns a list
containing the paths to all other files in the group (including the
``path`` itself).
If the ``path`` does not appear to be part of a file group, a list
containing only the ``path`` is returned.
File groups can be used to specify a collection of file suffixes which
should always exist alongside each other. This can be used to resolve
ambiguity when multiple files exist with the same ``prefix`` and supported
extensions (e.g. ``file.hdr`` and ``file.img``). The file groups are
specified as a list of sequences, for example::
[('.img', '.hdr'),
('.img.gz', '.hdr.gz')]
If you specify``fileGroups=[('.img', '.hdr')]`` and ``prefix='file'``, and
both ``file.img`` and ``file.hdr`` exist, the :func:`addExt` function would
return ``file.img`` (i.e. the file which matches the first extension in
the group).
Similarly, if you call the :func:`imcp` or :func:`immv` functions with the
above parameters, both ``file.img`` and ``file.hdr`` will be moved.
.. note:: The primary use-case of file groups is to resolve ambiguity with
respect to NIFTI and ANALYSE75 image pairs. By specifying
``fileGroups=[('.img', '.hdr'), ('.img.gz', '.hdr.gz')]``, the
:func:`addExt`, :func:`immv` and :func:`imcp` functions are able
to figure out what you mean when you specify ``file``, and both
``file.hdr`` and ``file.img`` (or ``file.hdr.gz`` and
``file.img.gz``) exist.
:arg path: Path to the file. Must contain the file extension.
:arg allowedExts: Allowed/recognised file extensions.
:arg fileGroups: Recognised file groups.
:arg fullPaths: If ``True`` (the default), full file paths (relative to
the ``path``) are returned. Otherwise, only the file
extensions in the group are returned.
if fileGroups is None:
return [path]
base, ext = splitExt(path, allowedExts)
matchedGroups = []
matchedGroupFiles = []
for group in fileGroups:
if ext not in group:
groupFiles = [base + s for s in group]
if not all([op.exists(f) for f in groupFiles]):
matchedGroups .append(group)
# If the given path is part of more
# than one existing file group, we
# can't resolve this ambiguity.
if len(matchedGroupFiles) != 1:
if fullPaths: return [path]
else: return [ext]
if fullPaths: return matchedGroupFiles[0]
else: return matchedGroups[ 0]
def imcp(src,
"""Copy the given ``src`` file to destination ``dest``.
:arg src: Path to copy. If ``allowedExts`` is provided,
the file extension can be omitted.
:arg dest: Destination path. Can be an incomplete file
specification (i.e. without the extension), or a
:arg allowedExts: Allowed/recognised file extensions.
:arg fileGroups: Recognised file groups - see the :func:`getFileGroup`
:arg overwrite: If ``True`` this function will overwrite files that
already exist. Defaults to ``False``.
:arg move: If ``True``, the files are moved, instead of being
base, ext = splitExt(src, allowedExts)
destIsDir = op.isdir(dest)
# If dest has been specified
# as a file name, we don't
# care about its extension.
if not destIsDir:
dest = removeExt(dest, allowedExts)
# src was specified without an
# extension, or the specitifed
# src does not have an allowed
# extension.
if ext == '':
# Try to resolve the specified src
# path - if src does not exist, or
# does not have an allowed extension,
# addExt will raise an error
src = addExt(src,
# We've resolved src to a
# full filename - split it
# again to get its extension
base, ext = splitExt(src, allowedExts)
# If the source is part of a file group,
# e.g. src.img/src.hdr, we want to copy
# the whole set of files. So here we
# build a list of source files that need
# to be copied/moved. The getFileGroup
# function returns all other files that
# are associated with this file (i.e.
# part of the same group).
# We store the sources as separate
# (base, ext) tuples, so we don't
# have to re-split when creating
# destination paths.
copySrcs = getFileGroup(src, allowedExts, fileGroups, fullPaths=False)
copySrcs = [(base, e) for e in copySrcs]
# Note that these additional files
# do not have to exist, e.g.
# imcp('blah.img', ...) will still
# work if there is no blah.hdr
copySrcs = [(b, e) for (b, e) in copySrcs if op.exists(b + e)]
# Build a list of destinations for each
# copy source - we build this list in
# advance, so we can fail if any of the
# destinations already exist.
copyDests = []
for i, (base, ext) in enumerate(copySrcs):
# We'll also take this opportunity
# to re-combine the source paths
copySrcs[i] = base + ext
if destIsDir: copyDests.append(dest)
else: copyDests.append(dest + ext)
# Fail if any of the destination
# paths already exist
if not overwrite:
if not destIsDir and any([op.exists(d) for d in copyDests]):
raise PathError('imcp error - a destination path already '
'exists ({})'.format(', '.join(copyDests)))
# Do the copy/move
for src, dest in zip(copySrcs, copyDests):
if move: shutil.move(src, dest)
else: shutil.copy(src, dest)
def immv(src, dest, allowedExts=None, fileGroups=None, overwrite=False):
"""Move the specified ``src`` to the specified ``dest``. See :func:`imcp`.
imcp(src, dest, allowedExts, fileGroups, overwrite, move=True)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment