Commit 4ecdcb7e authored by Paul McCarthy's avatar Paul McCarthy 🚵
Browse files

RF: Clean-up, add sha256 check

parent 48cf5ac1
......@@ -15,6 +15,7 @@ import sys
import argparse
import contextlib
import getpass
import hashlib
import json
import logging
import platform
......@@ -31,6 +32,209 @@ import urllib.request as urlrequest
log = logging.getLogger(__name__)
__version__ = '0.0.0'
"""Installer version number. This is automatically updated in release versions
whenever a new version is released.
"""
DEFAULT_INSTALLATION_DIRECTORY = '/usr/local/fsl'
"""Default FSL installation directory. """
FSL_INSTALLER_MANIFEST = 'http://18.133.213.73/installer/manifest,json'
"""URL to download the FSL installer manifest file from. The installer
manifest file is a JSON file which contains information about available
FSL versions. See the download_installer_manifest function for more
details.
"""
class InvalidPassword(Exception):
"""Exception raised by Context.get_admin_password if the user gives an
incorrect password.
"""
class Context(object):
"""Bag of information and settings created in main, and passed around
this script.
"""
def __init__(self, args):
self.args = args
self.platform = Context.identify_platform()
self.cuda = Context.identify_cuda()
# These attributes are updated on-demand via
# the property accessors defined below.
self.__destdir = args.dest
self.__need_admin = None
self.__admin_password = None
self.__manifest = None
@property
def destdir(self):
if self.__destdir is None:
printmsg('Where do you want to install FSL?', IMPORTANT, EMPHASIS)
printmsg('Press enter to install to the default '
'location [{}]'.format(DEFAULT_INSTALLATION_DIRECTORY),
INFO)
self.__destdir = prompt('FSL installation directory:', QUESTION)
return self.__destdir
@property
def need_admin(self):
if self.__need_admin is not None:
return self.__need_admin
if self.__destdir is None:
raise RuntimeError('Destination directory has not been set')
self.__need_admin = Context.check_need_admin(self.destdir)
@property
def admin_password(self):
if self.__admin_password is not None:
return self.__admin_password
if self.__need_admin == False:
return None
if self.__destdir is None:
raise RuntimeError('Destination directory has not been set')
self.__admin_password = Context.get_admin_password()
@property
def manifest(self):
if self.__manifest is None:
self.__manifest = Context.download_manifest(self.args.manifest)
return self.__manifest
@staticmethod
def identify_platform():
"""Figures out what platform we are running on. Returns a platform
identifier string - one of:
- "linux-64" (Linux, x86_64)
- "macos-64" (macOS, x86_64)
"""
platforms = {
('linux', 'x86_64') : 'linux-64',
('darwin', 'x86_64') : 'linux-64',
# ARM builds will be added in the future
('darwin', 'arm64') : 'macos-64',
}
system = platform.system().lower()
cpu = platform.machine()
key = (system, cpu)
if key not in platforms:
raise UnsupportedPlatform()
return platforms[key]
@staticmethod
def identify_cuda():
"""Identifies the CUDA version supported on the platform. Returns a
string containing the 'X.Y' CUDA version, or None if CUDA is not supported.
"""
try:
output = sp.check_output('nvidia-smi')
except (sp.CalledProcessError, FileNotFoundError):
return None
cudaver = '9.2' # todo
cudaver = float(output)
match = None
# Return the most suitable CUDA
# version that we have a build for
supported_cudas = [9.2, 10.2, 11.1]
for supported in reversed(supported_cudas):
if cudaver <= supported:
match = supported
break
return match
@staticmethod
def check_need_admin(dirname):
"""Returns True if dirname needs administrator privileges to write to,
False otherwise.
"""
# TODO os.supports_effective_ids added in python 3.3
return not os.access(dirname, os.W_OK | os.X_OK)
@staticmethod
def get_admin_password():
"""Prompt the user for their administrator password."""
def validate_admin_password(password):
printmsg("Checking sudo password", INFO)
cmd = sp.Popen(shlex.split('sudo -S true'),
stdin=PIPE,
stdout=DEVNULL,
stderr=DEVNULL)
cmd.stdin.write(sudo_pwd + '\n')
cmd.stdin.flush()
cmd.communicate()
return cmd.returncode == 0
printmsg('We need your administrator password to install FSL: ',
IMPORTANT, end='', flush=True)
for _ in range(3):
password = getpass.getpass('')
valid = validate_admin_password(password)
if valid: break
else: printmsg("Incorrect password", WARNING)
if not valid:
raise InvalidPassword()
return password
@staticmethod
def download_manifest(url):
"""Downloads the installer manifest file, which contains information
about available FSL vesrions, and the most recent version number of the
installer (this script).
The manifest file is a JSON file with the following structure:
{
"installer" : {
"version" : "1.2.3", # Latest version of installer script
"url" : "http://abc.com" # URL to download installer script
"sha256" : "ab238........." # SHA256 checksum of installer script
}
"versions" : {
# TODO
}
}
"""
log.debug('Downloading FSL installer manifest from %s', url)
with tempdir():
download_file(url, 'manifest.json')
with open('manifest.json') as f:
manifest = f.read()
return json.loads(manifest)
# List of modifiers which can be used to change how
# a message is printed by the printmsg function.
INFO = 1
......@@ -55,20 +259,6 @@ ANSICODES = {
}
__version__ = '0.0.0'
"""Installer version number. This is automatically updated in release versions
whenever a new version is released.
"""
DEFAULT_INSTALLATION_DIRECTORY = '/usr/local/fsl'
"""Default FSL installation directory. """
FSL_DOWNLOAD_URL = 'http://18.133.213.73/installer/'
"""URL to download FSL conda environment files from. """
def printmsg(msg, *msgtypes, **kwargs):
"""Prints msg according to the ANSI codes provided in msgtypes.
All other keyword arguments are passed through to the print function.
......@@ -116,42 +306,81 @@ def memoize(f):
return g
@memoize
def download_installer_manifest():
"""Downloads the installer manifest file, which contains information about
available FSL vesrions, and the most recent version number of the installer
(this script).
class ChecksumError(Exception):
"""Exception raised by the sha256 function if a file checksume does
not match the expected checksum.
"""
The manifest file is a JSON file with the following structure:
{
'fslinstaller' : {
'version' : '1.2.3', # Latest version of installer script
'url' : 'http://abc.com' # URL to download installer script
}
'versions' : {
# TODO
}
}
def sha256(filename, blocksize=1048576, check_against=None):
"""Calculate the SHA256 checksum of the given file. If check_against
is provided, it is compared against the calculated checksum, and an
error is raised if they are not the same.
"""
url = urlparse.urljoin(FSL_DOWNLOAD_URL, 'manifest.json')
hashobj = hashlib.sha256()
log.debug('Downloading FSL installer manifest from %s', url)
with open(filename, 'rb') as f:
while True:
block = f.read(blocksize)
if len(block) == 0:
break
hashobj.update(block)
with tempdir():
download_file(url, 'manifest.json')
checksum = hashobj.hexdigest()
if check_against:
if checksum != check_against:
raise ChecksumError('File {} does not match expected checksum: '
'{}'.format(filename, check_against))
with open('manifest.json') as f:
manifest = f.read()
return json.loads(manifest)
class DownloadFailed(Exception):
"""Exception type raised by the download_file function if a
download fails for some reason.
"""
def self_update():
def download_file(url, destination, blocksize=1048576):
"""Download a file from url, saving it to destination. """
def report_progress(downloaded, total):
downloaded = downloaded / 1048576
total = total / 1048576
if total is not None:
msg = '{:.1f} / {:.1f}MB ...'.format(downloaded, total)
else:
msg = '{:.1f}MB ...'.format(downloaded)
printmsg(msg, end='\r')
printmsg('Downloading {} ...'.format(url))
try:
with urlrequest.urlopen(url) as req, \
open(destination, 'wb') as outf:
try: total = int(req.headers['content-length'])
except KeyError: total = None
downloaded = 0
while True:
block = req.read(blocksize)
if len(block) == 0:
break
downloaded += len(block)
outf.write(block)
report_progress(downloaded, total)
except urllib.error.HTTPError as e:
raise DownloadFailed(f'A network error has occurred while '
f'trying to download {destname}') from e
def self_update(ctx):
"""Checks to see if a newer version of the installer (this script) is
available and if so, downloads it, replaces this script file in-place,
and re-runs the new installer script.
available and if so, downloads it as a temporary file, and runs it in
place of this script.
"""
@ft.total_ordering
......@@ -181,69 +410,27 @@ def self_update():
if p1 > p2: return False
return False
manifest = download_installer_manifest()
thisver = Version(__version__)
latestver = Version(manifest['fslinstaller']['version'])
latestver = Version(ctx.manifest['installer']['version'])
if latestver <= thisver:
log.debug('Installer is up to date (this vesrion: %s, '
'latest version: %s)', thisver, latestver)
return
log.debug('New version of installer is available (%s) - self-updating')
with tempdir():
download_file(manifest['fslinstaller']['url'], 'fslinstaller.py')
# TODO checksum
shutil.copyfile('fslinstaller.py', __file__)
cmd = [sys.executable, __file__] + sys.argv[1:]
log.debug('Running new installer: %s', cmd)
os.execl(*cmd)
def need_admin(dirname):
"""Returns True if dirname needs administrator privileges to write to,
False otherwise.
"""
# TODO os.supports_effective_ids added in python 3.3
return not os.access(dirname, os.W_OK | os.X_OK)
class InvalidPassword(Exception):
"""Exception raised by get_admin_password if the user gives an incorrect
password.
"""
@memoize
def get_admin_password():
"""Prompt the user for their administrator password."""
def validate_admin_password(password):
printmsg("Checking sudo password", INFO)
cmd = sp.Popen(shlex.split('sudo -S true'),
stdin=PIPE,
stdout=DEVNULL,
stderr=DEVNULL)
cmd.stdin.write(sudo_pwd + '\n')
cmd.stdin.flush()
cmd.communicate()
return cmd.returncode == 0
log.debug('New version of installer is available '
'(%s) - self-updating', latestver)
printmsg('We need your administrator password to install FSL: ',
IMPORTANT, end='', flush=True)
tmpf = tempfile.NamedTemporaryFile(delete=False)
tmpf.close()
tmpf = tmpf.name
for _ in range(3):
password = getpass.getpass('')
valid = validate_admin_password(password)
download_file(ctx.manifest['installer']['url'], tmpf)
sha256(tmpf, check_against=ctx.manifest['installer']['sha256'])
if valid: break
else: printmsg("Incorrect password", WARNING)
if not valid:
raise InvalidPassword()
return password
cmd = [sys.executable, tmpf] + sys.argv[1:]
log.debug('Running new installer: %s', cmd)
os.execv(sys.executable, cmd)
def run(cmd, admin=False, display_output=False):
......@@ -270,101 +457,9 @@ class UnsupportedPlatform(Exception):
"""
def identify_platform():
"""Figures out what platform we are running on. Returns a platform
identifier string - one of:
- linux-64 (Linux, x86_64)
- macos-64 (macOS, x86_64)
"""
platforms = {
('linux', 'x86_64') : 'linux-64',
('darwin', 'x86_64') : 'linux-64',
('darwin', 'arm64') : 'macos-64',
}
system = platform.system().lower()
cpu = platform.machine()
key = (system, cpu)
if key not in platforms:
raise UnsupportedPlatform()
return platforms[key]
@memoize
def identify_cuda():
"""Identifies the CUDA version supported on the platform. Returns a
string containing the 'X.Y' CUDA version, or None if CUDA is not supported.
"""
try:
output = sp.check_output('nvidia-smi')
except (sp.CalledProcessError, FileNotFoundError):
return None
cudaver = '9.2' # todo
cudaver = float(output)
match = None
# Return the most suitable CUDA
# version that we have a build for
supported_cudas = [9.2, 10.2, 11.1]
for supported in reversed(supported_cudas):
if cudaver <= supported:
match = supported
break
return match
def list_available_versions():
def list_available_versions(ctx):
"""Lists available FSL versions. """
class DownloadFailed(Exception):
"""Exception type raised by the download_file function if a
download fails for some reason.
"""
def download_file(url, destination, blocksize=1048576):
"""Download a file from url, saving it to destination. """
def report_progress(downloaded, total):
downloaded = downloaded / 1048576
total = total / 1048576
if total is not None:
msg = '{:.1f} / {:.1f}MB ...'.format(downloaded, total)
else:
msg = '{:.1f}MB ...'.format(downloaded)
printmsg(msg, end='\r')
printmsg('Downloading {} ...'.format(url))
try:
with urlrequest.urlopen(url) as req, \
open(destination, 'wb') as outf:
try: total = int(req.headers['content-length'])
except KeyError: total = None
downloaded = 0
while True:
block = req.read(blocksize)
if len(block) == 0:
break
downloaded += len(block)
outf.write(block)
report_progress(downloaded, total)
except urllib.error.HTTPError as e:
raise DownloadFailed(f'A network error has occurred while '
f'trying to download {destname}') from e
manifest = download_installer_manifest()
def install_miniforge(platform, destdir, **kwargs):
......@@ -413,13 +508,18 @@ def parse_args(argv=None):
'dest' : 'Install FSL into this folder (default: '
'{})'.format(DEFAULT_INSTALLATION_DIRECTORY),
'version' : 'Print installer version number and exit',
'debug' : 'Print debugging messages',
'disable_self_update' : 'Do not automaticall update the installer '
'disable_self_update' : 'Do not automatically update the installer '
'script',
'listversions' : 'List available versions of FSL',
'fslversion' : 'Download this specific version of FSL',
'cuda' : 'Install FSL for this CUDA version (default: '
'automatically detected)',
# Path to local installer manifest file
'manifest' : argparse.SUPPRESS,
# Print debugging messages
'debug' : argparse.SUPPRESS,
}
parser = argparse.ArgumentParser()
......@@ -427,6 +527,8 @@ def parse_args(argv=None):
version=__version__, help=helps['version'])
parser.add_argument('-D', '--debug', action='store_true',
help=helps['debug'])
parser.add_argument('-m', '--manifest', default=FSL_INSTALLER_MANIFEST,
help=helps['manifest'])
parser.add_argument('-u', '--disable_self_update', action='store_true',
help=helps['disable_self_update'])
parser.add_argument('-d', '--dest', metavar='DESTDIR',
......@@ -442,9 +544,9 @@ def parse_args(argv=None):
logging.basicConfig()
if args.debug:
logging.getLogger('fslinstaller').setLevel(logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger('fslinstaller').setLevel(logging.WARNING)
logging.getLogger().setLevel(logging.WARNING)
return args
......@@ -452,41 +554,24 @@ def parse_args(argv=None):
def main(argv=None):
args = parse_args(argv)
ctx = Context(args)
if not args.disable_self_update:
self_update(ctx)
printmsg('FSL installer version: ', EMPHASIS, end='')
printmsg(__version__ + '\n')
if not args.disable_self_update:
self_update()
if args.listversions:
list_available_versions()
list_available_versions(ctx)
sys.exit(0)
if args.dest is None:
printmsg('Where do you want to install FSL?', IMPORTANT, EMPHASIS)
printmsg('Press enter to install to the default '
'location [{}]'.format(DEFAULT_INSTALLATION_DIRECTORY), INFO)
args.dest = prompt('FSL installation directory:', QUESTION)
platform = identify_platform()
cudaver = identify_cuda()
admin = need_admin(args.dest)
printmsg('FSL installation directory: ', EMPHASIS, end='')
printmsg(args.dest)
printmsg('Platform: ', EMPHASIS, end='')
printmsg(platform)
printmsg('CUDA: ', EMPHASIS, end='')
printmsg(cudaver or 'not detected')
printmsg('Admin password required: ', EMPHASIS, end='')
printmsg(admin)
get_admin_password()
ctx.need_admin = need_admin(args.dest)
ctx.admin_password = get_admin_password()
with tempdir():
install_miniforge(platform, args.dest, admin)
install_fsl(args.dest, args.version, platform, cudaver, admin)
install_miniforge(ctx)
install_fsl(ctx)
if __name__ == '__main__':
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment