#!/usr/bin/env python
#
# path.py - Utility functions for working with file/directory paths.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains a few utility functions for working with file system
paths.


.. autosummary::
   :nosignatures:

   deepest
   shallowest
   allFiles
   hasExt
   addExt
   removeExt
   getExt
   splitExt
   getFileGroup
   removeDuplicates
   uniquePrefix
   commonBase
   wslpath
   winpath
"""


import os.path as op
32
33
import            os
import            glob
34
import            operator
35
import            pathlib
36
import            re
37

38
39
from typing import Sequence, Tuple, Union

40

41
42
43
# Type alias used in the annotations of the functions below for arguments
# which may be given either as a string or as a ``pathlib.Path``.
PathLike = Union[str, pathlib.Path]


44
class PathError(Exception):
    """``Exception`` class raised by the functions defined in this module
    when something goes wrong.
    """
48
49


50
51
52
53
54
55
56
57
def deepest(path, suffixes):
    """Finds the deepest directory which ends with one of the given
    sequence of suffixes, or returns ``None`` if no directories end
    with any of the suffixes.

    :arg path:     Path to search, as a string. The path itself is tested
                   first, followed by each of its parent directories in
                   turn.

    :arg suffixes: Sequence of suffixes to test against.

    :returns:      The first (deepest) matching path, with any trailing
                   path separators removed, or ``None`` if nothing matched.
    """

    suffixes = tuple(suffixes)

    while True:

        path = path.strip()

        # We've reached the root of the file system (or a bare
        # drive/UNC share, for which op.dirname would return
        # the same value forever) without finding a match
        if path in (op.sep, '') or op.splitdrive(path)[1] == '':
            return None

        path = path.rstrip(op.sep)

        if path.endswith(suffixes):
            return path

        path = op.dirname(path)


def shallowest(path, suffixes):
    """Finds the shallowest directory which ends with one of the given
    sequence of suffixes, or returns ``None`` if no directories end
    with any of the suffixes.

    :arg path:     Path to search, as a string. The path itself and each
                   of its parent directories are tested.

    :arg suffixes: Sequence of suffixes to test against.

    :returns:      The matching path which is closest to the root of the
                   file system, with any trailing path separators removed,
                   or ``None`` if nothing matched.
    """

    suffixes = tuple(suffixes)
    match    = None
    path     = path.strip()

    # Walk upwards until we reach the root of the file system
    # (or a bare drive). Every match overwrites the previous
    # one, so the last one recorded is the shallowest.
    while not (path == op.sep or
               path == ''     or
               op.splitdrive(path)[1] == ''):

        path = path.rstrip(op.sep)

        if path.endswith(suffixes):
            match = path

        path = op.dirname(path).strip()

    return match
91
92


93
94
95
96
97
98
99
100
101
102
103
104
105
106
def allFiles(root):
    """Walks the directory tree underneath ``root`` and collects the path
    to every file that is found.

    :arg root: Directory to search.

    :returns:  A list of file paths, each formed by joining the directory
               in which the file was found (as reported by ``os.walk``)
               with the file name.
    """
    return [op.join(parent, name)
            for parent, _, names in os.walk(root)
            for name in names]


107
108
def hasExt(path        : PathLike,
           allowedExts : Sequence[str]) -> bool:
    """Convenience function which returns ``True`` if the given ``path``
    ends with any of the given ``allowedExts``, ``False`` otherwise.

    :arg path:        Path to test. Converted to a string before testing.

    :arg allowedExts: Sequence of file extensions to test against.
    """
    return str(path).endswith(tuple(allowedExts))


def addExt(
        prefix      : PathLike,
        allowedExts : Sequence[str]           = None,
        mustExist   : bool                    = True,
        defaultExt  : str                     = None,
        fileGroups  : Sequence[Sequence[str]] = None,
        unambiguous : bool                    = True
) -> Union[Sequence[str], str]:
    """Adds a file extension to the given file ``prefix``.

    If ``mustExist`` is False, and the file does not already have a
    supported extension, the default extension is appended and the new
    file name returned. If the prefix already has a supported extension,
    it is returned unchanged.

    If ``mustExist`` is ``True`` (the default), the function checks to see
    if any files exist that have the given prefix, and a supported file
    extension.  A :exc:`PathError` is raised if:

       - No files exist with the given prefix and a supported extension.

       - ``unambiguous is True``, more than one file exists with the given
         prefix and a supported extension, and the extensions of those
         files do not match exactly one of the ``fileGroups``.

    Otherwise the full file name is returned. If the ambiguity was resolved
    via a file group, the returned file name is the one with the first
    extension in that group.

    :arg prefix:      The file name prefix to modify.

    :arg allowedExts: List of allowed file extensions. This sequence is
                      not modified.

    :arg mustExist:   Whether the file must exist or not.

    :arg defaultExt:  Default file extension to use. Treated as an allowed
                      extension, even if not present in ``allowedExts``.

    :arg fileGroups:  Recognised file groups - see :func:`getFileGroup`.

    :arg unambiguous: If ``True`` (the default), and more than one file
                      exists with the specified ``prefix``, a
                      :exc:`PathError` is raised. Otherwise, a list
                      containing *all* matching files is returned.
    """

    prefix = str(prefix)

    # Take a copy of the allowed extensions, as the default
    # extension may be added below - the caller's sequence
    # must not be modified (and may be an immutable tuple).
    if allowedExts is None: allowedExts = []
    else:                   allowedExts = list(allowedExts)
    if fileGroups  is None: fileGroups  = []

    if defaultExt is not None and defaultExt not in allowedExts:
        allowedExts.append(defaultExt)

    if not mustExist:

        # The provided file name already ends with
        # a supported extension, or we have no
        # default extension to give it
        if defaultExt is None or hasExt(prefix, allowedExts):
            return prefix

        return prefix + defaultExt

    # If no allowed extensions were
    # provided, or the provided prefix
    # already ends with a supported
    # extension, check to see that it
    # exists.
    if len(allowedExts) == 0 or hasExt(prefix, allowedExts):
        candidates = [prefix]

    # Otherwise, make a bunch of file names, one per
    # supported extension, and test to see if exactly
    # one of them exists.
    else:
        candidates = [prefix + ext for ext in allowedExts]

    allPaths = [p for p in candidates if op.isfile(p)]

    # Could not find any supported file
    # with the specified prefix
    if len(allPaths) == 0:
        raise PathError('Could not find a supported file '
                        'with prefix "{}"'.format(prefix))

    # If ambiguity is ok, return
    # all matching paths
    if not unambiguous:
        return allPaths

    # Exactly one supported file was found
    if len(allPaths) == 1:
        return allPaths[0]

    # Ambiguity is not ok! More than one supported
    # file with the specified prefix. Get the
    # suffixes of the files that exist, and see
    # if they match any file groups.
    suffixes     = sorted(getExt(p, allowedExts) for p in allPaths)
    groupMatches = [suffixes == sorted(g) for g in fileGroups]

    # Is there a match for a file suffix group?
    # If not, multiple files with the specified
    # prefix exist, and there is no way to
    # resolve the ambiguity.
    if sum(groupMatches) != 1:
        raise PathError('More than one file with '
                        'prefix "{}"'.format(prefix))

    # Otherwise, we return a path
    # to the file which matches the
    # first suffix in the group.
    groupIdx = groupMatches.index(True)
    return prefix + fileGroups[groupIdx][0]
234
235


236
237
238
239
240
def removeExt(
        filename    : PathLike,
        allowedExts : Sequence[str] = None,
        firstDot    : bool          = False
) -> str:
    """Returns the base name of the given file name, i.e. the first element
    of the tuple returned by :func:`splitExt`, to which all arguments are
    passed through.
    """
    return splitExt(filename, allowedExts, firstDot)[0]
243
244


245
246
247
248
249
def getExt(
        filename    : PathLike,
        allowedExts : Sequence[str] = None,
        firstDot    : bool          = False
) -> str:
    """Returns the extension of the given file name, i.e. the second element
    of the tuple returned by :func:`splitExt`, to which all arguments are
    passed through.
    """
    return splitExt(filename, allowedExts, firstDot)[1]
252
253


254
255
256
257
258
def splitExt(
        filename    : PathLike,
        allowedExts : Sequence[str] = None,
        firstDot    : bool          = False
) -> Tuple[str, str]:
    """Returns the base name and the extension from the given file name.

    If ``allowedExts`` is ``None`` and ``firstDot`` is ``False``, this
    function is equivalent to using::

        os.path.splitext(filename)

    If ``allowedExts`` is ``None`` and ``firstDot`` is ``True``, the file
    name is split on the first period that is found in its final path
    component, rather than the last period. Periods in any leading
    directory names are ignored. For example::

        splitExt('image.nii.gz')                # -> ('image.nii', '.gz')
        splitExt('image.nii.gz', firstDot=True) # -> ('image', '.nii.gz')

    If ``allowedExts`` is provided, ``firstDot`` is ignored. In this case,
    the file name is split on the first entry in ``allowedExts`` that it
    ends with. If the file does not end with an allowed extension, a tuple
    containing ``(filename, '')`` is returned.

    :arg filename:    The file name to split.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg firstDot:    Split the file name on the first period, rather than the
                      last period. Ignored if ``allowedExts`` is specified.
    """

    filename = str(filename)

    # If allowedExts is not specified
    # we split on a period character
    if allowedExts is None:

        # split on last period - equivalent
        # to op.splitext
        if not firstDot:
            return op.splitext(filename)

        # split on first period - only the final
        # path component is searched, so that a
        # period in a directory name is not
        # mistaken for the start of the extension
        basename = op.basename(filename)
        idx      = basename.find('.')

        if idx == -1:
            return filename, ''

        cut = len(filename) - len(basename) + idx
        return filename[:cut], filename[cut:]

    # Otherwise, split on the first allowed
    # extension that the file name ends with.
    # The split point is calculated from the
    # start of the string, as a negative slice
    # would break on a zero-length extension.
    for ext in allowedExts:
        if filename.endswith(ext):
            cut = len(filename) - len(ext)
            return filename[:cut], filename[cut:]

    # No match, assume there is no extension
    return filename, ''


319
320
321
322
323
def getFileGroup(path,
                 allowedExts=None,
                 fileGroups=None,
                 fullPaths=True,
                 unambiguous=False):
    """If the given ``path`` is part of a ``fileGroup``, returns a list
    containing the paths to all files in the group (including the
    ``path`` itself).

    If the ``path`` does not appear to be part of a file group, a list
    containing only the ``path`` is returned. The same is true if the
    ``path`` appears to be part of an incomplete file group and
    ``unambiguous`` is ``False``.

    If the ``path`` does not exist, or appears to be part of more than one
    complete file group, a :exc:`PathError` is raised.

    File groups can be used to specify a collection of file suffixes which
    should always exist alongside each other. This can be used to resolve
    ambiguity when multiple files exist with the same ``prefix`` and supported
    extensions (e.g. ``file.hdr`` and ``file.img``). The file groups are
    specified as a list of sequences, for example::

        [('.img',    '.hdr'),
         ('.img.gz', '.hdr.gz')]

    If you specify ``fileGroups=[('.img', '.hdr')]`` and ``prefix='file'``, and
    both ``file.img`` and ``file.hdr`` exist, the :func:`addExt` function would
    return ``file.img`` (i.e. the file which matches the first extension in
    the group).

    Similarly, if you call the :func:`.imcp.imcp` or :func:`.imcp.immv`
    functions with the above parameters, both ``file.img`` and ``file.hdr``
    will be moved.

    .. note:: The primary use-case of file groups is to resolve ambiguity with
              respect to NIFTI and ANALYZE75 image pairs. By specifying
              ``fileGroups=[('.img', '.hdr'), ('.img.gz', '.hdr.gz')]``, the
              :func:`addExt`, :func:`.imcp.immv` and :func:`.imcp.imcp`
              functions are able to figure out what you mean when you specify
              ``file``, and both ``file.hdr`` and ``file.img`` (or
              ``file.hdr.gz`` and ``file.img.gz``) exist.

    :arg path:        Path to the file. It is passed through :func:`addExt`
                      (with ``mustExist=True``) before being used.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg fileGroups:  Recognised file groups.

    :arg fullPaths:   If ``True`` (the default), full file paths (relative to
                      the ``path``) are returned. Otherwise, only the file
                      extensions in the group are returned.

    :arg unambiguous: Defaults to ``False``, in which case incomplete file
                      groups are ignored. If ``True``, a :exc:`PathError`
                      is raised if the path appears to be part of an
                      incomplete file group, unless the only group for
                      which any files exist is a single complete one.
    """

    path      = addExt(path, allowedExts, mustExist=True, fileGroups=fileGroups)
    base, ext = splitExt(path, allowedExts)

    if fileGroups is None:
        if fullPaths: return [path]
        else:         return [ext]

    matchedGroups     = []
    matchedGroupFiles = []
    fullMatches       = 0
    partialMatches    = 0

    for group in fileGroups:

        # Groups which do not contain the
        # extension of this path are irrelevant
        if ext != '' and ext not in group:
            continue

        groupFiles = [base + s for s in group]
        exist      = [op.exists(f) for f in groupFiles]

        # A complete group is counted as
        # both a partial and a full match
        if any(exist):
            partialMatches += 1

        if all(exist):
            fullMatches += 1
            matchedGroups    .append(group)
            matchedGroupFiles.append(groupFiles)

    # Path is not part of any group
    if partialMatches == 0:
        if fullPaths: return [path]
        else:         return [ext]

    # If the given path is part of more
    # than one existing file group, we
    # can't resolve this ambiguity.
    if fullMatches > 1:
        raise PathError('Path is part of multiple '
                        'file groups: {}'.format(path))

    # If the unambiguous flag is not set,
    # we don't care about partial matches
    if not unambiguous:
        partialMatches = 0

    # The path is unambiguously part of a
    # complete file group - return all of
    # the files (or extensions) in the group
    if fullMatches == 1 and partialMatches <= 1:
        if fullPaths: return matchedGroupFiles[0]
        else:         return matchedGroups[    0]

    # The path appears to be part of
    # an incomplete group - this is
    # potentially ambiguous, so give
    # up (but see the partialMatches
    # clobber above).
    elif partialMatches > 0:
        raise PathError('Path is part of an incomplete '
                        'file group: {}'.format(path))

    else:
        if fullPaths: return [path]
        else:         return [ext]

442
443

def removeDuplicates(paths, allowedExts=None, fileGroups=None):
    """Reduces the list of ``paths`` down to those which are unique with
    respect to the specified ``fileGroups``.

    For example, if you have a directory containing::

        001.hdr
        001.img
        002.hdr
        002.img
        003.hdr
        003.img

    And you call ``removeDuplicates`` like so::

         paths       = ['001.img', '001.hdr',
                        '002.img', '002.hdr',
                        '003.img', '003.hdr']

         allowedExts = ['.img',  '.hdr']
         fileGroups  = [('.img', '.hdr')]

         removeDuplicates(paths, allowedExts, fileGroups)

    The returned list will be::

         ['001.img', '002.img', '003.img']

    If you provide ``allowedExts``, you may specify incomplete ``paths`` (i.e.
    without extensions), as long as there are no path ambiguities.

    A :exc:`PathError` will be raised if any of the ``paths`` do not exist,
    or if there are any ambiguities with respect to incomplete paths.

    :arg paths:       List of paths to reduce.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg fileGroups:  Recognised file groups - see :func:`getFileGroup`.

    :returns:         A list containing, for each distinct file group
                      found (in the order encountered), the first file
                      returned by :func:`getFileGroup` for that group.
    """

    unique = []

    # Mirror of the unique list, kept as a
    # set so that membership tests are cheap
    seen   = set()

    for path in paths:

        groupFiles = getFileGroup(path, allowedExts, fileGroups)

        # None of the files in this group
        # have been seen yet - keep the
        # first one as its representative
        if seen.isdisjoint(groupFiles):
            unique.append(groupFiles[0])
            seen  .add(   groupFiles[0])

    return unique
494
495
496
497
498
499


def uniquePrefix(path):
    """Return the shortest prefix for the given file name which unambiguously
    identifies it, relative to the other files in the same directory.

    The prefix is built up one character at a time, starting from the first
    character of the file name, until exactly one entry in the directory
    starts with it.

    Raises a :exc:`PathError` if a unique prefix could not be found (which
    will never happen if the path is valid).

    :arg path: Path to the file of interest.

    :returns:  The directory component of ``path``, joined with the unique
               file name prefix.
    """

    dirname, filename = op.split(path)

    # No file name (e.g. the path is empty,
    # or ends with a path separator)
    if filename == '':
        raise PathError('No unique prefix for {}'.format(path))

    # The prefix must be matched literally, so any
    # glob special characters it contains are escaped
    prefix = op.join(dirname, filename[0])
    hits   = glob.glob(glob.escape(prefix) + '*')

    # Keep on extending the prefix until it
    # matches one file (or no files at all)
    for char in filename[1:]:

        if len(hits) <= 1:
            break

        prefix = prefix + char
        hits   = [h for h in hits if h.startswith(prefix)]

    # Should never happen if path is valid
    if len(hits) != 1:
        raise PathError('No unique prefix for {}'.format(filename))

    return prefix
526
527
528
529
530
531
532
533
534
535
536
537


def commonBase(paths):
    """Identifies the deepest common base directory shared by all files
    in ``paths``.

    Raises a :exc:`PathError` if the paths have no common base. This will
    never happen for absolute paths (as the base will be e.g. ``'/'``).

    :arg paths: Sequence of paths, as strings.

    :returns:   The deepest directory which is either equal to, or an
                ancestor of, every one of the ``paths``. The deepest path
                itself is never returned - only its parent directories
                are considered.
    """

    paths = list(paths)

    if len(paths) == 0:
        raise PathError('No common base')

    # Start from the deepest path,
    # and work upwards from there
    depths = [len(p.split(op.sep)) for p in paths]
    base   = max(zip(depths, paths), key=operator.itemgetter(0))[1]
    last   = base

    while True:

        base = op.split(base)[0]

        # We've hit the root, or have run out of path
        if base == last or len(base) == 0:
            break

        last = base

        # Compare whole path components, so that
        # e.g. "/a/b" is not mistaken for a base
        # directory of "/a/bc/d"
        if base.endswith(op.sep): prefix = base
        else:                     prefix = base + op.sep

        if all(p == base or p.startswith(prefix) for p in paths):
            return base

    raise PathError('No common base')
553

554

555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
def wslpath(path):
    """Convert Windows path (or a command line argument containing a Windows
    path) to the equivalent WSL path (e.g. ``c:\\Users`` -> ``/mnt/c/Users``).
    Also supports paths in the form ``\\\\wsl$\\(distro)\\users\\...``

    :param path: Command line argument which may (or may not) contain a
                 Windows path. It is assumed to be either of the form
                 <windows path> or --<arg>=<windows path>. Note that we
                 don't need to handle --arg <windows path> or -a <windows
                 path> since in these cases the argument and the path will
                 be parsed as separate entities.
    :return:     If ``path`` matches a Windows path, the converted
                 argument (including the --<arg>= portion).  Otherwise
                 returns ``path`` unchanged.
    """

    # \\wsl$\<distro>\<rest> - the WSL path is just <rest>
    match = re.match(r"^(--[\w-]+=)?\\\\wsl\$[\\/][^\\/]+(.*)$", path)
    if match:
        arg, rest = match.group(1, 2)
        return (arg or "") + rest.replace("\\", "/")

    # <drive>:<rest> - the drive is mounted at /mnt/<drive>
    match = re.match(r"^(--[\w-]+=)?([a-zA-Z]):(.+)$", path)
    if match:
        arg, drive, rest = match.group(1, 2, 3)
        return (arg or "") + "/mnt/" + drive.lower() + rest.replace("\\", "/")

    return path
586

587

588
589
590
def winpath(path):
    """Convert a WSL-local filepath (for example ``/usr/local/fsl/``) into a
    path that can be used from Windows.

    If ``platform.fslwsl`` is ``False``, simply returns ``path`` unmodified.
    Otherwise, uses ``platform.fsldir`` to deduce the WSL distro in use for
    FSL. This requires WSL2 which supports the ``\\\\wsl$\\`` network path.

    ``path`` is assumed to be an absolute path.

    Raises a ``RuntimeError`` if ``platform.fsldir`` is not of the form
    ``\\\\wsl$\\(distro)\\...``.

    :arg path: WSL-local file path to convert.

    :returns:  The equivalent ``\\\\wsl$\\(distro)\\...`` path.
    """
    from fsl.utils.platform import platform  # pylint: disable=import-outside-toplevel  # noqa: E501

    if not platform.fslwsl:
        return path

    # The distro name is the first path
    # component following \\wsl$\ in FSLDIR
    match = re.match(r"^\\\\wsl\$\\([^\\]+).*$", platform.fsldir)

    if not match:
        raise RuntimeError('Could not identify WSL installation from '
                           'FSLDIR (%s)' % platform.fsldir)

    return "\\\\wsl$\\" + match.group(1) + path.replace("/", "\\")