Commit f6cbd9a6 authored by Paul McCarthy's avatar Paul McCarthy 🚵
Browse files

RF: Cosmetic re-arrangement. Make identify_platform, download_manifest,

check_need_admin, get_admin_password stand-alone functions
parent ee002119
......@@ -81,7 +81,7 @@ FSL_RELEASE_MANIFEST = 'https://fsl.fmrib.ox.ac.uk/fsldownloads/' \
manifest file is a JSON file which contains information about available FSL
versions.
See the Context.download_manifest function, and an example manifest file
See the download_manifest function, and an example manifest file
in test/data/manifest.json, for more details.
A custom manifest URL can be specified with the -a/--manifest command-line
......@@ -96,502 +96,334 @@ internal/development FSL releases.
"""
@ft.total_ordering
class Version(object):
"""Class to represent and compare version strings. Accepted version
strings are of the form W.X.Y.Z, where W, X, Y, and Z are all integers.
"""
def __init__(self, verstr):
# Version identifiers for official FSL
# releases will have up to four
# components (X.Y.Z.W), but We accept
# any number of (integer) components,
# as internal releases may have more.
components = []
for comp in verstr.split('.'):
try: components.append(int(comp))
except Exception: break
self.components = components
self.verstr = verstr
# List of modifiers which can be used to change how
# a message is printed by the printmsg function.
INFO = 1
IMPORTANT = 2
QUESTION = 3
PROMPT = 4
WARNING = 5
ERROR = 6
EMPHASIS = 7
UNDERLINE = 8
RESET = 9
ANSICODES = {
INFO : '\033[37m', # Light grey
IMPORTANT : '\033[92m', # Green
QUESTION : '\033[36m\033[4m', # Blue+underline
PROMPT : '\033[36m\033[1m', # Bright blue+bold
WARNING : '\033[93m', # Yellow
ERROR : '\033[91m', # Red
EMPHASIS : '\033[1m', # White+bold
UNDERLINE : '\033[4m', # Underline
RESET : '\033[0m', # Used internally
}
def __str__(self):
return self.verstr
def __eq__(self, other):
for sn, on in zip(self.components, other.components):
if sn != on:
return False
return len(self.components) == len(other.components)
def printmsg(msg='', *msgtypes, **kwargs):
"""Prints msg according to the ANSI codes provided in msgtypes.
All other keyword arguments are passed through to the print function.
def __lt__(self, other):
for p1, p2 in zip(self.components, other.components):
if p1 < p2: return True
if p1 > p2: return False
return len(self.components) < len(other.components)
:arg msgtypes: Message types to control formatting
:arg log: If True (default), the message is logged.
All other keyword arguments are passed to the built-in print function.
"""
logmsg = kwargs.pop('log', msg != '')
msgcodes = [ANSICODES[t] for t in msgtypes]
msgcodes = ''.join(msgcodes)
if logmsg:
log.debug(msg)
print('{}{}{}'.format(msgcodes, msg, ANSICODES[RESET]), **kwargs)
sys.stdout.flush()
class Context(object):
"""Bag of information and settings created in the main function, and passed
around this script.
Several settings are lazily evaluated on first access, but once evaluated,
their values are immutable.
def prompt(promptmsg, *msgtypes, **kwargs):
"""Prompts the user for some input. msgtypes and kwargs are passed
through to the printmsg function.
"""
printmsg(promptmsg, *msgtypes, end='', log=False, **kwargs)
def __init__(self, args):
"""Create the context with the argparse.Namespace object containing
parsed command-line arguments.
"""
if PYVER[0] == 2: response = raw_input(' ').strip()
else: response = input( ' ').strip()
self.args = args
self.shell = op.basename(os.environ.get('SHELL', 'sh')).lower()
log.debug('%s: %s', promptmsg, response)
# These attributes are updated on-demand via
# the property accessors defined below, or all
# all updated via the finalise-settings method.
self.__platform = None
self.__manifest = None
self.__devmanifest = None
self.__build = None
self.__destdir = None
self.__need_admin = None
self.__admin_password = None
return response
# If the destination directory already exists,
# and the user chooses to overwrite it, it is
# moved so that, if the installation fails, it
# can be restored. The new path is stored
# here - refer to overwrite_destdir.
self.old_destdir = None
# The download_fsl_environment function stores
# the path to the FSL conda environment file,
# list of conda channels, and versions of a
# small set of "base" packages here.
self.environment_file = None
self.environment_channels = None
self.fsl_base_packages = None
def identify_platform():
"""Figures out what platform we are running on. Returns a platform
identifier string - one of:
# The config_logging function stores the path
# to the fslinstaller log file here.
self.logfile = None
- "linux-64" (Linux, x86_64)
- "macos-64" (macOS, x86_64)
"""
platforms = {
('linux', 'x86_64') : 'linux-64',
('darwin', 'x86_64') : 'macos-64',
def finalise_settings(self):
"""Finalise values for all information and settings in the Context.
"""
self.manifest
self.platform
self.build
self.destdir
self.need_admin
self.admin_password
# M1 builds (and possbily ARM for Linux)
# will be added in the future
('darwin', 'arm64') : 'macos-64',
}
system = platform.system().lower()
cpu = platform.machine()
key = (system, cpu)
@property
def platform(self):
"""The platform we are running on, e.g. "linux-64", "macos-64". """
if self.__platform is None:
self.__platform = Context.identify_platform()
return self.__platform
if key not in platforms:
supported = ', '.join(['[{}, {}]' for s, c in platforms])
raise Exception('This platform [{}, {}] is unrecognised or '
'unsupported! Supported platforms: {}'.format(
system, cpu, supported))
return platforms[key]
@property
def build(self):
"""Returns a suitable FSL build (a dictionary entry from the FSL
installer manifest) for the target platform.
The returned dictionary has the following elements:
- 'version' FSL version.
- 'platform': Platform identifier (e.g. 'linux-64')
- 'environment': Environment file to download
- 'sha256': Checksum of environment file
- 'output': Number of lines of expected output, for reporting
progress
"""
def check_need_admin(dirname):
"""Returns True if dirname needs administrator privileges to write to,
False otherwise.
"""
# os.supports_effective_ids added in
# python 3.3, so can't be used here
return not os.access(dirname, os.W_OK | os.X_OK)
if self.__build is not None:
return self.__build
# defaults to "latest" if
# not specified by the user
fslversion = self.args.fslversion
def get_admin_password():
"""Prompt the user for their administrator password."""
if fslversion not in self.manifest['versions']:
available = ', '.join(self.manifest['versions'].keys())
raise Exception(
'FSL version "{}" is not available - available '
'versions: {}'.format(fslversion, available))
def validate_admin_password(password):
proc = Process.sudo_popen(['true'], password, stdin=sp.PIPE)
proc.communicate()
return proc.returncode == 0
if fslversion == 'latest':
fslversion = self.manifest['versions']['latest']
for attempt in range(3):
if attempt == 0:
msg = 'Your administrator password is needed to ' \
'install FSL: '
else:
msg = 'Your administrator password is needed to ' \
'install FSL [attempt {} of 3]:'.format(attempt + 1)
printmsg(msg, IMPORTANT, end='')
password = getpass.getpass('')
valid = validate_admin_password(password)
if valid:
printmsg('Password accepted', INFO)
break
else:
printmsg('Incorrect password', WARNING)
# Find refs to a suitable build for this
# platform. We assume that there is only
# one default build for each platform.
# in the list of builds for a given FSL
# version.
build = None
if not valid:
raise Exception('Incorrect password')
for candidate in self.manifest['versions'][fslversion]:
if candidate['platform'] == self.platform:
build = candidate
break
return password
if build is None:
raise Exception(
'Cannot find a version of FSL matching '
'platform {}'.format(self.platform))
printmsg('FSL {} selected for installation'.format(build['version']))
def isstr(s):
"""Returns True if s is a string, False otherwise, Works on python 2.7
and >=3.3.
"""
try: return isinstance(s, basestring)
except Exception: return isinstance(s, str)
self.__build = build
return build
def match_any(s, patterns):
"""Test if the string s matches any of the fnmatch-style patterns.
Returns the matched pattern, or None.
"""
for pat in patterns:
if fnmatch.fnmatch(s, pat):
return pat
return None
@property
def destdir(self):
"""Installation directory. If not specified at the command line, the
user is prompted to enter a directory.
"""
if self.__destdir is not None:
return self.__destdir
@contextlib.contextmanager
def tempdir(override_dir=None):
"""Returns a context manager which creates, changes into, and returns a
temporary directory, and then deletes it on exit.
# The loop below validates the destination directory
# both when specified at commmand line or
# interactively. In either case, if invalid, the
# user is re-prompted to enter a new destination.
destdir = None
if self.args.dest is not None: response = self.args.dest
else: response = None
If override_dir is not None, instead of creating and changing into a
temporary directory, this function just changes into override_dir.
"""
while destdir is None:
if override_dir is None: tmpdir = tempfile.mkdtemp()
else: tmpdir = override_dir
if response is None:
printmsg('\nWhere do you want to install FSL?',
IMPORTANT, EMPHASIS)
printmsg('Press enter to install to the default location '
'[{}]\n'.format(DEFAULT_INSTALLATION_DIRECTORY), INFO)
response = prompt('FSL installation directory [{}]:'.format(
DEFAULT_INSTALLATION_DIRECTORY), QUESTION, EMPHASIS)
response = response.rstrip(op.sep)
prevdir = os.getcwd()
if response == '':
response = DEFAULT_INSTALLATION_DIRECTORY
try:
os.chdir(tmpdir)
yield tmpdir
response = op.expanduser(op.expandvars(response))
response = op.abspath(response)
parentdir = op.dirname(response)
if op.exists(parentdir):
destdir = response
else:
printmsg('Destination directory {} does not '
'exist!'.format(parentdir), ERROR)
response = None
finally:
os.chdir(prevdir)
if override_dir is None:
shutil.rmtree(tmpdir)
self.__destdir = destdir
return self.__destdir
@contextlib.contextmanager
def tempfilename(permissions=None, delete=True):
"""Returns a context manager which creates a temporary file, yields its
name, then deletes the file on exit.
"""
@property
def need_admin(self):
"""Returns True if administrator privileges will be needed to install
FSL.
"""
if self.__need_admin is not None:
return self.__need_admin
parentdir = op.dirname(self.destdir)
self.__need_admin = Context.check_need_admin(parentdir)
return self.__need_admin
fname = None
try:
tmpf = tempfile.NamedTemporaryFile(delete=False)
fname = tmpf.name
@property
def admin_password(self):
"""Returns the user's administrator password, prompting them if needed.
"""
if self.__admin_password is not None:
return self.__admin_password
if self.__need_admin == False:
return None
self.__admin_password = Context.get_admin_password()
@property
def manifest(self):
"""Returns the FSL installer manifest as a dictionary. """
if self.__manifest is None:
if self.devmanifest is not None:
self.args.manifest = self.devmanifest
self.__manifest = Context.download_manifest(self.args.manifest,
self.args.workdir)
return self.__manifest
@property
def devmanifest(self):
"""Returns a URL to a development manifest to use for installation.
This will only return a value if the --devrelease or --devlatest
options are active.
If this is the case, the FSL_DEV_RELEASES file is downloaded - this
file contains a list of available development manifest URLS. The
user is then prompted to choose which development manifest to use
for the installation, unless --devlatest is active, in which case
the newest manifest is selected.
"""
if not self.args.devrelease:
return None
if self.__devmanifest == 'na':
return None
elif self.__devmanifest is not None:
return self.__devmanifest
# parse a dev manifest file name, returning
# a sequence containing the tage, date, commit
# hash, and branch name. Dev manifest files
# are named like so:
#
# manifest-<tag>.<date>.<commit>.<branch>.json
#
# where <tag> is the tag of the most recent
# public FSL release, and everything else is
# self-explanatory.
def parse_devrelease_name(url):
name = urlparse.urlparse(url).path
name = op.basename(name)
name = name.lstrip('manifest-').rstrip('.json')
# Awkward - the tag may have periods in it
name = name.rsplit('.', 3)
return name
# list of (url, tag, date, commit, branch),
# sorted by date
devreleases = []
with tempdir(self.args.workdir):
try:
download_file(FSL_DEV_RELEASES, 'devreleases.txt')
except Exception as e:
log.debug('Error downloading devreleases.txt from %s',
FSL_DEV_RELEASES, exc_info=True)
raise Exception('Unable to download development manifest '
'list from {}!'.format(FSL_DEV_RELEASES))
with open('devreleases.txt', 'rt') as f:
urls = f.read().strip().split('\n')
urls = [l.strip() for l in urls]
for url in urls:
devreleases.append([url] + parse_devrelease_name(url))
devreleases = sorted(devreleases, key=lambda r: r[2], reverse=True)
if len(devreleases) == 0:
self.__devmanifest = 'na'
return None
# automatically choose latest dev manifest?
if self.args.devlatest:
devrelease = devreleases[0][0]
# show the user a list, ask them which one they want
else:
printmsg('Available development releases:', EMPHASIS)
for i, (url, tag, date, commit, branch) in enumerate(devreleases):
printmsg(' [{}]: {} [{} commit {}]'.format(
i + 1, date, branch, commit), IMPORTANT)
while True:
selection = prompt('Which release would you like to '
'install? [1]:', PROMPT)
if selection == '':
selection = '1'
try:
selection = int(selection) - 1
except Exception:
continue
if selection >= 0 and selection < len(devreleases):
break
devrelease = devreleases[selection][0]
tmpf.close()
self.__devmanifest = devrelease
return self.__devmanifest
if permissions:
os.chmod(fname, permissions)
yield fname
@staticmethod
def identify_platform():
"""Figures out what platform we are running on. Returns a platform
identifier string - one of:
finally:
if delete and fname and op.exists(fname):
os.remove(fname)
- "linux-64" (Linux, x86_64)
- "macos-64" (macOS, x86_64)
"""
platforms = {
('linux', 'x86_64') : 'linux-64',
('darwin', 'x86_64') : 'macos-64',
def sha256(filename, check_against=None, blocksize=1048576):
"""Calculate the SHA256 checksum of the given file. If check_against
is provided, it is compared against the calculated checksum, and an
error is raised if they are not the same.
"""
# M1 builds (and possbily ARM for Linux)
# will be added in the future
('darwin', 'arm64') : 'macos-64',
}
hashobj = hashlib.sha256()
system = platform.system().lower()
cpu = platform.machine()
key = (system, cpu)
with open(filename, 'rb') as f:
while True:
block = f.read(blocksize)
if len(block) == 0:
break
hashobj.update(block)
if key not in platforms:
supported = ', '.join(['[{}, {}]' for s, c in platforms])
raise Exception('This platform [{}, {}] is unrecognised or '
'unsupported! Supported platforms: {}'.format(
system, cpu, supported))
checksum = hashobj.hexdigest()
return platforms[key]
if check_against is not None:
if checksum != check_against:
raise Exception('File {} does not match expected checksum '
'({})'.format(filename, check_against))
return checksum
@staticmethod
def check_need_admin(dirname):
"""Returns True if dirname needs administrator privileges to write to,
False otherwise.
"""
# os.supports_effective_ids added in
# python 3.3, so can't be used here
return not os.access(dirname, os.W_OK | os.X_OK)
def clean_environ():
"""Return a dict containing a set of sanitised environment variables.
@staticmethod
def get_admin_password():
"""Prompt the user for their administrator password."""
def validate_admin_password(password):
proc = Process.sudo_popen(['true'], password, stdin=sp.PIPE)
proc.communicate()
return proc.returncode == 0
for attempt in range(3):
if attempt == 0:
msg = 'Your administrator password is needed to ' \
'install FSL: '
else:
msg = 'Your administrator password is needed to ' \
'install FSL [attempt {} of 3]:'.format(attempt + 1)
printmsg(msg, IMPORTANT, end='')
password = getpass.getpass('')
valid = validate_admin_password(password)
if valid:
printmsg('Password accepted', INFO)
break
else:
printmsg('Incorrect password', WARNING)
All FSL and conda related variables are removed.
"""
env = os.environ.copy()
for v in list(env.keys()):
if any(('FSL' in v, 'CONDA' in v)):
env.pop(v)
return env
if not valid:
raise Exception('Incorrect password')
return password
def download_file(url,
destination,
progress=None,
blocksize=131072,
ssl_verify=True):
"""Download a file from url, saving it to destination. """
def default_progress(downloaded, total):
pass
@staticmethod
def download_manifest(url, workdir=None):
"""Downloads the installer manifest file, which contains information
about available FSL versions, and the most recent version number of the
installer (this script).
if progress is None:
progress = default_progress
The manifest file is a JSON file. Lines beginning
with a double-forward-slash are ignored. See test/data/manifes.json
for an example.
log.debug('Downloading %s ...', url)
This function modifies the manifest structure by adding a 'version'
attribute to all FSL build entries.
"""
# Path to local file
if op.exists(url):
url = 'file:' + urlrequest.pathname2url(op.abspath(url))
log.debug('Downloading FSL installer manifest from %s', url)
# We create and use an unconfigured SSL
# context to disable SSL verification.
# Otherwise pass None causes urlopen to
# use default behaviour. The context
# argument is not available in py3.3
kwargs = {}
if (not ssl_verify) and (PYVER != (3, 3)):
printmsg('Skipping SSL verification - this '
'is not recommended!', WARNING)
kwargs['context'] = ssl.SSLContext(ssl.PROTOCOL_TLS)
with tempdir(workdir):
req = None
try: