Skip to content
Snippets Groups Projects
Forked from FSL / fslpy
819 commits behind the upstream repository.
path.py 17.87 KiB
#!/usr/bin/env python
#
# path.py - Utility functions for working with file/directory paths.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains a few utility functions for working with file system
paths.


.. autosummary::
   :nosignatures:

   deepest
   shallowest
   allFiles
   hasExt
   addExt
   removeExt
   getExt
   splitExt
   getFileGroup
   removeDuplicates
   uniquePrefix
   commonBase
   wslpath
   winpath
"""


import os.path as op
import            os
import            glob
import            operator
import            re

from fsl.utils.platform import platform


class PathError(Exception):
    """Error type used by every function in this module to report a
    path-related failure (e.g. a file that cannot be found, or a file
    name prefix that cannot be resolved unambiguously).
    """


def deepest(path, suffixes):
    """Finds the deepest directory which ends with one of the given
    sequence of suffixes, or returns ``None`` if no directories end
    with any of the suffixes.

    The search starts at ``path`` itself and walks up through its parent
    directories, so the first match found is the deepest one.

    :arg path:     Path at which to start the search. Leading/trailing
                   white space and trailing path separators are ignored.

    :arg suffixes: Sequence of suffixes to test each directory against.

    :returns:      The deepest matching directory (without a trailing
                   separator), or ``None`` if there is no match.
    """

    while True:

        path = path.strip()

        # We've reached the root of the file system. The splitdrive
        # test (the same one used by shallowest) catches a bare
        # drive/share such as "C:" - op.dirname returns such a path
        # unchanged, so without this test the walk would never end.
        if path == op.sep or path == '' or op.splitdrive(path)[1] == '':
            return None

        path = path.rstrip(op.sep)

        if any(path.endswith(s) for s in suffixes):
            return path

        # No match at this level - move up to the parent
        path = op.dirname(path)


def shallowest(path, suffixes):
    """Finds the shallowest directory which ends with one of the given
    sequence of suffixes, or returns ``None`` if no directories end
    with any of the suffixes.

    :arg path:     Path to search. Leading/trailing white space and
                   trailing path separators are ignored.

    :arg suffixes: Sequence of suffixes to test each directory against.

    :returns:      The matching directory closest to the file system
                   root, or ``None`` if there is no match.
    """

    # Build the chain of ancestors, starting from
    # the path itself and finishing just below the
    # root of the file system.
    ancestors = []
    current   = path.strip()

    while not (current == op.sep or
               current == ''     or
               op.splitdrive(current)[1] == ''):
        current = current.rstrip(op.sep)
        ancestors.append(current)
        current = op.dirname(current).strip()

    # Test the ancestors from the top down, so
    # that the first hit is the shallowest one.
    for candidate in reversed(ancestors):
        if any(candidate.endswith(s) for s in suffixes):
            return candidate

    return None


def allFiles(root):
    """Recursively collects the files found beneath the ``root`` directory.

    :arg root: Directory to walk.

    :returns:  A list of paths, one per file, each formed by joining the
               name of the file onto the directory in which ``os.walk``
               found it.
    """

    return [op.join(parent, name)
            for parent, _, names in os.walk(root)
            for name in names]


def hasExt(path, allowedExts):
    """Tests whether ``path`` ends with one of the ``allowedExts``.

    :arg path:        The path to test.

    :arg allowedExts: Sequence of file extensions to test against.

    :returns:         ``True`` if ``path`` ends with at least one of the
                      extensions, ``False`` otherwise (including when
                      ``allowedExts`` is empty).
    """
    return path.endswith(tuple(allowedExts))


def addExt(prefix,
           allowedExts=None,
           mustExist=True,
           defaultExt=None,
           fileGroups=None,
           unambiguous=True):
    """Adds a file extension to the given file ``prefix``.

    If ``mustExist`` is False, and the file does not already have a
    supported extension, the default extension is appended and the new
    file name returned. If the prefix already has a supported extension,
    it is returned unchanged.

    If ``mustExist`` is ``True`` (the default), the function checks to see
    if any files exist that have the given prefix, and a supported file
    extension.  A :exc:`PathError` is raised if:

       - No files exist with the given prefix and a supported extension.

       - ``unambiguous is True``, more than one file exists with the given
         prefix and a supported extension, and the extensions of those
         files do not match exactly one of the ``fileGroups``.

    Otherwise the full file name is returned.

    The ``allowedExts`` sequence passed in by the caller is never modified.

    :arg prefix:      The file name prefix to modify.

    :arg allowedExts: List of allowed file extensions. ``defaultExt`` is
                      treated as an allowed extension, even if it is not
                      in this list.

    :arg mustExist:   Whether the file must exist or not.

    :arg defaultExt:  Default file extension to use.

    :arg fileGroups:  Recognised file groups - see :func:`getFileGroup`.

    :arg unambiguous: If ``True`` (the default), and more than one file
                      exists with the specified ``prefix``, a
                      :exc:`PathError` is raised (unless the ambiguity
                      can be resolved via the ``fileGroups``). Otherwise,
                      a list containing *all* matching files is returned.

    :returns:         The full file name or, if ``mustExist`` is ``True``
                      and ``unambiguous`` is ``False``, a list of all
                      matching file names.
    """

    # Work on a private copy of the allowed extensions,
    # so that adding the default extension below does
    # not leak into the caller's list (and so that
    # tuples and other iterables are accepted).
    if allowedExts is None: allowedExts = []
    else:                   allowedExts = list(allowedExts)
    if fileGroups  is None: fileGroups  = []

    if defaultExt is not None and defaultExt not in allowedExts:
        allowedExts.append(defaultExt)

    if not mustExist:

        # the provided file name already
        # ends with a supported extension
        if hasExt(prefix, allowedExts):
            return prefix

        if defaultExt is not None: return prefix + defaultExt
        else:                      return prefix

    # If no allowed extensions were
    # provided, or the provided prefix
    # already ends with a supported
    # extension, check to see that it
    # exists.
    if len(allowedExts) == 0 or hasExt(prefix, allowedExts):
        allPaths = [prefix]

    # Otherwise, make a bunch of file names, one per
    # supported extension, and test to see if exactly
    # one of them exists.
    else:
        allPaths = [prefix + ext for ext in allowedExts]

    # Discard candidates which do not exist
    allPaths = [p for p in allPaths if op.isfile(p)]
    nexists  = len(allPaths)

    # Could not find any supported file
    # with the specified prefix
    if nexists == 0:
        raise PathError('Could not find a supported file '
                        'with prefix "{}"'.format(prefix))

    # If ambiguity is ok, return
    # all matching paths
    elif not unambiguous:
        return allPaths

    # Ambiguity is not ok! More than
    # one supported file with the
    # specified prefix.
    elif nexists > 1:

        # Get the suffixes of all files that
        # exist, and see if they match any
        # file groups.
        suffixes     = [getExt(p, allowedExts) for p in allPaths]
        groupMatches = [sorted(suffixes) == sorted(g) for g in fileGroups]

        # Is there a match for a file suffix group?
        # If not, multiple files with the specified
        # prefix exist, and there is no way to
        # resolve the ambiguity.
        if sum(groupMatches) != 1:
            raise PathError('More than one file with '
                            'prefix "{}"'.format(prefix))

        # Otherwise, we return a path
        # to the file which matches the
        # first suffix in the group.
        groupIdx = groupMatches.index(True)
        allPaths = [prefix + fileGroups[groupIdx][0]]

    # Return the full file name of the
    # supported file that was found
    return allPaths[0]


def removeExt(filename, allowedExts=None, firstDot=False):
    """Strips the extension from the given file name, returning only the
    base name. The arguments are passed straight through to
    :func:`splitExt`.
    """

    base, _ = splitExt(filename, allowedExts, firstDot)
    return base


def getExt(filename, allowedExts=None, firstDot=False):
    """Extracts the extension from the given file name, discarding the
    base name. The arguments are passed straight through to
    :func:`splitExt`.
    """

    _, ext = splitExt(filename, allowedExts, firstDot)
    return ext


def splitExt(filename, allowedExts=None, firstDot=False):
    """Returns the base name and the extension from the given file name.

    If ``allowedExts`` is ``None`` and ``firstDot`` is ``False``, this
    function is equivalent to using::

        os.path.splitext(filename)

    If ``allowedExts`` is ``None`` and ``firstDot`` is ``True``, the file
    name is split on the first period that is found, rather than the last
    period. For example::

        splitExt('image.nii.gz')                # -> ('image.nii', '.gz')
        splitExt('image.nii.gz', firstDot=True) # -> ('image', '.nii.gz')

    The search for the first period covers the whole of ``filename``,
    including any directory components it contains.

    If ``allowedExts`` is provided, ``firstDot`` is ignored. In this case,
    the file name is split on the *first* entry in ``allowedExts`` that it
    ends with. If the file does not end with an allowed extension, a tuple
    containing ``(filename, '')`` is returned. Empty strings in
    ``allowedExts`` are ignored.

    :arg filename:    The file name to split.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg firstDot:    Split the file name on the first period, rather than the
                      last period. Ignored if ``allowedExts`` is specified.

    :returns:         A tuple containing ``(base, extension)``.
    """

    # If allowedExts is not specified
    # we split on a period character
    if allowedExts is None:

        # split on last period - equivalent
        # to op.splitext
        if not firstDot:
            return op.splitext(filename)

        # split on first period
        idx = filename.find('.')

        if idx == -1:
            return filename, ''

        return filename[:idx], filename[idx:]

    # Otherwise, try and find a suffix match
    for ext in allowedExts:

        # An empty extension trivially "matches"
        # every file name, but carries no
        # information, so it is skipped.
        if ext == '' or not filename.endswith(ext):
            continue

        # Split using an explicit index - a
        # negative slice would misbehave for
        # a zero-length extension.
        cut = len(filename) - len(ext)
        return filename[:cut], filename[cut:]

    # No match, assume there is no extension
    return filename, ''


def getFileGroup(path,
                 allowedExts=None,
                 fileGroups=None,
                 fullPaths=True,
                 unambiguous=False):
    """Resolves the given ``path`` to the file group that it belongs to.

    If the ``path`` is part of a complete file group (i.e. a file exists
    for every suffix in the group), the files in that group (including the
    ``path`` itself) are returned.

    If the ``path`` does not appear to be part of a file group, or appears
    to be part of an incomplete file group, a list containing only the
    ``path`` is returned.

    If the ``path`` does not exist, or appears to be part of more than one
    complete file group, a :exc:`PathError` is raised.

    A file group is a collection of file suffixes which should always exist
    alongside each other. File groups can be used to resolve ambiguity when
    multiple files exist with the same ``prefix`` and supported extensions
    (e.g. ``file.hdr`` and ``file.img``). They are specified as a list of
    sequences, for example::

        [('.img',    '.hdr'),
         ('.img.gz', '.hdr.gz')]

    If you specify ``fileGroups=[('.img', '.hdr')]`` and ``prefix='file'``, and
    both ``file.img`` and ``file.hdr`` exist, the :func:`addExt` function would
    return ``file.img`` (i.e. the file which matches the first extension in
    the group).

    Similarly, if you call the :func:`.imcp.imcp` or :func:`.imcp.immv`
    functions with the above parameters, both ``file.img`` and ``file.hdr``
    will be moved.

    .. note:: The primary use-case of file groups is to resolve ambiguity with
              respect to NIFTI and ANALYSE75 image pairs. By specifying
              ``fileGroups=[('.img', '.hdr'), ('.img.gz', '.hdr.gz')]``, the
              :func:`addExt`, :func:`.imcp.immv` and :func:`.imcp.imcp`
              functions are able to figure out what you mean when you specify
              ``file``, and both ``file.hdr`` and ``file.img`` (or
              ``file.hdr.gz`` and ``file.img.gz``) exist.

    :arg path:        Path to the file. Must contain the file extension.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg fileGroups:  Recognised file groups.

    :arg fullPaths:   If ``True`` (the default), full file paths (relative to
                      the ``path``) are returned. Otherwise, only the file
                      extensions in the group are returned.

    :arg unambiguous: Defaults to ``False``, in which case groups that are
                      only partially present on disk are ignored. If
                      ``True``, a :exc:`PathError` is raised when the path
                      matches groups without matching exactly one complete
                      group and no other (partially present) groups.
    """

    path      = addExt(path, allowedExts, mustExist=True, fileGroups=fileGroups)
    base, ext = splitExt(path, allowedExts)

    # The result used whenever the path
    # cannot be resolved to a file group
    if fullPaths: alone = [path]
    else:         alone = [ext]

    if fileGroups is None:
        return alone

    # (group, files) pairs for every group that
    # is completely present, and a count of the
    # groups which are at least partly present.
    complete = []
    npartial = 0

    for group in fileGroups:

        # The path has an extension which
        # is not a member of this group
        if ext != '' and ext not in group:
            continue

        candidates = [base + suffix for suffix in group]
        present    = [op.exists(c) for c in candidates]

        if any(present):
            npartial += 1

        if all(present):
            complete.append((group, candidates))

    ncomplete = len(complete)

    # Path is not part of any group
    if npartial == 0:
        return alone

    # The path is part of more than one
    # complete file group - there is no
    # way to choose between them.
    if ncomplete > 1:
        raise PathError('Path is part of multiple '
                        'file groups: {}'.format(path))

    # Partial matches only matter when
    # the caller asked for unambiguity
    if not unambiguous:
        npartial = 0

    # Exactly one complete group, and
    # nothing else competing with it
    if ncomplete == 1 and npartial <= 1:
        group, files = complete[0]
        if fullPaths: return files
        else:         return group

    # Some group is only partly present,
    # which is potentially ambiguous (this
    # is only reachable when unambiguous
    # is set, due to the reset above).
    if npartial > 0:
        raise PathError('Path is part of an incomplete '
                        'file group: {}'.format(path))

    return alone


def removeDuplicates(paths, allowedExts=None, fileGroups=None):
    """Filters ``paths`` so that each file group (see ``fileGroups``) is
    represented by a single entry.

    For example, if you have a directory containing::

        001.hdr
        001.img
        002.hdr
        002.img
        003.hdr
        003.img

    And you call ``removeDuplicates`` like so::

         paths       = ['001.img', '001.hdr',
                        '002.img', '002.hdr',
                        '003.img', '003.hdr']

         allowedExts = ['.img',  '.hdr']
         fileGroups  = [('.img', '.hdr')]

         removeDuplicates(paths, allowedExts, fileGroups)

    The returned list will be::

         ['001.img', '002.img', '003.img']

    If you provide ``allowedExts``, you may specify incomplete ``paths`` (i.e.
    without extensions), as long as there are no path ambiguities.

    A :exc:`PathError` will be raised if any of the ``paths`` do not exist,
    or if there are any ambiguities with respect to incomplete paths.

    :arg paths:       List of paths to reduce.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg fileGroups:  Recognised file groups - see :func:`getFileGroup`.

    :returns:         A list containing the first member of each distinct
                      file group, in the order the groups were encountered.
    """

    kept = []

    for candidate in paths:

        members = getFileGroup(candidate, allowedExts, fileGroups)

        # Some member of this group has already
        # been kept, so this path is a duplicate
        if any(m in kept for m in members):
            continue

        kept.append(members[0])

    return kept


def uniquePrefix(path):
    """Return the shortest prefix for the given file name which unambiguously
    identifies it, relative to the other files in the same directory.

    The prefix starts out as the directory plus the first character of the
    file name, and is extended one character at a time until only one file
    in the directory starts with it.

    :arg path: Path to the file.

    :returns:  The directory of ``path`` joined with the shortest unique
               prefix of its file name.

    Raises a :exc:`PathError` if a unique prefix could not be found - e.g.
    if ``path`` has no file name component, if no file starts with the
    prefix, or if the whole file name is itself a prefix of another file
    name in the directory.
    """

    dirname, filename = op.split(path)

    # Nothing to build a prefix from (e.g. the
    # path is empty, or ends with a separator)
    if filename == '':
        raise PathError('No unique prefix for {}'.format(path))

    idx    = 0
    prefix = op.join(dirname, filename[0])

    # The prefix is escaped so that any glob
    # meta-characters ("*", "?", "[") which it
    # contains are matched literally.
    hits   = glob.glob('{}*'.format(glob.escape(prefix)))

    # Keep going until the prefix is unique
    while len(hits) != 1:

        # Should never happen if path is valid
        if len(hits) == 0 or idx >= len(filename) - 1:
            raise PathError('No unique prefix for {}'.format(filename))

        # Not unique - extend the prefix by one
        # character, and narrow down the hits
        idx    += 1
        prefix  = prefix + filename[idx]
        hits    = [h for h in hits if h.startswith(prefix)]

    return prefix


def commonBase(paths):
    """Identifies the deepest common base directory shared by all files
    in ``paths``.

    Paths are compared on whole path components, so that e.g. ``/a/bc`` is
    not considered to be a base of ``/a/bcd/e``.

    :arg paths: Sequence of paths to compare.

    :returns:   The deepest directory which is equal to, or an ancestor of,
                every path in ``paths``.

    Raises a :exc:`PathError` if the paths have no common base, or if
    ``paths`` is empty. The former will never happen for absolute paths
    (as the base will be e.g. ``'/'``).
    """

    # Take a copy, as the paths are
    # iterated over more than once
    paths = list(paths)

    if len(paths) == 0:
        raise PathError('No common base')

    # Recognised path separators
    if op.altsep is None: seps = (op.sep,)
    else:                 seps = (op.sep, op.altsep)

    # Start from the deepest path (the first
    # such path, if there is more than one),
    # and work upwards through its parents.
    base = max(paths, key=lambda p: len(p.split(op.sep)))
    last = base

    while True:

        base = op.split(base)[0]

        # Ran out of parent directories
        if base == last or len(base) == 0:
            break

        last = base
        blen = len(base)

        # A path is within the base if it is the base, or
        # if it starts with the base and the base is
        # immediately followed by a separator. A base which
        # already ends with a separator (e.g. "/") needs no
        # such check.
        if all(p.startswith(base) and (len(p) == blen      or
                                       base.endswith(seps) or
                                       p[blen] in seps)
               for p in paths):
            return base

    raise PathError('No common base')


def wslpath(winpath):
    r"""
    Convert Windows path (or a command line argument containing a Windows path)
    to the equivalent WSL path (e.g. ``c:\Users`` -> ``/mnt/c/Users``). Also supports
    paths in the form ``\\wsl$\(distro)\users\...``, from which the ``\\wsl$\(distro)``
    portion is removed.

    :param winpath: Command line argument which may (or may not) contain a Windows path. It is assumed to be
                    either of the form <windows path> or --<arg>=<windows path>. Note that we don't need to
                    handle --arg <windows path> or -a <windows path> since in these cases the argument
                    and the path will be parsed as separate entities.
    :return: If ``winpath`` matches a Windows path, the converted argument (including the --<arg>= portion).
                Otherwise returns ``winpath`` unchanged.
    """

    # \\wsl$\<distro>\<path> - drop the network
    # share and distro name, and keep the rest
    match = re.match(r"^(--[\w-]+=)?\\\\wsl\$[\\/][^\\/]+(.*)$", winpath)
    if match:
        arg, path = match.group(1, 2)
        if arg is None:
            arg = ""
        return arg + path.replace("\\", "/")

    # <drive>:<path> - the drive must be a single ASCII
    # letter. Windows drives are mounted by WSL under
    # /mnt/<lower-case drive letter>.
    match = re.match(r"^(--[\w-]+=)?([a-zA-Z]):(.+)$", winpath)
    if match:
        arg, drive, path = match.group(1, 2, 3)
        if arg is None:
            arg = ""
        return arg + "/mnt/" + drive.lower() + path.replace("\\", "/")

    # Not a Windows path
    return winpath


def winpath(wslpath):
    r"""
    Convert a WSL-local filepath (for example ``/usr/local/fsl/``) into a path that can be used from
    Windows.

    If ``platform.fslwsl`` is ``False``, simply returns ``wslpath`` unmodified.
    Otherwise, uses ``FSLDIR`` (``platform.fsldir``) to deduce the WSL distro in use for FSL.
    This requires WSL2 which supports the ``\\wsl$\`` network path.
    wslpath is assumed to be an absolute path.

    :param wslpath: Absolute path within the WSL file system.
    :return: ``wslpath`` unchanged if ``platform.fslwsl`` is ``False``. Otherwise a path of the form
             ``\\wsl$\<distro>\<path>``, with all forward slashes replaced by backslashes.
    :raises RuntimeError: If ``platform.fsldir`` is not of the form ``\\wsl$\<distro>\...``.
    """
    if not platform.fslwsl:
        return wslpath

    # The distro name is the path component which
    # immediately follows the \\wsl$\ share prefix
    match = re.match(r"^\\\\wsl\$\\([^\\]+).*$", platform.fsldir)

    if match is None:
        raise RuntimeError("Could not identify WSL installation from FSLDIR (%s)" % platform.fsldir)

    distro = match.group(1)

    return "\\\\wsl$\\" + distro + wslpath.replace("/", "\\")