path.py 18.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python
#
# path.py - Utility functions for working with file/directory paths.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains a few utility functions for working with file system
paths.


.. autosummary::
   :nosignatures:

   deepest
   shallowest
   allFiles
   hasExt
   addExt
   removeExt
   getExt
   splitExt
   getFileGroup
   removeDuplicates
   uniquePrefix
   commonBase
   wslpath
   winpath
"""


import os.path as op
32
33
import            os
import            glob
34
import            operator
35
import            pathlib
36
import            re
37

38
39
from typing import Sequence, Tuple, Union

40
from fsl.utils.platform import platform
41

42

43
44
45
# Type alias used in the annotations of this module for arguments which
# may be given either as a string or as a ``pathlib.Path`` object.
PathLike = Union[str, pathlib.Path]


46
class PathError(Exception):
    """Error type used by the functions in this module to report a
    path-related failure (e.g. a file which cannot be found, or a file
    prefix which cannot be resolved unambiguously).
    """
50
51


52
53
54
55
56
57
58
59
def deepest(path, suffixes):
    """Finds the deepest directory which ends with one of the given
    sequence of suffixes, or returns ``None`` if no directories end
    with any of the suffixes.

    :arg path:     Path to search, starting from its last component and
                   moving up towards the root of the file system.

    :arg suffixes: Sequence of directory name suffixes to look for.

    :returns:      The deepest matching (parent) directory, with
                   surrounding whitespace and trailing separators removed,
                   or ``None`` if there is no match.
    """

    suffixes = tuple(suffixes)
    path     = path.strip()

    while True:

        # We've reached the root of the file system. The splitdrive
        # test catches a bare drive specifier (e.g. "C:"), for which
        # op.dirname returns its input unchanged - without this test
        # the search would never terminate on such paths.
        if path in (op.sep, '') or op.splitdrive(path)[1] == '':
            return None

        path = path.rstrip(op.sep)

        if path.endswith(suffixes):
            return path

        # Move up one level
        path = op.dirname(path).strip()


def shallowest(path, suffixes):
    """Finds the shallowest directory which ends with one of the given
    sequence of suffixes, or returns ``None`` if no directories end
    with any of the suffixes.

    :arg path:     Path to search.

    :arg suffixes: Sequence of directory name suffixes to look for.

    :returns:      The matching directory which is closest to the root of
                   the file system, or ``None`` if there is no match.
    """

    found   = None
    current = path

    # Walk from the given path up towards the root. Each match
    # overwrites the previous one, so the match which remains at
    # the end of the walk is the one closest to the root.
    while True:

        current = current.strip()

        # We've reached the root of the file system
        atRoot = (current == op.sep) or \
                 (current == '')     or \
                 (op.splitdrive(current)[1] == '')
        if atRoot:
            break

        current = current.rstrip(op.sep)

        for suffix in suffixes:
            if current.endswith(suffix):
                found = current
                break

        current = op.dirname(current)

    return found
93
94


95
96
97
98
99
100
101
102
103
104
105
106
107
108
def allFiles(root):
    """Return a list of paths to every file found underneath the ``root``
    directory (searched recursively via ``os.walk``). Each path is the
    file name joined onto the directory that it was found in.
    """
    return [op.join(parent, name)
            for parent, _, names in os.walk(root)
            for name in names]


109
110
def hasExt(path        : PathLike,
           allowedExts : Sequence[str]) -> bool:
    """Test whether ``path`` ends with one of the ``allowedExts``.

    :arg path:        Path to test.

    :arg allowedExts: Extensions to test against.

    :returns:         ``True`` if the ``path`` (converted to a string) ends
                      with any of the extensions, ``False`` otherwise.
    """
    name = str(path)

    for ext in allowedExts:
        if name.endswith(ext):
            return True

    return False


def addExt(
        prefix      : PathLike,
        allowedExts : Sequence[str]           = None,
        mustExist   : bool                    = True,
        defaultExt  : str                     = None,
        fileGroups  : Sequence[Sequence[str]] = None,
        unambiguous : bool                    = True
) -> Union[Sequence[str], str]:
    """Adds a file extension to the given file ``prefix``.

    If ``mustExist`` is False, and the file does not already have a
    supported extension, the default extension is appended and the new
    file name returned. If the prefix already has a supported extension,
    it is returned unchanged.

    If ``mustExist`` is ``True`` (the default), the function checks to see
    if any files exist that have the given prefix, and a supported file
    extension.  A :exc:`PathError` is raised if:

       - No files exist with the given prefix and a supported extension.

       - ``unambiguous is True``, more than one file exists with the given
         prefix and a supported extension, and the extensions of those
         files do not match exactly one of the ``fileGroups``.

    Otherwise the full file name is returned.

    The ``allowedExts`` sequence passed in by the caller is never modified.

    :arg prefix:      The file name prefix to modify.

    :arg allowedExts: List of allowed file extensions.

    :arg mustExist:   Whether the file must exist or not.

    :arg defaultExt:  Default file extension to use. It is treated as an
                      allowed extension, even if it is not present in
                      ``allowedExts``.

    :arg fileGroups:  Recognised file groups - see :func:`getFileGroup`.

    :arg unambiguous: If ``True`` (the default), and more than one file
                      exists with the specified ``prefix``, a
                      :exc:`PathError` is raised (unless the ambiguity can
                      be resolved via ``fileGroups``). Otherwise, a list
                      containing *all* matching files is returned.
    """

    prefix = str(prefix)

    # Take a private copy of allowedExts, as the
    # default extension may be appended below - we
    # must not modify the caller's sequence (which
    # may also be an immutable tuple).
    if allowedExts is None: allowedExts = []
    else:                   allowedExts = list(allowedExts)
    if fileGroups  is None: fileGroups  = []

    if defaultExt is not None and defaultExt not in allowedExts:
        allowedExts.append(defaultExt)

    if not mustExist:

        # the provided file name already
        # ends with a supported extension
        if hasExt(prefix, allowedExts):
            return prefix

        if defaultExt is not None: return prefix + defaultExt
        else:                      return prefix

    # If no allowed extensions were
    # provided, or the provided prefix
    # already ends with a supported
    # extension, check to see that it
    # exists.
    if len(allowedExts) == 0 or hasExt(prefix, allowedExts):
        allPaths = [prefix]

    # Otherwise, make a bunch of file names, one per
    # supported extension, and test to see if exactly
    # one of them exists.
    else:
        allPaths = [prefix + ext for ext in allowedExts]

    allPaths = [p for p in allPaths if op.isfile(p)]
    nexists  = len(allPaths)

    # Could not find any supported file
    # with the specified prefix
    if nexists == 0:
        raise PathError('Could not find a supported file '
                        'with prefix "{}"'.format(prefix))

    # If ambiguity is ok, return
    # all matching paths
    if not unambiguous:
        return allPaths

    # Ambiguity is not ok! More than
    # one supported file with the
    # specified prefix.
    elif nexists > 1:

        # Get the suffixes of all existing
        # files, and see if they match any
        # file groups.
        suffixes     = [getExt(p, allowedExts) for p in allPaths]
        groupMatches = [sorted(suffixes) == sorted(g) for g in fileGroups]

        # Is there a match for a file suffix group?
        # If not, multiple files with the specified
        # prefix exist, and there is no way to
        # resolve the ambiguity.
        if sum(groupMatches) != 1:
            raise PathError('More than one file with '
                            'prefix "{}"'.format(prefix))

        # Otherwise, we return a path
        # to the file which matches the
        # first suffix in the group.
        groupIdx = groupMatches.index(True)
        allPaths = [prefix + fileGroups[groupIdx][0]]

    # Return the full file name of the
    # supported file that was found
    return allPaths[0]
236
237


238
239
240
241
242
def removeExt(
        filename    : PathLike,
        allowedExts : Sequence[str] = None,
        firstDot    : bool          = False
) -> str:
    """Returns the given file name with its extension removed. The arguments
    are passed straight through to :func:`splitExt`, and the first element
    of its result is returned.
    """
    base, _ = splitExt(filename, allowedExts, firstDot)
    return base
245
246


247
248
249
250
251
def getExt(
        filename    : PathLike,
        allowedExts : Sequence[str] = None,
        firstDot    : bool          = False
) -> str:
    """Returns the extension part of the given file name. The arguments are
    passed straight through to :func:`splitExt`, and the second element of
    its result is returned.
    """
    _, ext = splitExt(filename, allowedExts, firstDot)
    return ext
254
255


256
257
258
259
260
def splitExt(
        filename    : PathLike,
        allowedExts : Sequence[str] = None,
        firstDot    : bool          = False
) -> Tuple[str, str]:
    """Returns the base name and the extension from the given file name.

    If ``allowedExts`` is ``None`` and ``firstDot`` is ``False``, this
    function is equivalent to using::

        os.path.splitext(filename)

    If ``allowedExts`` is ``None`` and ``firstDot`` is ``True``, the file
    name is split on the first period that is found, rather than the last
    period. For example::

        splitExt('image.nii.gz')                # -> ('image.nii', '.gz')
        splitExt('image.nii.gz', firstDot=True) # -> ('image', '.nii.gz')

    If ``allowedExts`` is provided, ``firstDot`` is ignored, and the file
    name is split on the *first* entry in ``allowedExts`` that it ends with
    (so overlapping extensions such as ``'.nii.gz'`` and ``'.gz'`` should be
    listed longest-first). If the file does not end with an allowed
    extension, a tuple containing ``(filename, '')`` is returned.

    :arg filename:    The file name to split.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg firstDot:    Split the file name on the first period, rather than the
                      last period. Ignored if ``allowedExts`` is specified.

    :returns:         A tuple containing ``(base, extension)``.
    """

    filename = str(filename)

    # If allowedExts is not specified
    # we split on a period character
    if allowedExts is None:

        # split on last period - equivalent
        # to op.splitext
        if not firstDot:
            return op.splitext(filename)

        # split on first period. If there is no
        # period, partition returns two empty
        # strings, so the extension is empty.
        base, dot, rest = filename.partition('.')
        return base, dot + rest

    # Otherwise, split on the first allowed extension
    # which matches. The split point is calculated
    # from the start of the string, as a negative
    # index would break for an empty extension
    # (filename[:-0] is the empty string).
    for ext in allowedExts:
        if filename.endswith(ext):
            cut = len(filename) - len(ext)
            return filename[:cut], filename[cut:]

    # No match, assume there is no extension
    return filename, ''


321
322
323
324
325
def getFileGroup(path,
                 allowedExts=None,
                 fileGroups=None,
                 fullPaths=True,
                 unambiguous=False):
    """Identifies the file group that the given ``path`` belongs to, and
    returns a list containing the paths to all files in that group
    (including the ``path`` itself).

    A file group is a collection of file suffixes which should always exist
    alongside each other. File groups can be used to resolve ambiguity when
    multiple files exist with the same ``prefix`` and supported extensions
    (e.g. ``file.hdr`` and ``file.img``). They are specified as a list of
    sequences, for example::

        [('.img',    '.hdr'),
         ('.img.gz', '.hdr.gz')]

    If you specify ``fileGroups=[('.img', '.hdr')]`` and ``prefix='file'``,
    and both ``file.img`` and ``file.hdr`` exist, the :func:`addExt` function
    would return ``file.img`` (i.e. the file which matches the first
    extension in the group).

    Similarly, if you call the :func:`.imcp.imcp` or :func:`.imcp.immv`
    functions with the above parameters, both ``file.img`` and ``file.hdr``
    will be moved.

    .. note:: The primary use-case of file groups is to resolve ambiguity with
              respect to NIFTI and ANALYSE75 image pairs. By specifying
              ``fileGroups=[('.img', '.hdr'), ('.img.gz', '.hdr.gz')]``, the
              :func:`addExt`, :func:`.imcp.immv` and :func:`.imcp.imcp`
              functions are able to figure out what you mean when you specify
              ``file``, and both ``file.hdr`` and ``file.img`` (or
              ``file.hdr.gz`` and ``file.img.gz``) exist.

    The result is determined as follows:

      - The ``path`` is first resolved with :func:`addExt` (using
        ``mustExist=True``), so a :exc:`PathError` is raised if it does not
        exist.

      - If no ``fileGroups`` are given, or no file from any candidate group
        exists, a list containing only the ``path`` is returned.

      - If every file of more than one group exists, a :exc:`PathError` is
        raised.

      - If every file of exactly one group exists, that group is returned
        (but see ``unambiguous``).

      - Otherwise (the ``path`` only belongs to incomplete groups), a list
        containing only the ``path`` is returned (but see ``unambiguous``).

    :arg path:        Path to the file. Must contain the file extension.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg fileGroups:  Recognised file groups.

    :arg fullPaths:   If ``True`` (the default), full file paths (relative to
                      the ``path``) are returned. Otherwise, only the file
                      extensions in the group are returned.

    :arg unambiguous: Defaults to ``False``. If ``True``, a :exc:`PathError`
                      is raised when files from an incomplete group exist
                      and either no group is complete, or files from more
                      than one group exist.
    """

    path      = addExt(path, allowedExts, mustExist=True, fileGroups=fileGroups)
    base, ext = splitExt(path, allowedExts)

    # Result used whenever the path cannot
    # be associated with a complete group
    if fullPaths: single = [path]
    else:         single = [ext]

    if fileGroups is None:
        return single

    # (group, files) pairs for every group for which
    # all files exist, and a count of the groups for
    # which at least one file exists.
    complete = []
    npartial = 0

    for group in fileGroups:

        # The extension of the path rules this group out
        if ext != '' and ext not in group:
            continue

        candidates = [base + suffix for suffix in group]
        present    = [op.exists(c) for c in candidates]

        if any(present):
            npartial += 1

        if all(present):
            complete.append((group, candidates))

    ncomplete = len(complete)

    # Path is not part of any group
    if npartial == 0:
        return single

    # If the given path is part of more
    # than one existing file group, we
    # can't resolve this ambiguity.
    if ncomplete > 1:
        raise PathError('Path is part of multiple '
                        'file groups: {}'.format(path))

    # If the unambiguous flag is not set,
    # we don't care about partial matches
    if not unambiguous:
        npartial = 0

    # The path is unambiguously part of a
    # complete file group - resolve it to
    # that group
    if ncomplete == 1 and npartial <= 1:
        group, files = complete[0]
        if fullPaths: return files
        else:         return group

    # The path appears to be part of an
    # incomplete group - this is potentially
    # ambiguous, so give up. This can only
    # happen when the unambiguous flag is set,
    # as the count is otherwise reset above.
    if npartial > 0:
        raise PathError('Path is part of an incomplete '
                        'file group: {}'.format(path))

    return single

444
445

def removeDuplicates(paths, allowedExts=None, fileGroups=None):
    """Filters the list of ``paths`` so that each file group (with respect
    to the specified ``fileGroups``) is only represented once.

    For example, if you have a directory containing::

        001.hdr
        001.img
        002.hdr
        002.img
        003.hdr
        003.img

    And you call ``removeDuplicates`` like so::

         paths       = ['001.img', '001.hdr',
                        '002.img', '002.hdr',
                        '003.img', '003.hdr']

         allowedExts = ['.img',  '.hdr']
         fileGroups  = [('.img', '.hdr')]

         removeDuplicates(paths, allowedExts, fileGroups)

    The returned list will be::

         ['001.img', '002.img', '003.img']

    If you provide ``allowedExts``, you may specify incomplete ``paths`` (i.e.
    without extensions), as long as there are no path ambiguities.

    Each path is resolved with :func:`getFileGroup`, so a :exc:`PathError`
    will be raised if any of the ``paths`` do not exist, or if there are
    any ambiguities with respect to incomplete paths.

    :arg paths:       List of paths to reduce.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg fileGroups:  Recognised file groups - see :func:`getFileGroup`.

    :returns:         A list containing the first file of each distinct
                      group, in the order in which the groups were first
                      encountered.
    """

    unique = []

    for path in paths:

        members = getFileGroup(path, allowedExts, fileGroups)

        # Skip this path if any file from
        # its group has already been kept
        seen = False
        for member in members:
            if member in unique:
                seen = True
                break

        if not seen:
            unique.append(members[0])

    return unique
496
497
498
499
500
501


def uniquePrefix(path):
    """Return the shortest prefix for the given file name which unambiguously
    identifies it, relative to the other files in the same directory.

    :arg path: Path to an existing file.

    :returns:  The directory of ``path``, joined with the shortest leading
               part of its file name that no other entry in that directory
               starts with.

    Raises a :exc:`PathError` if a unique prefix could not be found. This
    happens if ``path`` has no file name component, if no matching file
    exists, or if the whole file name is itself a prefix of another entry
    in the directory.
    """

    dirname, filename = op.split(path)

    # Nothing to build a prefix
    # from (e.g. "" or "some/dir/")
    if filename == '':
        raise PathError('No unique prefix for {}'.format(path))

    idx    = 0
    prefix = op.join(dirname, filename[0])

    # The prefix is escaped so that any glob
    # meta-characters ("*", "?", "[") in the
    # directory or file name are matched literally.
    hits   = glob.glob('{}*'.format(glob.escape(prefix)))

    while True:

        # Found a unique prefix
        if len(hits) == 1:
            break

        # No file matches, or we have run
        # out of characters to add
        if len(hits) == 0 or idx >= len(filename) - 1:
            raise PathError('No unique prefix for {}'.format(filename))

        # Not unique - extend the prefix by one
        # character, and narrow down the hits
        idx    += 1
        prefix  = prefix + filename[idx]
        hits    = [h for h in hits if h.startswith(prefix)]

    return prefix
528
529
530
531
532
533
534
535
536
537
538
539


def commonBase(paths):
    """Identifies the deepest common base directory shared by all files
    in ``paths``.

    Paths are compared on whole path components, so ``/a/bc`` is not
    considered to be a base of ``/a/bcd/file``.

    :arg paths: Sequence of paths (strings).

    :returns:   The deepest directory which is an ancestor of (or equal to)
                every path in ``paths``.

    Raises a :exc:`PathError` if ``paths`` is empty, or if the paths have no
    common base. The latter will never happen for absolute paths (as the
    base will be e.g. ``'/'``).
    """

    paths = list(paths)

    if len(paths) == 0:
        raise PathError('No common base')

    # Start from the deepest path, and
    # work upwards through its parents
    depths = [len(p.split(op.sep)) for p in paths]
    base   = max(zip(depths, paths), key=operator.itemgetter(0))[1]
    last   = base

    while True:

        base = op.split(base)[0]

        # We have reached the root of an absolute
        # path (op.split no longer changes it), or
        # run out of components in a relative path
        if base == last or len(base) == 0:
            break

        last = base

        # Compare against the base followed by a
        # separator, so that only whole directory
        # names are matched (the root directory
        # already ends with a separator).
        if base.endswith(op.sep): prefix = base
        else:                     prefix = base + op.sep

        if all(p == base or p.startswith(prefix) for p in paths):
            return base

    raise PathError('No common base')
555

556

557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
def wslpath(path):
    """Convert Windows path (or a command line argument containing a Windows
    path) to the equivalent WSL path (e.g. ``c:\\Users`` -> ``/mnt/c/Users``).
    Also supports paths in the form ``\\\\wsl$\\(distro)\\users\\...``

    :param path: Command line argument which may (or may not) contain a
                 Windows path. It is assumed to be either of the form
                 <windows path> or --<arg>=<windows path>. Note that we
                 don't need to handle --arg <windows path> or -a <windows
                 path> since in these cases the argument and the path will
                 be parsed as separate entities.
    :return:     If ``path`` matches a Windows path, the converted
                 argument (including the --<arg>= portion).  Otherwise
                 returns ``path`` unchanged.
    """

    # WSL network path - drop the \\wsl$\<distro>
    # prefix, and keep the remainder of the path
    match = re.match(r"^(--[\w-]+=)?\\\\wsl\$[\\\/][^\\^\/]+(.*)$", path)
    if match:
        arg, path = match.group(1, 2)
        if arg is None:
            arg = ""
        return arg + path.replace("\\", "/")

    # Path with a drive letter - the drive
    # is mounted within WSL at /mnt/<letter>
    match = re.match(r"^(--[\w-]+=)?([a-zA-Z]):(.+)$", path)
    if match:
        arg, drive, path = match.group(1, 2, 3)
        if arg is None:
            arg = ""
        return arg + "/mnt/" + drive.lower() + path.replace("\\", "/")

    # Not a Windows path
    return path
588

589

590
591
592
def winpath(path):
    """Convert a WSL-local filepath (for example ``/usr/local/fsl/``) into a
    path that can be used from Windows.

    If ``platform.fslwsl`` is ``False``, the ``path`` is returned unmodified.
    Otherwise, the name of the WSL distro is extracted from
    ``platform.fsldir``, which is expected to be of the form
    ``\\\\wsl$\\(distro)\\...``, and the ``path`` is appended to
    ``\\\\wsl$\\(distro)``, with forward slashes replaced by backslashes.

    :arg path: WSL path to convert - assumed to be an absolute path.

    Raises a :exc:`RuntimeError` if the distro name cannot be identified
    from ``platform.fsldir``.
    """

    # Not running FSL within WSL - nothing to do
    if not platform.fslwsl:
        return path

    match  = re.match(r"^\\\\wsl\$\\([^\\]+).*$", platform.fsldir)
    distro = match.group(1) if match else None

    if not distro:
        raise RuntimeError('Could not identify WSL installation from '
                           'FSLDIR (%s)' % platform.fsldir)

    return "\\\\wsl$\\" + distro + path.replace("/", "\\")