Commit 3ea64222 authored by Paul McCarthy's avatar Paul McCarthy 🚵
Browse files

MNT: New clear_channel script for poking at the channel dirs - needed because

we don't have direct access to them.
parent a685dab1
#!/usr/bin/env python
# Remove packages from a conda channel. See the
# rules/fsl-ci-management-rules.yml:clear-public-channel and
# clear-private-channel jobs.
# Exactly one of the CLEAR_ALL, CLEAR_OLD, or PATTERN variables must be
# set - if not, or if more than one of these variables are set, the job
# exits without doing anything.
# If the CLEAR_ALL variable is set, *all* packages are removed.
# If the CLEAR_OLD variable is set, all except the most recent version
# of each package is removed.
# The PATTERN variable may be a semi-colon- separated list of glob-style
# patterns which indicate the packages to be deleted. For example, to
# remove all packages starting with "abc-", and all packages with
# "-misc" in their name:
# PATTERN="abc-*;*-misc*"
# Each pattern is matched against the conda package file names in each
# of the channel platform directories (i.e. noarch, linux-64, osx-64,
# etc), and all matching package files are removed.
# The DRY_RUN variable has a default value of "true". It must be
# manually set to "false" in order for packages to be removed. When
# DRY_RUN is "true", no package files are removed - instead, the package
# files that would be removed are printed to standard output.
# Author: Paul McCarthy <>
import os
import os.path as op
import itertools as it
import sys
import glob
from packaging.version import parse
from fsl_ci import sprun, lockdir
from fsl_ci.conda import (read_channel_repodata,
PLATFORMS = ('noarch', 'linux-64', 'osx-64', 'win-32', 'win64')
def remove(path):
dry_run = os.environ.get('DRY_RUN', 'true') == 'true'
if dry_run: print(path)
else: os.remove(path)
def clear_all_packages(channeldir):
for plat in PLATFORMS:
packages = glob.glob(op.join(channeldir, plat, '*.tar.bz2'))
for package in packages:
def clear_old_packages(channeldir):
def sortkey(pkg):
return (parse(pkg[1]['version']), int(pkg[1]['build']))
chandata, platdata = read_channel_repodata(channeldir)
packages = load_packages(chandata, platdata)
for pkg in packages.values():
for plat in pkg.platforms:
# get file name + package entry
pkgfiles = [(k, v) for k, v in platdata[plat]['packages'].items()
if v['name'] ==]
# sort so oldest is first
pkgfiles = sorted(pkgfiles, sortkey)
for filename, _ in pkgfiles[:-1]:
def clear_packages_matching_pattern(channeldir, patterns):
patterns = patterns.split(';')
for plat, pattern in it.product(PLATFORMS, patterns):
pattern = op.join(channeldir, plat, pattern)
packages = glob.glob(pattern)
for package in packages:
def main():
pattern = os.environ.get('PATTERN', None)
clear_all = os.environ.get('CLEAR_ALL', None)
clear_old = os.environ.get('CLEAR_OLD', None)
jobname = os.environ['CI_JOB_NAME']
if jobname == 'clear-public-channel':
channeldir = os.environ['FSLCONDA_PUBLIC_CHANNEL_DIRECTORY']
nset = sum((pattern is not None,
clear_all is not None,
clear_old is not None))
if nset != 1:
print('Exactly one of PATTERN, CLEAR_OLD, or CLEAR_ALL must be set')
return 1
with lockdir(channeldir):
if clear_all:
if clear_old:
if pattern:
clear_packages_matching_pattern(channeldir, pattern)
sprun(f'conda index {channeldir}')
return 0
if __name__ == '__main__':
......@@ -36,6 +36,7 @@ setup(
entry_points={'console_scripts' : [
'build_conda_package = fsl_ci.scripts.build_conda_package:main',
'clear_channel = fsl_ci.scripts.clear_channel:main',
'configure_repositories = fsl_ci.utils.configure_repsitories:main',
'create_conda_recipe = fsl_ci.utils.create_conda_recipe:main',
'deploy_conda_package = fsl_ci.scripts.deploy_conda_package:main',
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment