path.py 17.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python
#
# path.py - Utility functions for working with file/directory paths.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains a few utility functions for working with file system
paths.


.. autosummary::
   :nosignatures:

   deepest
   shallowest
   allFiles
   hasExt
   addExt
   removeExt
   getExt
   splitExt
   getFileGroup
   removeDuplicates
   uniquePrefix
   commonBase
   wslpath
   winpath
"""


import os.path as op
32
33
import            os
import            glob
34
import            operator
35
import            re
36

37
from fsl.utils.platform import platform
38

39

40
class PathError(Exception):
    """The ``Exception`` type which the functions in this module raise
    when a path-related operation goes wrong.
    """


47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def deepest(path, suffixes):
    """Finds the deepest directory which ends with one of the given
    sequence of suffixes, or returns ``None`` if no directories end
    with any of the suffixes.

    :arg path:     Path to search. Surrounding whitespace and trailing
                   path separators are ignored.

    :arg suffixes: Sequence of suffixes to test each directory against.

    :returns:      The deepest (sub-)path of ``path`` which ends with one
                   of the ``suffixes``, or ``None`` if there is no match.
    """

    suffixes = tuple(suffixes)

    while True:

        path = path.strip()

        # We've reached the root of the file system (or a
        # bare drive specification, e.g. "C:" on Windows,
        # for which splitdrive leaves no path component).
        if path == op.sep or path == '' or op.splitdrive(path)[1] == '':
            return None

        path = path.rstrip(op.sep)

        if path.endswith(suffixes):
            return path

        parent = op.dirname(path)

        # dirname cannot take us any higher -
        # give up rather than looping forever
        if parent == path:
            return None

        path = parent


def shallowest(path, suffixes):
    """Finds the shallowest directory which ends with one of the given
    sequence of suffixes.

    :arg path:     Path to search.

    :arg suffixes: Sequence of suffixes to test each directory against.

    :returns:      The shallowest (sub-)path of ``path`` which ends with
                   one of the ``suffixes``, or ``None`` if no directories
                   end with any of the suffixes.
    """

    match   = None
    current = path

    # Walk upwards from the given path towards the root
    while True:

        current = current.strip()

        # We've reached the root of the file system
        if current in (op.sep, '') or op.splitdrive(current)[1] == '':
            break

        current = current.rstrip(op.sep)

        # Keep overwriting - the last match seen
        # while ascending is the shallowest one
        if any(current.endswith(s) for s in suffixes):
            match = current

        current = op.dirname(current)

    return match
88
89


90
91
92
93
94
95
96
97
98
99
100
101
102
103
def allFiles(root):
    """Return a list of the paths to every file found underneath the
    ``root`` directory, including files in sub-directories.

    The paths are built by joining each directory visited by ``os.walk``
    with the names of the files it contains, so they are prefixed with
    ``root``.
    """

    return [op.join(parent, name)
            for parent, _, names in os.walk(root)
            for name in names]


104
105
106
107
def hasExt(path, allowedExts):
    """Test whether ``path`` ends with one of the ``allowedExts``.

    :arg path:        The path to test.

    :arg allowedExts: Sequence of file extensions.

    :returns:         ``True`` if ``path`` ends with any of the
                      extensions, ``False`` otherwise.
    """
    return path.endswith(tuple(allowedExts))
109
110


111
def addExt(prefix,
           allowedExts=None,
           mustExist=True,
           defaultExt=None,
           fileGroups=None,
           unambiguous=True):
    """Adds a file extension to the given file ``prefix``.

    If ``mustExist`` is False, and the file does not already have a
    supported extension, the default extension is appended and the new
    file name returned. If the prefix already has a supported extension,
    or no ``defaultExt`` was given, it is returned unchanged.

    If ``mustExist`` is ``True`` (the default), the function checks to see
    if any files exist that have the given prefix, and a supported file
    extension.  A :exc:`PathError` is raised if:

       - No files exist with the given prefix and a supported extension.

       - ``unambiguous is True``, more than one file exists with the given
         prefix and a supported extension, and the extensions of those
         files do not correspond to exactly one of the ``fileGroups``.

    Otherwise the full file name is returned.

    The ``allowedExts`` sequence which is passed in is never modified.

    :arg prefix:      The file name prefix to modify.

    :arg allowedExts: List of allowed file extensions.

    :arg mustExist:   Whether the file must exist or not.

    :arg defaultExt:  Default file extension to use. It is treated as an
                      allowed extension, even if it is not in
                      ``allowedExts``.

    :arg fileGroups:  Recognised file groups - see :func:`getFileGroup`.

    :arg unambiguous: If ``True`` (the default), and more than one file
                      exists with the specified ``prefix``, a
                      :exc:`PathError` is raised (unless the ambiguity
                      can be resolved with the ``fileGroups``).
                      Otherwise, a list containing *all* matching files
                      is returned. Only used when ``mustExist`` is
                      ``True``.
    """

    # Work on a private copy of the allowed extensions,
    # so that adding the default extension below does
    # not modify the sequence owned by the caller (and
    # so that tuples etc. are accepted).
    allowedExts = [] if allowedExts is None else list(allowedExts)

    if fileGroups is None:
        fileGroups = []

    if defaultExt is not None and defaultExt not in allowedExts:
        allowedExts.append(defaultExt)

    # Does the provided file name already
    # end with a supported extension?
    alreadyHasExt = prefix.endswith(tuple(allowedExts))

    if not mustExist:
        if alreadyHasExt or defaultExt is None:
            return prefix
        return prefix + defaultExt

    # If no allowed extensions were
    # provided, or the provided prefix
    # already ends with a supported
    # extension, check to see that it
    # exists.
    if len(allowedExts) == 0 or alreadyHasExt:
        candidates = [prefix]

    # Otherwise, make a bunch of file names, one per
    # supported extension, and test to see if exactly
    # one of them exists.
    else:
        candidates = [prefix + ext for ext in allowedExts]

    existing = [p for p in candidates if op.isfile(p)]

    # Could not find any supported file
    # with the specified prefix
    if len(existing) == 0:
        raise PathError('Could not find a supported file '
                        'with prefix "{}"'.format(prefix))

    # If ambiguity is ok, return
    # all matching paths
    if not unambiguous:
        return existing

    # Ambiguity is not ok! More than
    # one supported file with the
    # specified prefix.
    if len(existing) > 1:

        # We can only get here if every candidate was
        # built as prefix + ext, so whatever follows
        # the prefix is the extension that was added.
        suffixes = sorted(p[len(prefix):] for p in existing)
        matches  = [g for g in fileGroups if sorted(g) == suffixes]

        # Is there a match for a file suffix group?
        # If not, multiple files with the specified
        # prefix exist, and there is no way to
        # resolve the ambiguity.
        if len(matches) != 1:
            raise PathError('More than one file with '
                            'prefix "{}"'.format(prefix))

        # Otherwise, we return a path
        # to the file which matches the
        # first suffix in the group.
        return prefix + matches[0][0]

    # Return the full file name of the
    # supported file that was found
    return existing[0]
224
225


226
def removeExt(filename, allowedExts=None, firstDot=False):
    """Returns ``filename`` with its extension stripped off, i.e. the
    first of the two values returned by :func:`splitExt`, which is called
    with the same arguments.
    """

    base, _ = splitExt(filename, allowedExts, firstDot)
    return base
230
231


232
def getExt(filename, allowedExts=None, firstDot=False):
    """Returns the extension of ``filename``, i.e. the second of the two
    values returned by :func:`splitExt`, which is called with the same
    arguments.
    """

    _, ext = splitExt(filename, allowedExts, firstDot)
    return ext
236
237


238
def splitExt(filename, allowedExts=None, firstDot=False):
    """Returns the base name and the extension from the given file name.

    If ``allowedExts`` is ``None`` and ``firstDot`` is ``False``, this
    function is equivalent to using::

        os.path.splitext(filename)

    If ``allowedExts`` is ``None`` and ``firstDot`` is ``True``, the file
    name is split on the first period that is found in its final path
    component, rather than the last period. Periods in any leading
    directory names are ignored. For example::

        splitExt('image.nii.gz')                # -> ('image.nii', '.gz')
        splitExt('image.nii.gz', firstDot=True) # -> ('image', '.nii.gz')
        splitExt('my.dir/image.nii.gz',
                 firstDot=True)          # -> ('my.dir/image', '.nii.gz')

    If ``allowedExts`` is provided, ``firstDot`` is ignored. In this case,
    the file name is split at the *first* extension in ``allowedExts``
    that it ends with (so longer extensions should be listed before any
    shorter extensions that they end with). If the file does not end with
    an allowed extension, a tuple containing ``(filename, '')`` is
    returned.

    :arg filename:    The file name to split.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg firstDot:    Split the file name on the first period, rather than the
                      last period. Ignored if ``allowedExts`` is specified.

    :returns:         A tuple containing ``(base, extension)``.
    """

    # If allowedExts is not specified
    # we split on a period character
    if allowedExts is None:

        # split on last period - equivalent
        # to op.splitext
        if not firstDot:
            return op.splitext(filename)

        # split on first period - but only look
        # within the file name itself, so that a
        # period in a directory name can't cause
        # the path to be split in the wrong place.
        basename = op.basename(filename)
        dot      = basename.find('.')

        if dot == -1:
            return filename, ''

        cut = len(filename) - len(basename) + dot
        return filename[:cut], filename[cut:]

    # Otherwise, try and find a suffix match, and
    # split the filename into its base and its
    # extension. The split point is calculated
    # from the front of the string, as slicing
    # with [:-0] would return an empty base for
    # a zero-length extension.
    for ext in allowedExts:
        if filename.endswith(ext):
            cut = len(filename) - len(ext)
            return filename[:cut], filename[cut:]

    # No match, assume there is no extension
    return filename, ''


297
298
299
300
301
def getFileGroup(path,
                 allowedExts=None,
                 fileGroups=None,
                 fullPaths=True,
                 unambiguous=False):
    """Returns the files in the ``fileGroup`` that the given ``path``
    belongs to (including the ``path`` itself).

    File groups can be used to specify a collection of file suffixes which
    should always exist alongside each other. This can be used to resolve
    ambiguity when multiple files exist with the same ``prefix`` and supported
    extensions (e.g. ``file.hdr`` and ``file.img``). The file groups are
    specified as a list of sequences, for example::

        [('.img',    '.hdr'),
         ('.img.gz', '.hdr.gz')]

    If you specify ``fileGroups=[('.img', '.hdr')]`` and ``prefix='file'``, and
    both ``file.img`` and ``file.hdr`` exist, the :func:`addExt` function would
    return ``file.img`` (i.e. the file which matches the first extension in
    the group).

    Similarly, if you call the :func:`.imcp.imcp` or :func:`.imcp.immv`
    functions with the above parameters, both ``file.img`` and ``file.hdr``
    will be moved.

    .. note:: The primary use-case of file groups is to resolve ambiguity with
              respect to NIFTI and ANALYSE75 image pairs. By specifying
              ``fileGroups=[('.img', '.hdr'), ('.img.gz', '.hdr.gz')]``, the
              :func:`addExt`, :func:`.imcp.immv` and :func:`.imcp.imcp`
              functions are able to figure out what you mean when you specify
              ``file``, and both ``file.hdr`` and ``file.img`` (or
              ``file.hdr.gz`` and ``file.img.gz``) exist.

    The ``path`` is first resolved via :func:`addExt` (with
    ``mustExist=True``), and then split via :func:`splitExt`. Every group
    which contains the extension of the path (or every group, if the path
    has no recognised extension) is then inspected:

      - If exactly one group is *complete* (all of its files exist), the
        files in that group are returned, in the order in which the group
        was specified.

      - If none of the files of any inspected group exist, or if no group
        is complete and ``unambiguous`` is ``False``, a list containing
        only the ``path`` is returned.

    A :exc:`PathError` is raised if the ``path`` does not exist, if it is
    part of more than one complete group, or - only when ``unambiguous`` is
    ``True`` - if it is part of an incomplete group, or of more than one
    group for which at least one file exists.

    :arg path:        Path to the file. Must contain the file extension.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg fileGroups:  Recognised file groups.

    :arg fullPaths:   If ``True`` (the default), full file paths (relative to
                      the ``path``) are returned. Otherwise, only the file
                      extensions in the group are returned.

    :arg unambiguous: Defaults to ``False``. If ``True``, incomplete or
                      multiply-matching groups cause a :exc:`PathError`
                      to be raised, as described above.
    """

    path      = addExt(path, allowedExts, mustExist=True, fileGroups=fileGroups)
    base, ext = splitExt(path, allowedExts)

    # What we return whenever the path
    # can't be associated with a group
    onlyThisFile = [path] if fullPaths else [ext]

    if fileGroups is None:
        return onlyThisFile

    complete = []   # (group, files) for groups with all files present
    npartial = 0    # number of groups with at least one file present

    for group in fileGroups:

        # This group is not relevant to the file
        if ext != '' and ext not in group:
            continue

        candidates = [base + suffix for suffix in group]
        present    = [op.exists(c) for c in candidates]

        if any(present):
            npartial += 1

        if all(present):
            complete.append((group, candidates))

    ncomplete = len(complete)

    # Path is not part of any group
    if npartial == 0:
        return onlyThisFile

    # The path is part of more than one
    # existing file group - there is no
    # way to resolve this ambiguity.
    if ncomplete > 1:
        raise PathError('Path is part of multiple '
                        'file groups: {}'.format(path))

    # Partial matches are only of interest
    # when the unambiguous flag is set
    if not unambiguous:
        npartial = 0

    # The path is unambiguously part of a
    # complete file group - return that group
    if ncomplete == 1 and npartial <= 1:
        group, files = complete[0]
        return files if fullPaths else group

    # The path appears to be part of an
    # incomplete group - this is potentially
    # ambiguous, so give up. This can only
    # occur when unambiguous is True, as the
    # partial match count is otherwise zeroed.
    if npartial > 0:
        raise PathError('Path is part of an incomplete '
                        'file group: {}'.format(path))

    return onlyThisFile

420
421

def removeDuplicates(paths, allowedExts=None, fileGroups=None):
    """Filters the list of ``paths`` so that each file group (as defined
    by ``fileGroups``) is represented just once.

    For example, if you have a directory containing::

        001.hdr
        001.img
        002.hdr
        002.img
        003.hdr
        003.img

    And you call ``removeDuplicates`` like so::

         paths       = ['001.img', '001.hdr',
                        '002.img', '002.hdr',
                        '003.img', '003.hdr']

         allowedExts = ['.img',  '.hdr']
         fileGroups  = [('.img', '.hdr')]

         removeDuplicates(paths, allowedExts, fileGroups)

    The returned list will be::

         ['001.img', '002.img', '003.img']

    Each path is expanded to its file group with :func:`getFileGroup`; the
    first file of the group is kept, unless a file from that group has
    already been kept.

    If you provide ``allowedExts``, you may specify incomplete ``paths`` (i.e.
    without extensions), as long as there are no path ambiguities.

    A :exc:`PathError` will be raised if any of the ``paths`` do not exist,
    or if there are any ambiguities with respect to incomplete paths.

    :arg paths:       List of paths to reduce.

    :arg allowedExts: Allowed/recognised file extensions.

    :arg fileGroups:  Recognised file groups - see :func:`getFileGroup`.
    """

    kept = []

    for candidate in paths:

        members = getFileGroup(candidate, allowedExts, fileGroups)

        # Skip this path if any file from
        # its group has already been kept
        if all(m not in kept for m in members):
            kept.append(members[0])

    return kept
472
473
474
475
476
477


def uniquePrefix(path):
    """Return the shortest prefix for the given file name which
    unambiguously identifies it, relative to the other files in the same
    directory.

    The prefix is grown one character at a time, starting from the first
    character of the file name, until exactly one file in the directory
    starts with it.

    Raises a :exc:`PathError` if a unique prefix could not be found. This
    happens if ``path`` does not contain a file name, if no files match, or
    if the name of another file in the directory begins with the full name
    of this file.

    :arg path: Path to the file.

    :returns:  The directory part of ``path``, joined with the shortest
               unique prefix of its file name.
    """

    dirname, filename = op.split(path)

    # Nothing to build a prefix from
    # (e.g. '' or a path ending in a
    # separator).
    if filename == '':
        raise PathError('No unique prefix for {}'.format(path))

    idx    = 0
    prefix = op.join(dirname, filename[0])

    # The prefix is a literal path, not a pattern, so
    # any glob special characters in it are escaped.
    hits   = glob.glob('{}*'.format(glob.escape(prefix)))

    # Loop until we have found a unique prefix
    while len(hits) != 1:

        # No matches at all, or we have
        # run out of characters to add
        if len(hits) == 0 or idx >= len(filename) - 1:
            raise PathError('No unique prefix for {}'.format(filename))

        # Not unique - extend the prefix by one
        # character, and narrow down the hits
        idx    += 1
        prefix  = prefix + filename[idx]
        hits    = [h for h in hits if h.startswith(prefix)]

    return prefix
505
506
507
508
509
510
511
512
513
514
515
516


def commonBase(paths):
    """Identifies the deepest common base directory shared by all files
    in ``paths``.

    Paths are compared by whole path components, so e.g. ``/a/bc`` is not
    considered to be a base of ``/a/bcd/e``. The paths are compared as
    given - they are not normalised or made absolute.

    Raises a :exc:`PathError` if ``paths`` is empty, or if the paths have
    no common base. The latter will never happen for absolute paths (as
    the base will be e.g. ``'/'``).

    :arg paths: Sequence of paths.

    :returns:   The deepest directory which contains all of the ``paths``.
    """

    paths = list(paths)

    if len(paths) == 0:
        raise PathError('No common base')

    # Start from the deepest path, and walk
    # up through its parent directories
    base = max(paths, key=lambda p: len(p.split(op.sep)))
    last = base

    while True:

        base = op.split(base)[0]

        # We've run out of parent directories
        if base == last or len(base) == 0:
            break

        last = base

        # Make sure that we match on whole path
        # components, rather than on raw string
        # prefixes (the root directory already
        # ends with a separator).
        if base.endswith(op.sep): prefix = base
        else:                     prefix = base + op.sep

        if all(p == base or p.startswith(prefix) for p in paths):
            return base

    raise PathError('No common base')
532

533

534
535
def wslpath(winpath):
    r"""
    Convert Windows path (or a command line argument containing a Windows
    path) to the equivalent WSL path (e.g. ``c:\Users`` -> ``/mnt/c/Users``).
    Also supports paths in the form ``\\wsl$\(distro)\users\...``, for which
    the ``\\wsl$\(distro)`` prefix is removed.

    In both cases, all backslashes in the remainder of the path are
    replaced with forward slashes.

    :param winpath: Command line argument which may (or may not) contain a
                    Windows path. It is assumed to be either of the form
                    <windows path> or --<arg>=<windows path>. Note that we
                    don't need to handle --arg <windows path> or
                    -a <windows path> since in these cases the argument
                    and the path will be parsed as separate entities.
    :return: If ``winpath`` matches a Windows path, the converted argument
             (including the --<arg>= portion). Otherwise returns
             ``winpath`` unchanged.
    """

    # \\wsl$\<distro>\<path> - drop the network prefix
    match = re.match(r"^(--[\w-]+=)?\\\\wsl\$[\\\/][^\\^\/]+(.*)$", winpath)
    if match:
        arg, path = match.group(1, 2)
        return (arg or "") + path.replace("\\", "/")

    # <drive letter>:<path> - drives are mounted at /mnt/<letter>.
    # Only ASCII letters are valid drive letters.
    match = re.match(r"^(--[\w-]+=)?([a-zA-Z]):(.+)$", winpath)
    if match:
        arg, drive, path = match.group(1, 2, 3)
        return (arg or "") + "/mnt/" + drive.lower() + path.replace("\\", "/")

    return winpath

563

564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
def winpath(wslpath):
    r"""
    Convert a WSL-local filepath (for example ``/usr/local/fsl/``) into a
    path that can be used from Windows.

    If ``platform.fslwsl`` is ``False``, simply returns ``wslpath``
    unmodified. Otherwise, uses ``platform.fsldir`` (``FSLDIR``) to deduce
    the WSL distro in use for FSL, and returns ``\\wsl$\<distro>`` followed
    by ``wslpath``, with all forward slashes replaced by backslashes.
    This requires WSL2 which supports the ``\\wsl$\`` network path.
    wslpath is assumed to be an absolute path.

    :raises RuntimeError: If the WSL distro cannot be identified, i.e.
                          ``platform.fsldir`` is not of the form
                          ``\\wsl$\<distro>\...``.
    """

    if not platform.fslwsl:
        return wslpath

    # NOTE(review): fsldir is presumably None when FSLDIR
    # is not set - treat that like any other unusable
    # value, rather than letting re.match raise a TypeError.
    match = re.match(r"^\\\\wsl\$\\([^\\]+).*$", platform.fsldir or "")

    if match is None:
        raise RuntimeError("Could not identify WSL installation from FSLDIR (%s)" % platform.fsldir)

    distro = match.group(1)

    return "\\\\wsl$\\" + distro + wslpath.replace("/", "\\")