__init__.py 5.05 KB
Newer Older
1
2
#!/usr/bin/env python
#
3
# __init__.py - Miscellaneous functions used throughout fsl_ci.
4
5
6
7
8
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#


9
10
11
12
13
14
15
16
17
18
import os.path      as op
import                 os
import                 sys
import                 errno
import                 shlex
import                 time
import                 tempfile
import urllib.parse as urlparse
import contextlib   as ctxlib
import subprocess   as sp
19

20
import yaml
21

22

23
__version__ = '0.17.0'
24
25
26
"""Current version of the fsl-ci-rules."""


27
28
29
30
31
32
33
34
USERNAME = 'fsl-ci-rules'
"""Username to be used for all git interactions which require one. """


EMAIL = 'fsl-ci-rules@git.fmrib.ox.ac.uk'
"""Password to be used for all git interactions which require one. """


35
36
37
38
39
def fprint(*args, **kwargs):
    """Print with ``flush=True`` unless the caller says otherwise.

    All positional and keyword arguments are forwarded to the built-in
    ``print``. An explicit ``flush`` keyword argument is honoured, rather
    than causing a ``TypeError`` due to the keyword being given twice.
    """
    kwargs.setdefault('flush', True)
    print(*args, **kwargs)


40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@ctxlib.contextmanager
def tempdir():
    """Context manager which creates a temporary directory and makes it the
    current working directory for the duration of the ``with`` block.

    The path of the temporary directory is yielded. On exit (whether normal
    or via an exception) the original working directory is restored first,
    and the temporary directory is then deleted.
    """
    with tempfile.TemporaryDirectory() as scratch:
        origin = os.getcwd()
        os.chdir(scratch)
        try:
            yield scratch
        finally:
            # Step back out before TemporaryDirectory removes the directory.
            os.chdir(origin)


@ctxlib.contextmanager
def indir(dirname):
    """Context manager which makes ``dirname`` the current working directory
    for the duration of the ``with`` block.

    Nothing is yielded. The working directory which was in effect on entry
    is restored on exit, even if the block raises.
    """
    origin = os.getcwd()
    os.chdir(dirname)
    try:
        yield
    finally:
        os.chdir(origin)


def sprun(cmd, **kwargs):
    """Runs the given command with ``subprocess.run``.

    :arg cmd: Command to run. A string is tokenised with ``shlex.split``
              unless ``shell=True`` is given. Anything which is not a
              string (e.g. a pre-split list of arguments) is passed to
              ``subprocess.run`` unchanged.

    All other keyword arguments are passed through to ``subprocess.run``,
    with these defaults applied when the caller has not set them:

      - ``check`` defaults to ``True``.
      - ``stdout``/``stderr`` default to the current ``sys.stdout`` and
        ``sys.stderr``, unless ``capture_output`` has been specified.

    :returns: The object returned by ``subprocess.run``.
    """
    fprint(f'Running {cmd}')

    # Only a string needs tokenising - shlex.split
    # would fail on an already-split argument list.
    if isinstance(cmd, str) and not kwargs.get('shell', False):
        cmd = shlex.split(cmd)

    kwargs.setdefault('check', True)

    # we set stdout/stderr for compatibility with the
    # CaptureStdout class, which swaps sys.stdout/stderr.
    # They must be left unset when capture_output is
    # given, as subprocess.run rejects the combination.
    if 'capture_output' not in kwargs:
        kwargs.setdefault('stdout', sys.stdout)
        kwargs.setdefault('stderr', sys.stderr)

    return sp.run(cmd, **kwargs)
83
84


85
86
87
88
89
90
91
92
93
94
def spcap(cmd, **kwargs):
    """Run ``cmd`` via ``sprun`` with ``capture_output=True``, and return
    the captured standard output decoded to a string.

    Any additional keyword arguments are forwarded to ``sprun``.
    """
    completed = sprun(cmd, capture_output=True, **kwargs)
    raw       = completed.stdout
    return raw.decode()


def which(command):
    """Run ``which <command>`` and return its output with leading and
    trailing whitespace removed.
    """
    located = spcap(f'which {command}')
    return located.strip()


95
96
97
98
99
100
101
102
103
104
105
106
107
@ctxlib.contextmanager
def lockdir(dirname, delay=10):
    """Primitive mechanism by which concurrent access to a directory can be
    prevented. Attempts to create a semaphore file in the directory, but waits
    if that file already exists. Removes the file when finished.

    :arg dirname: Directory to lock. The semaphore file is created inside it
                  with the name ``.fsl_ci.lockdir``.

    :arg delay:   Number of seconds to wait between attempts to create the
                  semaphore file. Defaults to 10.

    Any ``OSError`` other than "file already exists" raised while creating
    the semaphore file is propagated to the caller.
    """

    lockfile = op.join(dirname, '.fsl_ci.lockdir')

    while True:
        fprint(f'Attempting to lock {dirname} for exclusive access.')
        try:
            # O_CREAT | O_EXCL makes the call fail if the file
            # already exists, so only one caller can create it.
            fd = os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            break
        except FileExistsError:
            fprint(f'{dirname} is already locked - '
                   f'trying again in {delay} seconds ...')
        time.sleep(delay)

    fprint(f'Exclusive access acquired for {dirname} ...')
    try:
        yield

    finally:
        fprint(f'Relinquishing lock on {dirname}')
        # Make sure the semaphore file is removed
        # even if closing the descriptor fails.
        try:
            os.close(fd)
        finally:
            os.unlink(lockfile)
125
126


127
128
def loadyaml(s):
    """Parse the YAML string ``s`` and return the resulting object
    (a dict-like for a YAML mapping).
    """
    # NOTE(review): yaml.Loader is the full (unsafe) loader - confirm that
    # only trusted YAML is ever passed in, or consider yaml.SafeLoader.
    parsed = yaml.load(s, Loader=yaml.Loader)
    return parsed
130
131
132
133


def dumpyaml(o):
    """Serialise the object ``o`` to a YAML string, without sorting
    mapping keys.
    """
    text = yaml.dump(o, sort_keys=False)
    return text
135
136


137
138
139
140
141
142
143
144
def add_credentials(url, username, password):
    """Returns a URL with the username/password added.

    :arg url:      URL of the form ``scheme://rest``.
    :arg username: User name - URL-quoted before being inserted.
    :arg password: Password - URL-quoted before being inserted.

    :returns:      ``scheme://username:password@rest``

    :raises ValueError: If ``url`` does not contain ``://``.
    """
    username = urlparse.quote_plus(username)
    password = urlparse.quote_plus(password)

    # Split on the first '://' only, so that a URL which
    # contains '://' again later on (e.g. within a query
    # string) is still handled.
    scheme, path = url.split('://', 1)
    return f'{scheme}://{username}:{password}@{path}'


145
146
147
148
class CaptureStdout:
    """Context manager which captures stdout and stderr.

    While the context is active, ``sys.stdout`` and ``sys.stderr`` are
    replaced with text files located in a temporary directory. On exit the
    original streams are restored, the file contents are read back in, and
    the temporary directory is removed. The captured text is then available
    via the ``stdout`` and ``stderr`` properties.

    An instance can only be used once.
    """

    def __init__(self):
        # Public handle on the temporary directory - set
        # to None once the context has been exited.
        self.tmpdir = tempfile.TemporaryDirectory()

        outpath = op.join(self.tmpdir.name, 'stdout')
        errpath = op.join(self.tmpdir.name, 'stderr')

        # These hold open file objects until __exit__
        # replaces them with the captured strings.
        self.__mock_stdout = open(outpath, 'wt')
        self.__mock_stderr = open(errpath, 'wt')

    def __enter__(self):
        """Redirect ``sys.stdout``/``sys.stderr`` to the capture files.

        Raises a ``RuntimeError`` if this instance has already been used.
        """
        if self.tmpdir is None:
            raise RuntimeError('CaptureStdout can only be used once')

        # Remember the real streams so they can be restored on exit.
        self.__real_stdout = sys.stdout
        self.__real_stderr = sys.stderr

        sys.stdout = self.__mock_stdout
        sys.stderr = self.__mock_stderr

        return self

    def __exit__(self, *args, **kwargs):
        """Restore the real streams, read the captured output back in,
        and remove the temporary directory.
        """
        sys.stdout = self.__real_stdout
        sys.stderr = self.__real_stderr

        outfile = self.__mock_stdout
        errfile = self.__mock_stderr

        # Close both files before reading them back.
        outfile.close()
        errfile.close()

        with open(outfile.name, 'rt') as f:
            self.__mock_stdout = f.read()
        with open(errfile.name, 'rt') as f:
            self.__mock_stderr = f.read()

        self.tmpdir.cleanup()
        self.tmpdir = None

    @property
    def real_stdout(self):
        """The ``sys.stdout`` which was in place when the context was
        entered.
        """
        return self.__real_stdout

    @property
    def real_stderr(self):
        """The ``sys.stderr`` which was in place when the context was
        entered.
        """
        return self.__real_stderr

    @property
    def stdout(self):
        """The captured standard output, as a string, once the context has
        been exited. Before then, this is the file object being written to.
        """
        return self.__mock_stdout

    @property
    def stderr(self):
        """The captured standard error, as a string, once the context has
        been exited. Before then, this is the file object being written to.
        """
        return self.__mock_stderr