Skip to content
Snippets Groups Projects
Commit fc41f3c8 authored by Fidel Alfaro Almagro's avatar Fidel Alfaro Almagro :speech_balloon:
Browse files

Merge branch 'enh/config' into 'master'

BIP configuration system

See merge request falmagro/bip!3
parents d3c89564 678ba29b
No related branches found
No related tags found
1 merge request!3BIP configuration system
Pipeline #17580 failed
#!/usr/bin/env bash
conda create \
-c https://fsl.fmrib.ox.ac.uk/fsldownloads/fslconda/public/ \
-c conda-forge \
-p ./test.env \
"python=${PYTHON_VERSION}" \
pytest \
coverage \
pytest-cov
source activate ./test.env
pip install --no-deps -e .
pytest
stages:
- test
.test: &test_template
stage: test
image: continuumio/miniconda3
tags:
- docker
except:
- tags
- merge_requests
script:
- bash ./.ci/unit_tests.sh
test:3.11:
variables:
PYTHON_VERSION : "3.11"
<<: *test_template
#!/usr/bin/env python
#
# Utility functions available for use by tests.
#
import contextlib
import os
import os.path as op
import tempfile
def touch(fpath):
"""Create a dummy file at fpath."""
with open(fpath, 'wt') as f:
f.write(fpath)
@contextlib.contextmanager
def tempdir():
"""Create and change into a temp directory, deleting it afterwards."""
prevdir = os.getcwd()
with tempfile.TemporaryDirectory() as td:
os.chdir(td)
try:
yield td
finally:
os.chdir(prevdir)
@contextlib.contextmanager
def mock_directory(contents):
"""Create a temp directory with dummy contents."""
with tempdir() as td:
for c in contents:
touch(c)
yield td
def dicts_equal(da, db):
"""Compare two dicts, ignoring order."""
da = {k : da[k] for k in sorted(da.keys())}
db = {k : db[k] for k in sorted(db.keys())}
return da == db
#!/usr/bin/env python
import os.path as op
import textwrap as tw
import unittest
import pytest
import bip.utils.config as config
from bip.tests import touch, tempdir, mock_directory, dicts_equal
def test_nested_lookup():
# (dictionary, key, expected_value)
tests = [
({'a' : {'b' : {'c' : {'d' : 'e'}}}}, ['a'],
{'b' : {'c' : {'d' : 'e'}}}),
({'a' : {'b' : {'c' : {'d' : 'e'}}}}, ['a', 'b'],
{'c' : {'d' : 'e'}}),
({'a' : {'b' : {'c' : {'d' : 'e'}}}}, ['a', 'b', 'c'],
{'d' : 'e'}),
({'a' : {'b' : {'c' : {'d' : 'e'}}}}, ['a', 'b', 'c', 'd'],
'e'),
]
for d, k, exp in tests:
assert config.nested_lookup(d, k) == exp
def test_flatten_dictionary():
# (indict, nlevels, expdict)
tests = [
({'a' : {'b' : 'c', 'd' : 'e'}}, 1,
{'a_b' : 'c', 'a_d' : 'e'}),
({'a' : 'b', 'c' : {'d' : 'e', 'f' : 'g'}}, 1,
{'a' : 'b', 'c_d' : 'e', 'c_f' : 'g'}),
({'a' : 'b', 'c' : {'d' : 'e', 'f' : {'g' : 'h', 'i' : 'j'}}}, 1,
{'a' : 'b', 'c_d' : 'e', 'c_f' : {'g' : 'h', 'i' : 'j'}}),
({'a' : 'b', 'c' : {'d' : 'e', 'f' : {'g' : 'h', 'i' : 'j'}}}, 2,
{'a' : 'b', 'c_d' : 'e', 'c_f_g' : 'h', 'c_f_i' : 'j'}),
]
for indict, nlevels, expdict in tests:
assert config.flatten_dictionary(indict, nlevels) == expdict
def test_parse_override_value():
# (template value, input value, expected)
tests = [
('a', 'b', 'b'),
(None, 'b', 'b'),
(1, '1', 1),
(1.5, '1', 1),
(1, '1.5', 1.5),
(1.5, '1.5', 1.5),
(True, 'false', False),
(True, 'true', True),
([], '[1,2,3,4]', [1, 2, 3, 4]),
({}, '{"a" = 1, "b" = 2}', {'a' : 1, 'b' : 2})
]
for origval, val, expect in tests:
assert config.parse_override_value('param', origval, val) == expect
def test_Config_config_file_identifier():
# (filename, expected)
tests = [
('config.toml', 'config'),
('T1_struct.toml', 'T1_struct'),
('01.T1_struct.toml', 'T1_struct'),
('01.02.T1_struct.toml', 'T1_struct'),
]
for filename, expected in tests:
assert config.Config.config_file_identifier(filename) == expected
def test_Config_list_config_files():
contents = ['config.toml', 'config.json', 'config.cfg',
'abcde.toml', 'bcdef.toml', 'cdefg.toml',
'02.defgh.toml', 'some_file.json']
# files should be in alphabetical
# order, with config.toml last
expect = ['02.defgh.toml', 'abcde.toml', 'bcdef.toml',
'cdefg.toml', 'config.toml']
with mock_directory(contents) as mdir:
expect = [op.join(e) for e in expect]
assert config.Config.list_config_files(mdir)
def test_Config_resolve_selectors():
indict1 = {'param1' : '1',
'param2' : '2',
'subject' : {'12345' : {'param1' : '3', 'param2' : '4'},
'56789' : {'param1' : '5', 'param2' : '6'}}}
indict2 = {'param1' : '1',
'param2' : '2',
'subject' : {'12345' : {'param1' : '3',
'visit' : {'2' : {'param1' : '3.2',
'param2' : '4.2'}}},
'56789' : {'param1' : '5',
'visit' : {'2' : {'param1' : '5.2',
'param2' : '6.2'}}}}}
# (input dict, selectors, expected output)
tests = [
(indict1, {}, {'param1' : '1', 'param2' : '2'}),
(indict1, {'subject' : '12345'}, {'param1' : '3', 'param2' : '4'}),
(indict1, {'subject' : '56789'}, {'param1' : '5', 'param2' : '6'}),
(indict2, {}, {'param1' : '1', 'param2' : '2'}),
(indict2, {'subject' : '12345'}, {'param1' : '3', 'param2' : '2'}),
(indict2, {'subject' : '56789'}, {'param1' : '5', 'param2' : '2'}),
(indict2, {'visit' : '2'}, {'param1' : '1', 'param2' : '2'}),
(indict2, {'subject' : '12345',
'visit' : '1'}, {'param1' : '3', 'param2' : '2'}),
(indict2, {'subject' : '56789',
'visit' : '1'}, {'param1' : '5', 'param2' : '2'}),
(indict2, {'subject' : '12345',
'visit' : '2'}, {'param1' : '3.2', 'param2' : '4.2'}),
(indict2, {'subject' : '56789',
'visit' : '2'}, {'param1' : '5.2', 'param2' : '6.2'}),
]
for indict, selectors, expdict in tests:
result = config.Config.resolve_selectors(indict, selectors)
for k, v in expdict.items():
assert result[k] == v
# selector/setting conflict - probably
# invalid, but function shouold not crash
indict = {'subject' : '12345', 'param1' : '1'}
selectors = {'subject' : '12345'}
result = config.Config.resolve_selectors(indict, selectors)
assert dicts_equal(indict, result)
def test_Config_load_config_file():
configtoml = tw.dedent("""
param1 = 1
param2 = 2
subject.12345.param1 = 3
subject.56789.param1 = 5
subject.12345.visit.2.param1 = 3.2
subject.12345.visit.2.param2 = 4.2
subject.56789.visit.2.param1 = 5.2
subject.56789.visit.2.param2 = 6.2
""").strip()
# (selectors, expected output)
tests = [
({}, {'param1' : 1, 'param2' : 2}),
({'subject' : '12345'}, {'param1' : 3, 'param2' : 2}),
({'subject' : '56789'}, {'param1' : 5, 'param2' : 2}),
({'visit' : '2'}, {'param1' : 1, 'param2' : 2}),
({'subject' : '12345',
'visit' : '1'}, {'param1' : 3, 'param2' : 2}),
({'subject' : '56789',
'visit' : '1'}, {'param1' : 5, 'param2' : 2}),
({'subject' : '12345',
'visit' : '2'}, {'param1' : 3.2, 'param2' : 4.2}),
({'subject' : '56789',
'visit' : '2'}, {'param1' : 5.2, 'param2' : 6.2}),
]
with tempdir():
fnames = ['config.toml', 'abc.toml']
for fname in fnames:
open(fname, 'wt').write(configtoml)
for selectors, expect in tests:
result = config.Config.load_config_file(fname, selectors)
for k, v in expect.items():
if fname != 'config.toml':
ident = config.Config.config_file_identifier(fname)
k = f'{ident}_{k}'
assert result[k] == v
def test_Config_load_config_file_main_relabelling():
configtoml = tw.dedent("""
param1 = 1
param2 = 2
[abc]
param3 = 3
param4 = 4
[def]
param5 = 5
param6 = 6
""").strip()
expect = {
'param1' : 1,
'param2' : 2,
'abc_param3' : 3,
'abc_param4' : 4,
'def_param5' : 5,
'def_param6' : 6
}
with tempdir():
open('config.toml', 'wt').write(configtoml)
result = config.Config.load_config_file('config.toml')
assert result == expect
def test_Config_create():
configtoml = tw.dedent("""
param1 = 0.5
abc_param1 = 0.4
def_param1 = 0.3
""").strip()
abctoml = tw.dedent("""
param1 = 0.6
param2 = 0.7
""").strip()
def01toml = tw.dedent("""
param1 = 0.8
param2 = 0.9
""").strip()
def02toml = tw.dedent("""
param1 = 1.0
param2 = 1.1
""").strip()
exp = {
'param1' : 0.5,
'abc_param1' : 0.4,
'def_param1' : 0.3,
'abc_param2' : 0.7,
'def_param2' : 1.1,
}
with tempdir():
open('config.toml', 'wt').write(configtoml)
open('abc.toml', 'wt').write(abctoml)
open('01.def.toml', 'wt').write(def01toml)
open('02.def.toml', 'wt').write(def02toml)
cfg = config.Config('.')
for k, v in exp.items():
assert cfg[k] == v
with tempdir():
cfg = config.Config('.')
assert len(cfg) == 0
def test_Config_overrides():
configtoml = tw.dedent("""
param1 = 1
param2 = 'abc'
param3 = [1, 2, 3]
[abc]
param1 = true
param2 = 0.4
""").strip()
# (overrides, expected)
pass_tests = [
({},
{'param1' : 1,
'param2' : 'abc',
'param3' : [1, 2, 3],
'abc_param1' : True,
'abc_param2' : 0.4}),
({'param2' : 'def',
'abc_param2' : '0.7'},
{'param1' : 1,
'param2' : 'def',
'param3' : [1, 2, 3],
'abc_param1' : True,
'abc_param2' : 0.7}),
({'param1' : '8',
'abc_param1' : 'false'},
{'param1' : 8,
'param2' : 'abc',
'param3' : [1, 2, 3],
'abc_param1' : False,
'abc_param2' : 0.4}),
({'param4' : 'ghi',
'param5' : '[1,2,3,4]',
'abc_param3' : '7.5'},
{'param1' : 1,
'param2' : 'abc',
'param3' : [1, 2, 3],
'param4' : 'ghi',
'param5' : [1, 2, 3, 4],
'abc_param1' : True,
'abc_param2' : 0.4,
'abc_param3' : 7.5}),
]
# Overrides should be rejected when
# their type (as inferred by tomllib
# parsing) does not match the type of
# the stored value.
fail_tests = [
{'param1' : 'abc'},
{'param3' : '1.5'},
{'abc_param1' : 'abc'},
{'abc_param2' : 'abc'},
]
with tempdir():
open('config.toml', 'wt').write(configtoml)
for overrides, expected in pass_tests:
cfg = config.Config('.', overrides=overrides)
assert dicts_equal(cfg, expected)
for overrides in fail_tests:
with pytest.raises(ValueError):
config.Config('.', overrides=overrides)
def test_Config_access():
configtoml = tw.dedent("""
param1 = 1
param2 = 'abc'
param3 = [1, 2, 3]
[abc]
param1 = true
param2 = 0.4
""").strip()
exp = {
'param1' : 1,
'param2' : 'abc',
'param3' : [1, 2, 3],
'abc_param1' : True,
'abc_param2' : 0.4,
}
with tempdir() as td:
open('config.toml', 'wt').write(configtoml)
cfg = config.Config('.')
assert cfg.cfgdir == td
assert len(cfg) == len(exp)
assert sorted(cfg.keys()) == sorted(exp.keys())
assert sorted(cfg.items()) == sorted(exp.items())
# Order of exp is intentional, to
# match config construction order.
assert [v1 == v2 for v1, v2 in zip(cfg.values(), exp.values())]
for k, v in exp.items():
assert k in cfg
assert cfg[k] == v
assert getattr(cfg, k) == v
assert cfg.get(k) == v
assert cfg.getall(param1=0, param2=1, param4=3) == \
{'param1' : 1, 'param2' : 'abc', 'param4' : 3}
assert cfg.gettuple(param1=0, param2=1, param4=3) == (1, 'abc', 3)
assert cfg.getall('abc', param1=0, param2=1, param3=3) == \
{'param1' : True, 'param2' : 0.4, 'param3' : 3}
assert cfg.gettuple('abc', param1=0, param2=1, param4=3) == \
(True, 0.4, 3)
#!/usr/bin/env python
import pkgutil
import importlib
import bip
def test_importall():
def recurse(module):
path = module.__path__
name = module.__name__
submods = list(pkgutil.iter_modules(path, f'{name}.'))
for i, (spath, smodname, ispkg) in enumerate(submods):
submod = importlib.import_module(smodname)
if ispkg:
recurse(submod)
recurse(bip)
import shutil #!/usr/bin/env python
from fsl.wrappers.wrapperutils import cmdwrapper """The bip.utils package contains a range of miscellaneous utilities. """
installed_sienax = shutil.which('bb_sienax')
import contextlib
if installed_sienax: import logging
@cmdwrapper import os.path as op
def bb_sienax(input, output=None, **kwargs): import os
return ['bb_sienax', input, output] import time
else:
def bb_sienax(input, output=None, **kwargs):
from . import sienax log = logging.getLogger(__name__)
sienax.bb_sienax(input, output)
@contextlib.contextmanager
def lockdir(dirname, delay=5):
"""Lock a directory for exclusive access.
Primitive mechanism by which concurrent access to a directory can be
prevented. Attempts to create a semaphore file in the directory, but waits
if that file already exists. Removes the file when finished.
"""
lockfile = op.join(dirname, '.fsl_ci.lockdir')
while True:
try:
log.debug(f'Attempting to lock %s for exclusive access.', dirname)
fd = os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
break
except OSError as e:
if e.errno != errno.EEXIST:
raise e
log.debug('%s is already locked - trying again '
'in %s seconds ...', dirname, delay)
time.sleep(10)
log.debug('Exclusive access acquired for %s ...', dirname)
try:
yield
finally:
log.debug('Relinquishing lock on %s', dirname)
os.close( fd)
os.unlink(lockfile)
#!/usr/bin/env python
"""This module provides the BIP Config class. The Config class is used
throughout BIP for accessing settings stored in BIP configuration files.
# Configuration file format
TODO
# Selectors - alternate setting values for specific scenarios
TODO
# Type validation
TODO
"""
import copy
import glob
import functools as ft
import itertools as it
import logging
import os
import os.path as op
import operator
# python >= 3.11
try:
import tomllib
# python < 3.11
except ImportError:
import tomli as tomllib
import bip
log = logging.getLogger(__name__)
MAIN_CONFIG_FILE_NAME = 'config.toml'
"""Name of the primary BIP configuration file. This file contains global
settings, and may contain pipeline-specific settings which take precedence
over equivalent settings in secondary configuration files.
"""
DEFAULT_CONFIG_DIRECTORY = op.join(op.dirname(bip.__file__), 'data', 'config')
"""Default configuration directory. Used when a Config object is created with
specifying a directory.
"""
def nested_lookup(d, key):
"""Look up a value in a nested dictionary based on the given key.
For example, imagine we have a dictionary d:
{'a' : {'b' : {'c' : {'d' : 'e'}}}}
We can retrieve 'e' like so:
nested_lookup(d, ['a', 'b', 'c', 'd'])
"""
if key[0] not in d:
raise KeyError()
if not isinstance(d, dict):
return d
d = d[key[0]]
if len(key) == 1:
return d
return nested_lookup(d, key[1:])
def flatten_dictionary(d, nlevels=1):
"""Flattens a nested dictionary.
Given a dictionary such as:
{
'k1' : 'v1',
'k2' : {
'sk1' : 'sv1',
'sk2' : 'sv2'
}
}
this function will adjust the dictionary to have structure:
{
'k1' : 'v1',
'k2_sk1' : 'sv1'
'k2_sk2' : 'sv2'
}
If nlevels is 1 (the default), the function will only flatten
sub-dictionaries that are one-level deep.
"""
d = copy.deepcopy(d)
for key, val in list(d.items()):
if not isinstance(val, dict):
continue
if nlevels > 1:
val = flatten_dictionary(val, nlevels - 1)
for subkey, subval in val.items():
subkey = f'{key}_{subkey}'
d[subkey] = subval
d.pop(key)
return d
def parse_override_value(name, origval, val):
"""Use tomllib to coerce a string to a suitable type."""
# If the configuration file value is a string,
# return the override value unchanged
if isinstance(origval, str):
return val
# Otherwise use tomllib to convert the value.
# Leave as a string on failures, but emit
# a warning.
try:
return tomllib.loads(f'val = {val}')['val']
except Exception:
log.warning('Cannot parse override value for %s (%s) - '
'leaving value as a string.', name, val)
return val
class Config:
"""The Config class is a dictionary containing BIP settings. A Config
can be created by specifying the directory which contains all BIP
configuration files, e.g.:
cfg = Config('path/to/config/dir/')
"""
@staticmethod
def config_file_identifier(fname):
"""Return an identifier for the given BIP configuration file.
BIP configuration files are named like so:
[<prefix>.]<identifier>.toml
For example:
config.toml # identifier: "config"
T1_struct.toml # identifier: "T1_struct"
01.fMRI_task.toml # identifier: "fMRI_task", prefix: "01"
02.fMRI_task.toml # identifier: "fMRI_task", prefix: "02"
03.fMRI_rest.toml # identifier: "fMRI_rest", prefix: "03"
BIP settings may be grouped according to the file identifier - the file
prefix is used solely for enforcing a particular ordering.
"""
base = op.basename(fname)
if base == MAIN_CONFIG_FILE_NAME:
return 'config'
base = base.removesuffix('.toml')
# Drop file prefix if present
return base.split('.')[-1]
@staticmethod
def list_config_files(cfgdir):
"""Return a list of all BIP configuration files contained in cfgdir.
The files are sorted according to the order in which they should be
loaded.
A BIP configuration is a directory containing one or more .toml files.
BIP configuration files are intended to be loaded in alphabetical
order, with settings in later files overwriting settings in earlier
files.
The "config.toml" file must be loaded last, so that settings
contained within it take precedence over all other files.
"""
cfgfiles = glob.glob(op.join(cfgdir, '*.toml'))
cfgfiles = sorted(cfgfiles, key=str.lower)
maincfg = op.join(cfgdir, MAIN_CONFIG_FILE_NAME)
# make sure main config is loaded last
if maincfg in cfgfiles:
cfgfiles.remove(maincfg)
cfgfiles.append(maincfg)
return cfgfiles
@staticmethod
def resolve_selectors(settings, selectors):
"""Resolves and applies any "selector" configuration settings.
A BIP configuration file may contain one default value for a
setting, but may also contain alternate values for that setting
which should be used in certain scenarios. Each alternate value
is associated with a set of "selector" parameters, which are
just key-value pairs.
For example, a file may contain a default value for a setting called
"param1", and an alternate value for param1 to be used when the
"subject" selector parameter is set to "12345":
param1 = 0.5
subject.12345.param1 = 0.4
If resolve_selectors is given these settings along with subject=12345,
the default value for param1 will be replaced with the selector value.
Multiple selector parameters may be specified - in the configuration
file, the selector parameters must be ordered alphabetically. For
example, if we have selector parameters "subject" and "visit", the
alternate values for subject=123 and visit=2 must be specified as:
param1 = 0.5
subject.123.visit.2.param1 = 0.3
But cannot be specified as:
param1 = 0.5
visit.2.subject.123.param1 = 0.3
"""
settings = copy.deepcopy(settings)
# Take the selector parameters, and generate all possible
# candidate keys - e.g. {'subject' : '123', 'visit' : '1'}
# will result in:
#
# ['subject', '123']
# ['visit', '1']
# ['subject', '123', 'visit', '1']
kvps = [[str(k), str(v)] for k, v in selectors.items()]
patterns = [it.combinations(kvps, i) for i in range(1, len(kvps) + 1)]
patterns = it.chain(*patterns)
patterns = [ft.reduce(operator.add, p) for p in patterns]
for pat in patterns:
try:
val = nested_lookup(settings, pat)
except KeyError:
continue
# If an entry with a selector name as key is
# present, and is not a dictionary (e.g.
# "subject = 1"), this is probably an error
# in the config file, or the selectors the
# user has provided. Warn and carry on
if not isinstance(val, dict):
log.warning('Ignoring primitive selector value (%s = %s)',
pat, val)
continue
log.debug('Updating settings from selector: %s', pat)
for k, v in val.items():
# If we have a configuration like:
#
# subject.123.visit.1.param = 0.5
# visit.2.param = 0.5
#
# and we are processing ['subject', '123'], we
# do not want to clobber the original 'visit'.
if k not in selectors:
settings[k] = v
return settings
@staticmethod
def apply_overrides(settings, overrides=None):
"""Override some settings with override values.
Any value read from the configuration files can be overridden.
If an override value has an incompatible type with the value from the
configuration file, a ValueError is raised.
"""
def types_match(old, new):
"""Return True if old and new are of compatible types."""
if isinstance(old, (float, int)):
return isinstance(new, (float, int))
else:
return isinstance(new, type(old))
settings = copy.deepcopy(settings)
for key, val in overrides.items():
origval = settings.get(key, None)
# Coerce strings to a toml type
if isinstance(val, str):
val = parse_override_value(key, origval, val)
# Reject the new value if it has a
# different type to the original value
if origval is not None and not types_match(origval, val):
raise ValueError(f'{key}: override value has wrong type (got '
f'{type(val)}, expected {type(origval)}')
log.debug('Overriding %s (%s -> %s)', key, origval, val)
settings[key] = val
return settings
@staticmethod
def load_config_file(cfgfile, selectors=None):
"""Load a BIP configuration file. The file is assumed to be a TOML
file, named as described in the config_file_identifier documentation.
All settings contained in the file are re-labelled with the file
identifier. For example, if the file is called "T1_struct.toml", and
contains:
bet_f = 0.5
fast_classes = 3
the loaded dictionary will contain:
'T1_struct_bet_f' : 0.5
'T1_struct_fast_classes' : 3
Re-labelling is not applied to the main "config.toml" configuration
file.
"""
if selectors is None:
selectors = {}
with open(cfgfile, 'rb') as f:
settings = tomllib.load(f)
ident = Config.config_file_identifier(cfgfile)
selectors = {k : selectors[k] for k in sorted(selectors.keys())}
settings = Config.resolve_selectors(settings, selectors)
# Any tables in the main config file are relabelled
# to "<tablename>_<setting>. For example, if the
# main config.toml contains:
#
# some_global_param = 75
#
# [T1_struct]
# bet_f = 0.2
#
# bet_f is relabelled to T1_struct_bet_f
if ident == 'config':
settings = flatten_dictionary(settings, 1)
# All settings in secondary configuration files
# are relabelled to "<ident>_<setting>". For example,
# if T1_struct.toml contains:
#
# bet_f = 0.5
#
# bet_f is relabelled to T1_struct_bet_f
else:
ident = f'{ident}_'
settings = {f'{ident}{k}' : v for k, v in settings.items()}
return settings
def __init__(self, cfgdir=None, selectors=None, overrides=None):
"""Create a Config object. Read configuration files from cfgdir.
Selectors are applied using Config.resolve_selectors.
Override values are applied using Config.apply_overrides.
"""
if selectors is None: selectors = {}
if overrides is None: overrides = {}
if cfgdir is None: cfgdir = DEFAULT_CONFIG_DIRECTORY
cfgfiles = Config.list_config_files(cfgdir)
if len(cfgfiles) == 0:
log.warning('No BIP configuration files found in %s', cfgdir)
settings = {}
for fname in cfgfiles:
log.debug('Loading settings from %s', fname)
settings.update(Config.load_config_file(fname, selectors))
settings = Config.apply_overrides(settings, overrides)
self.__settings = settings
self.__cfgdir = op.abspath(cfgdir)
@property
def cfgdir(self):
"""Return the directory that the config files were loaded from. """
return self.__cfgdir
def __len__(self):
"""Return the number of entries in this Config. """
return len(self.__settings)
def __contains__(self, key):
"""Return True if a configuration item with key exists. """
return key in self.__settings
def __getitem__(self, key):
"""Return the configuration item with the specified key. """
return self.__settings[key]
def __getattr__(self, key):
"""Return the configuration item with the specified key. """
return self[key]
def get(self, key, default=None):
"""Return value associated with key.
Return default if there is no value associated with key.
"""
if key not in self:
return default
return self[key]
def getall(self, prefix=None, **kwargs):
"""Return all requested values with given key prefix.
The specified default value is used for any keys which are not present.
The values are returned as a dictionary, with the keys un-prefixed.
"""
if prefix is None: prefix = ''
else: prefix = f'{prefix}_'
values = {}
for k, v in kwargs.items():
values[k] = self.get(f'{prefix}{k}', v)
return values
def gettuple(self, prefix=None, **kwargs):
"""Same as getall, but values are returned as a tuple. """
return tuple(self.getall(prefix, **kwargs).values())
def keys(self):
"""Return all keys present in the config. """
return self.__settings.keys()
def values(self):
"""Return all values present in the config. """
return self.__settings.values()
def items(self):
"""Return all key-value pairs present in the config. """
return self.__settings.items()
[tool:pytest]
testpaths = bip/tests
addopts = -v --cov=bip
[coverage:run]
source = bip
omit = bip/tests/*
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment