#!/usr/bin/env python
#
# path.py - Utility functions for working with file/directory paths.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains a few utility functions for working with file system
paths.


.. autosummary::
   :nosignatures:

   deepest
   shallowest
   allFiles
   hasExt
   addExt
   removeExt
   getExt
   splitExt
   getFileGroup
   removeDuplicates
   uniquePrefix
   commonBase
   wslpath
   winpath
"""


import os.path as op
import os
import glob
import operator
import re

from fsl.utils.platform import platform


class PathError(Exception):
    """``Exception`` class raised by the functions defined in this module
    when something goes wrong.
    """
    pass


def deepest(path, suffixes):
    """Finds the deepest directory which ends with one of the given
    sequence of suffixes, or returns ``None`` if no directories end
    with any of the suffixes.

    :arg path:     Path to search.
    :arg suffixes: Sequence of suffixes to look for.
    """

    path = path.strip()

    # We've reached the root of the file system. The
    # drive check mirrors shallowest() - without it,
    # op.dirname('C:') == 'C:' on Windows and the
    # recursion would never terminate.
    if path == op.sep or path == '' or op.splitdrive(path)[1] == '':
        return None

    path = path.rstrip(op.sep)

    if any(path.endswith(s) for s in suffixes):
        return path

    return deepest(op.dirname(path), suffixes)


def shallowest(path, suffixes):
    """Finds the shallowest directory which ends with one of the given
    sequence of suffixes, or returns ``None`` if no directories end
    with any of the suffixes.

    :arg path:     Path to search.
    :arg suffixes: Sequence of suffixes to look for.
    """

    path = path.strip()

    # We've reached the root of the file system
    if path == op.sep or path == '' or op.splitdrive(path)[1] == '':
        return None

    path = path.rstrip(op.sep)

    # Ancestors take precedence over the
    # path itself, so check them first.
    parent = shallowest(op.dirname(path), suffixes)

    if parent is not None:
        return parent

    if any(path.endswith(s) for s in suffixes):
        return path

    return None


def allFiles(root):
    """Return a list containing all files which exist underneath the specified
    ``root`` directory.
    """

    files = []

    for dirpath, _, filenames in os.walk(root):
        files.extend(op.join(dirpath, f) for f in filenames)

    return files


def hasExt(path, allowedExts):
    """Convenience function which returns ``True`` if the given ``path``
    ends with any of the given ``allowedExts``, ``False`` otherwise.
    """
    return any(path.endswith(e) for e in allowedExts)


def addExt(prefix,
           allowedExts=None,
           mustExist=True,
           defaultExt=None,
           fileGroups=None,
           unambiguous=True):
    """Adds a file extension to the given file ``prefix``.

    If ``mustExist`` is False, and the file does not already have a
    supported extension, the default extension is appended and the new
    file name returned. If the prefix already has a supported extension,
    it is returned unchanged.

    If ``mustExist`` is ``True`` (the default), the function checks to see
    if any files exist that have the given prefix, and a supported file
    extension.  A :exc:`PathError` is raised if:

       - No files exist with the given prefix and a supported extension.

       - ``fileGroups is None`` and ``unambiguous is True``, and more than
         one file exists with the given prefix, and a supported extension.

    Otherwise the full file name is returned.

    The ``allowedExts`` sequence passed in by the caller is never modified.

    :arg prefix:      The file name prefix to modify.

    :arg allowedExts: List of allowed file extensions.

    :arg mustExist:   Whether the file must exist or not.

    :arg defaultExt:  Default file extension to use.

    :arg fileGroups:  Recognised file groups - see :func:`getFileGroup`.

    :arg unambiguous: If ``True`` (the default), and more than one file
                      exists with the specified ``prefix``, a
                      :exc:`PathError` is raised. Otherwise, a list
                      containing *all* matching files is returned.
    """

    # Work on a copy, so that appending the default
    # extension below does not leak into the caller's
    # list.
    allowedExts = [] if allowedExts is None else list(allowedExts)

    if fileGroups is None:
        fileGroups = []

    if defaultExt is not None and defaultExt not in allowedExts:
        allowedExts.append(defaultExt)

    if not mustExist:

        # the provided file name already
        # ends with a supported extension
        if hasExt(prefix, allowedExts):
            return prefix

        if defaultExt is not None:
            return prefix + defaultExt
        else:
            return prefix

    # If no allowed extensions were
    # provided, or the provided prefix
    # already ends with a supported
    # extension, check to see that it
    # exists.
    if len(allowedExts) == 0 or hasExt(prefix, allowedExts):
        allPaths = [prefix]

    # Otherwise, make a bunch of file names, one per
    # supported extension, and test to see if exactly
    # one of them exists.
    else:
        allPaths = [prefix + ext for ext in allowedExts]

    allPaths = [p for p in allPaths if op.isfile(p)]
    nexists  = len(allPaths)

    # Could not find any supported file
    # with the specified prefix
    if nexists == 0:
        raise PathError('Could not find a supported file '
                        'with prefix "{}"'.format(prefix))

    # If ambiguity is ok, return
    # all matching paths
    elif not unambiguous:
        return allPaths

    # Ambiguity is not ok! More than
    # one supported file with the
    # specified prefix.
    elif nexists > 1:

        # Get the suffixes of all existing
        # files, and see if they match any
        # file groups.
        suffixes     = [getExt(p, allowedExts) for p in allPaths]
        groupMatches = [sorted(suffixes) == sorted(g) for g in fileGroups]

        # Is there a match for a file suffix group?
        # If not, multiple files with the specified
        # prefix exist, and there is no way to
        # resolve the ambiguity.
        if sum(groupMatches) != 1:
            raise PathError('More than one file with '
                            'prefix "{}"'.format(prefix))

        # Otherwise, we return a path
        # to the file which matches the
        # first suffix in the group.
        groupIdx = groupMatches.index(True)
        allPaths = [prefix + fileGroups[groupIdx][0]]

    # Return the full file name of the
    # supported file that was found
    return allPaths[0]


def removeExt(filename, allowedExts=None, firstDot=False):
    """Returns the base name of the given file name.  See :func:`splitExt`. """

    return splitExt(filename, allowedExts, firstDot)[0]


def getExt(filename, allowedExts=None, firstDot=False):
    """Returns the extension of the given file name.  See :func:`splitExt`. """

    return splitExt(filename, allowedExts, firstDot)[1]


def splitExt(filename, allowedExts=None, firstDot=False):
    """Returns the base name and the extension from the given file name.

    If ``allowedExts`` is ``None`` and ``firstDot`` is ``False``, this
    function is equivalent to using::

        os.path.splitext(filename)

    If ``allowedExts`` is ``None`` and ``firstDot`` is ``True``, the file
    name is split on the first period that is found, rather than the last
    period. For example::

        splitExt('image.nii.gz')                # -> ('image.nii', '.gz')
        splitExt('image.nii.gz', firstDot=True) # -> ('image', '.nii.gz')

    If ``allowedExts`` is provided, ``firstDot`` is ignored. In this case,
    the file name is split on the *first* entry in ``allowedExts`` that it
    ends with. If the file does not end with an allowed extension, a tuple
    containing ``(filename, '')`` is returned.

    :arg filename:    The file name to split.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg firstDot:    Split the file name on the first period, rather than
                      the last period. Ignored if ``allowedExts`` is
                      specified.
    """

    # If allowedExts is not specified
    # we split on a period character
    if allowedExts is None:

        # split on last period - equivalent
        # to op.splitext
        if not firstDot:
            return op.splitext(filename)

        # split on first period
        idx = filename.find('.')

        if idx == -1:
            return filename, ''

        return filename[:idx], filename[idx:]

    # Otherwise, try and find a suffix match
    for ext in allowedExts:
        if filename.endswith(ext):

            # Slice from an explicit index rather than
            # a negative length - filename[:-0] would
            # be empty if ext is the empty string.
            cut = len(filename) - len(ext)
            return filename[:cut], filename[cut:]

    # No match, assume there is no extension
    return filename, ''


def getFileGroup(path,
                 allowedExts=None,
                 fileGroups=None,
                 fullPaths=True,
                 unambiguous=False):
    """If the given ``path`` is part of a ``fileGroup``, returns a list
    containing the paths to all other files in the group (including the
    ``path`` itself).

    If the ``path`` does not appear to be part of a file group, or appears to
    be part of an incomplete file group, a list containing only the ``path``
    is returned.

    If the ``path`` does not exist, or appears to be part of more than one
    file group, a :exc:`PathError` is raised.

    File groups can be used to specify a collection of file suffixes which
    should always exist alongside each other. This can be used to resolve
    ambiguity when multiple files exist with the same ``prefix`` and supported
    extensions (e.g. ``file.hdr`` and ``file.img``). The file groups are
    specified as a list of sequences, for example::

        [('.img',    '.hdr'),
         ('.img.gz', '.hdr.gz')]

    If you specify ``fileGroups=[('.img', '.hdr')]`` and ``prefix='file'``, and
    both ``file.img`` and ``file.hdr`` exist, the :func:`addExt` function would
    return ``file.img`` (i.e. the file which matches the first extension in
    the group).

    Similarly, if you call the :func:`.imcp.imcp` or :func:`.imcp.immv`
    functions with the above parameters, both ``file.img`` and ``file.hdr``
    will be moved.

    .. note:: The primary use-case of file groups is to resolve ambiguity with
              respect to NIFTI and ANALYSE75 image pairs. By specifying
              ``fileGroups=[('.img', '.hdr'), ('.img.gz', '.hdr.gz')]``, the
              :func:`addExt`, :func:`.imcp.immv` and :func:`.imcp.imcp`
              functions are able to figure out what you mean when you specify
              ``file``, and both ``file.hdr`` and ``file.img`` (or
              ``file.hdr.gz`` and ``file.img.gz``) exist.

    :arg path:        Path to the file. Must contain the file extension.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg fileGroups:  Recognised file groups.

    :arg fullPaths:   If ``True`` (the default), full file paths (relative to
                      the ``path``) are returned. Otherwise, only the file
                      extensions in the group are returned.

    :arg unambiguous: Defaults to ``False``. If ``True``, and the path is
                      not unambiguously part of one group, or part of no
                      groups, a :exc:`PathError` is raised.
                      Otherwise, the path is returned.
    """

    path      = addExt(path, allowedExts, mustExist=True, fileGroups=fileGroups)
    base, ext = splitExt(path, allowedExts)

    if fileGroups is None:
        if fullPaths: return [path]
        else:         return [ext]

    matchedGroups     = []
    matchedGroupFiles = []
    fullMatches       = 0
    partialMatches    = 0

    for group in fileGroups:

        # Groups which do not contain the
        # extension of this path are irrelevant
        if ext != '' and ext not in group:
            continue

        groupFiles = [base + s for s in group]
        exist      = [op.exists(f) for f in groupFiles]

        # A partial match is counted whenever at
        # least one file of the group exists; only
        # complete groups are recorded as matches.
        if any(exist):
            partialMatches += 1

        if all(exist):
            fullMatches += 1
            matchedGroups    .append(group)
            matchedGroupFiles.append(groupFiles)

    # Path is not part of any group
    if partialMatches == 0:
        if fullPaths: return [path]
        else:         return [ext]

    # If the given path is part of more
    # than one existing file group, we
    # can't resolve this ambiguity.
    if fullMatches > 1:
        raise PathError('Path is part of multiple '
                        'file groups: {}'.format(path))

    # If the unambiguous flag is not set,
    # we don't care about partial matches
    if not unambiguous:
        partialMatches = 0

    # The path is unambiguously part of a
    # complete file group - resolve it to
    # the first element of the group
    if fullMatches == 1 and partialMatches <= 1:
        if fullPaths: return matchedGroupFiles[0]
        else:         return matchedGroups[    0]

    # The path appears to be part of
    # an incomplete group - this is
    # potentially ambiguous, so give
    # up (but see the partialMatches
    # clobber above).
    elif partialMatches > 0:
        raise PathError('Path is part of an incomplete '
                        'file group: {}'.format(path))

    else:
        if fullPaths: return [path]
        else:         return [ext]


def removeDuplicates(paths, allowedExts=None, fileGroups=None):
    """Reduces the list of ``paths`` down to those which are unique with
    respect to the specified ``fileGroups``.

    For example, if you have a directory containing::

        001.hdr
        001.img
        002.hdr
        002.img
        003.hdr
        003.img

    And you call ``removeDuplicates`` like so::

        paths       = ['001.img', '001.hdr',
                       '002.img', '002.hdr',
                       '003.img', '003.hdr']

        allowedExts = ['.img',  '.hdr']
        fileGroups  = [('.img', '.hdr')]

        removeDuplicates(paths, allowedExts, fileGroups)

    The returned list will be::

        ['001.img', '002.img', '003.img']

    If you provide ``allowedExts``, you may specify incomplete ``paths`` (i.e.
    without extensions), as long as there are no path ambiguities.

    A :exc:`PathError` will be raised if any of the ``paths`` do not exist,
    or if there are any ambiguities with respect to incomplete paths.

    :arg paths:       List of paths to reduce.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg fileGroups:  Recognised file groups - see :func:`getFileGroup`.
    """

    unique = []

    for path in paths:

        groupFiles = getFileGroup(path, allowedExts, fileGroups)

        # Only keep the first file of a group, and
        # only if no member of the group was seen yet
        if not any(p in unique for p in groupFiles):
            unique.append(groupFiles[0])

    return unique


def uniquePrefix(path):
    """Return the shortest prefix for the given file name which unambiguously
    identifies it, relative to the other files in the same directory.

    Raises a :exc:`PathError` if a unique prefix could not be found (which
    will never happen if the path is valid), or if ``path`` does not contain
    a file name (e.g. it ends with a path separator).
    """

    dirname, filename = op.split(path)

    # Nothing to build a prefix from
    if filename == '':
        raise PathError('No unique prefix for {}'.format(path))

    idx    = 0
    prefix = op.join(dirname, filename[0])

    # Escape the prefix so that glob special characters
    # ('*', '?', '[') in the directory or file name are
    # matched literally.
    hits   = glob.glob('{}*'.format(glob.escape(prefix)))

    while True:

        # Found a unique prefix
        if len(hits) == 1:
            break

        # Should never happen if path is valid
        elif len(hits) == 0 or idx >= len(filename) - 1:
            raise PathError('No unique prefix for {}'.format(filename))

        # Not unique - continue looping
        else:
            idx    += 1
            prefix  = prefix + filename[idx]
            hits    = [h for h in hits if h.startswith(prefix)]

    return prefix


def commonBase(paths):
    """Identifies the deepest common base directory shared by all files
    in ``paths``.

    Raises a :exc:`PathError` if the paths have no common base (or if
    ``paths`` is empty). This will never happen for absolute paths (as the
    base will be e.g. ``'/'``).
    """

    paths = list(paths)

    if len(paths) == 0:
        raise PathError('No common base')

    # Start from the deepest path, and walk
    # up through its parent directories
    depths = [len(p.split(op.sep)) for p in paths]
    base   = max(zip(depths, paths), key=operator.itemgetter(0))[1]
    last   = base

    while True:

        base = op.split(base)[0]

        if base == last or len(base) == 0:
            break

        last = base

        # Compare on whole path components - a plain
        # startswith(base) would wrongly accept e.g.
        # '/a/bc/file' as being underneath '/a/b'.
        if base.endswith(op.sep): prefix = base
        else:                     prefix = base + op.sep

        if all(p == base or p.startswith(prefix) for p in paths):
            return base

    raise PathError('No common base')


def wslpath(winpath):
    r"""
    Convert Windows path (or a command line argument containing a Windows
    path) to the equivalent WSL path (e.g. ``c:\Users`` -> ``/mnt/c/Users``).
    Also supports paths in the form ``\\wsl$\(distro)\users\...``

    :param winpath: Command line argument which may (or may not) contain a
                    Windows path. It is assumed to be either of the form
                    <windows path> or --<arg>=<windows path>. Note that we
                    don't need to handle --arg <windows path> or -a <windows
                    path> since in these cases the argument and the path will
                    be parsed as separate entities.

    :return:        If ``winpath`` matches a Windows path, the converted
                    argument (including the --<arg>= portion).  Otherwise
                    returns ``winpath`` unchanged.
    """

    # \\wsl$\<distro>\... network path - drop the
    # \\wsl$\<distro> portion, keep the remainder
    match = re.match(r"^(--[\w-]+=)?\\\\wsl\$[\\\/][^\\^\/]+(.*)$", winpath)
    if match:
        arg, path = match.group(1, 2)
        if arg is None:
            arg = ""
        return arg + path.replace("\\", "/")

    # <drive letter>:<path> - map to /mnt/<drive><path>
    match = re.match(r"^(--[\w-]+=)?([a-zA-Z]):(.+)$", winpath)
    if match:
        arg, drive, path = match.group(1, 2, 3)
        if arg is None:
            arg = ""
        return arg + "/mnt/" + drive.lower() + path.replace("\\", "/")

    return winpath


def winpath(wslpath):
    r"""
    Convert a WSL-local filepath (for example ``/usr/local/fsl/``) into a
    path that can be used from Windows.

    If ``platform.fslwsl`` is ``False``, simply returns ``wslpath``
    unmodified. Otherwise, uses ``FSLDIR`` (``platform.fsldir``) to deduce
    the WSL distro in use for FSL. This requires WSL2 which supports the
    ``\\wsl$\`` network path.

    ``wslpath`` is assumed to be an absolute path.

    Raises a :exc:`RuntimeError` if the WSL distro cannot be identified from
    ``FSLDIR``.
    """

    if not platform.fslwsl:
        return wslpath

    # FSLDIR is expected to be of the
    # form \\wsl$\<distro>\path\to\fsl
    match = re.match(r"^\\\\wsl\$\\([^\\]+).*$", platform.fsldir)

    if match:
        distro = match.group(1)
    else:
        distro = None

    if not distro:
        raise RuntimeError("Could not identify WSL installation from FSLDIR (%s)" % platform.fsldir)

    return "\\\\wsl$\\" + distro + wslpath.replace("/", "\\")