diff --git a/fsl/utils/filetree/filetree.py b/fsl/utils/filetree/filetree.py index bbc41105654f6bb2f1cc3372551ed1eef0fb31a2..0b077d6c300611c783aee1cbbe7dbc0788342a6f 100644 --- a/fsl/utils/filetree/filetree.py +++ b/fsl/utils/filetree/filetree.py @@ -181,10 +181,10 @@ class FileTree(object): :param glob_vars: sequence of undefined variables that can take any possible values when looking for matches on the disk. Any defined variables in `glob_vars` will be ignored. If glob_vars is set to 'all', all undefined variables will be used to look up matches. - :return: sorted sequence of paths + :return: sequence of paths """ - text, variables = self.get_template(short_name) - return tuple(str(Path(fn)) for fn in utils.get_all(text, variables, glob_vars=glob_vars)) + return tuple([self.update(**vars).get(short_name) + for vars in self.get_all_vars(short_name, glob_vars=glob_vars)]) def get_all_vars(self, short_name: str, glob_vars=()) -> Tuple[Dict[str, str]]: """ @@ -196,7 +196,8 @@ class FileTree(object): If glob_vars is set to 'all', all undefined variables will be used to look up matches. 
:return: sequence of dictionaries with the variables settings used to generate each filename """ - return tuple(self.extract_variables(short_name, fn) for fn in self.get_all(short_name, glob_vars=glob_vars)) + text, variables = self.get_template(short_name) + return utils.get_all(text, variables, glob_vars=glob_vars) def get_all_trees(self, short_name: str, glob_vars=(), set_parent=True) -> Tuple["FileTree"]: """ diff --git a/fsl/utils/filetree/query.py b/fsl/utils/filetree/query.py index 6407b14e3a39a2c59bb12502f21f6dc0512f0eed..56ba6cc4991b30bb47aad00ed513cc3868465951 100644 --- a/fsl/utils/filetree/query.py +++ b/fsl/utils/filetree/query.py @@ -369,12 +369,12 @@ def scan(tree : FileTree) -> List[Match]: matches = [] for template in tree.templates: - for filename in tree.get_all(template, glob_vars='all'): + for variables in tree.get_all_vars(template, glob_vars='all'): - if not op.isfile(filename): - continue + filename = tree.update(**variables).get(template) - variables = dict(tree.extract_variables(template, filename)) + if not op.isfile(tree.update(**variables).get(template)): + continue matches.append(Match(filename, template, tree, variables)) diff --git a/fsl/utils/filetree/utils.py b/fsl/utils/filetree/utils.py index 11464e93570660dd27e1fa9103a3b4ff3e731f03..621d97eeaa9b4e2c40a5e6879a47fa82b09a136c 100644 --- a/fsl/utils/filetree/utils.py +++ b/fsl/utils/filetree/utils.py @@ -1,105 +1,373 @@ import re import itertools import glob -from . 
import filetree +from typing import List, Sequence, Set, Tuple, Dict, Iterator -def resolve(template, variables): +class Part: """ - Resolves the template given a set of variables + Individual part of a template - :param template: template - :param variables: mapping of variable names to values - :return: cleaned string + 3 subclasses are defined: + + - :class:`Literal`: piece of text + - :class:`Required`: required variable to fill in (between curly brackets) + - :class:`Optional`: part of text containing optional variables (between square brackets) """ - filled = fill_known(template, variables) - filename = resolve_optionals(filled) - remaining = find_variables(filename) - if len(remaining) > 0: - raise filetree.MissingVariable('Variables %s not defined' % set(remaining)) - return filename + def fill_known(self, variables) -> Sequence["Part"]: + """ + Fills in the given variables + """ + return [self] + def optional_variables(self, ) -> Set["Part"]: + """ + Returns all variables in optional parts + """ + return set() -def get_all(template, variables, glob_vars=()): - """ - Gets all variables matching the templates given the variables + def required_variables(self, ) -> Set["Part"]: + """ + Returns all required variables + """ + return set() - :param template: template - :param variables: (incomplete) mapping of variable names to values - :param glob_vars: sequence of undefined variables that can take any possible values when looking for matches on the disk - If `glob_vars` contains any defined variables, it will be ignored. 
- :return: sequence of filenames + def contains_optionals(self, variables: Set["Part"]=None): + """ + Returns True if this part contains the optional variables + """ + return False + + def append_variables(self, variables: List[str]): + """ + Appends the variables in this part to the provided list in order + """ + pass + + +class Literal(Part): + def __init__(self, text: str): + """ + Literal part is defined purely by the text it contains + + :param text: part of the template + """ + self.text = text + + def __str__(self): + """ + Returns this part of the template as a string + """ + return self.text + + +class Required(Part): + def __init__(self, var_name, var_formatting=None): + """ + Required part of template (between curly brackets) + + Required variable part of template is defined by variable name and its format + + :param var_name: name of variable + :param var_formatting: how to format the variable + """ + self.var_name = var_name + self.var_formatting = var_formatting + + def __str__(self): + """ + Returns this part of the template as a string + """ + if self.var_formatting is None: + return '{' + self.var_name + '}' + else: + return '{' + self.var_name + ':' + self.var_formatting + '}' + + def fill_known(self, variables): + if self.var_name in variables: + return Template.parse(str(self).format(**variables)).parts + return [self] + + def required_variables(self, ): + return {self.var_name} + + def append_variables(self, variables): + variables.append(self.var_name) + + +class Optional(Part): + def __init__(self, sub_template: "Template"): + """ + Optional part of template (between square brackets) + + Optional part can contain literal and required parts + + :param sub_template: part of the template within square brackets + """ + self.sub_template = sub_template + + def __str__(self): + return '[' + str(self.sub_template) + ']' + + def fill_known(self, variables): + new_opt = self.sub_template.fill_known(variables) + if len(new_opt.required_variables()) == 
0: + return Template.parse(str(new_opt)).parts + return [Optional(new_opt)] + + def optional_variables(self, ): + return self.sub_template.required_variables() + + def contains_optionals(self, variables=None): + if variables is None and len(self.optional_variables()) > 0: + return True + return len(self.optional_variables().intersection(variables)) > 0 + + def append_variables(self, variables): + variables.extend(self.sub_template.ordered_variables()) + + +class Template: + """ + Splits a template into its constituent parts """ - filled = fill_known(template, variables) - remaining = set(find_variables(filled)) - optional = optional_variables(filled) - res = set() - if glob_vars == 'all': - glob_vars = remaining - glob_vars = set(glob_vars).difference(variables.keys()) - - undefined_vars = remaining.difference(glob_vars).difference(optional) - if len(undefined_vars) > 0: - raise KeyError("Required variables {} were not defined".format(undefined_vars)) - - for keep in itertools.product(*[(True, False) for _ in optional.intersection(glob_vars)]): - sub_variables = {var: '*' for k, var in zip(keep, optional) if k} - for var in remaining.difference(optional).intersection(glob_vars): - sub_variables[var] = '*' - sub_filled = fill_known(filled, sub_variables) - - pattern = resolve_optionals(sub_filled) - assert len(find_variables(pattern)) == 0 - - for filename in glob.glob(pattern): - try: - extract_variables(filled, filename) - except ValueError: + def __init__(self, parts: Sequence[Part]): + self.parts = tuple(parts) + + @classmethod + def parse(cls, text: str) -> "Template": + """ + Parses a text template into its constituent parts + + :param text: input template as string + :return: same template split into its parts + """ + parts = [] + for optional_parts in re.split(r'(\[.*?\])', text): + if len(optional_parts) > 0 and optional_parts[0] == '[' and optional_parts[-1] == ']': + if '[' in optional_parts[1:-1] or ']' in optional_parts[1:-1]: + raise ValueError(f'Can 
not parse {text}, because unmatching square brackets were found') + parts.append(Optional(Template.parse(optional_parts[1:-1]))) + else: + for required_parts in re.split(r'(\{.*?\})', optional_parts): + if len(required_parts) > 0 and required_parts[0] == '{' and required_parts[-1] == '}': + if ':' in required_parts: + var_name, var_type = required_parts[1:-1].split(':') + else: + var_name, var_type = required_parts[1:-1], '' + parts.append(Required(var_name, var_type)) + else: + parts.append(Literal(required_parts)) + return Template(parts) + + def __str__(self): + """ + Returns the template as a string + """ + return ''.join([str(p) for p in self.parts]) + + def optional_variables(self, ) -> Set[str]: + """ + Set of optional variables + """ + if len(self.parts) == 0: + return set() + optionals = set.union(*[p.optional_variables() for p in self.parts]) + return optionals.difference(self.required_variables()) + + def required_variables(self, ) -> Set[str]: + """ + Set of required variables + """ + if len(self.parts) == 0: + return set() + return set.union(*[p.required_variables() for p in self.parts]) + + def ordered_variables(self, ) -> Tuple[str]: + """ + Sequence of all variables in order (can contain duplicates) + """ + ordered_vars = [] + for p in self.parts: + p.append_variables(ordered_vars) + return ordered_vars + + def fill_known(self, variables) -> "Template": + """ + Fill in the known variables + + Any optional parts, where all variables have been filled will be automatically replaced + """ + prev = '' + while str(self) != prev: + prev = str(self) + self = self._fill_known_single(variables) + return self + + def _fill_known_single(self, variables): + """ + Helper method for :meth:`fill_known` + """ + res = [] + for p in self.parts: + res.extend(p.fill_known(variables)) + return Template(res) + + def remove_optionals(self, optionals=None) -> "Template": + """ + Removes any optionals containing the provided variables (default: remove all) + """ + return
Template([p for p in self.parts if not p.contains_optionals(optionals)]) + + def resolve(self, variables) -> str: + """ + Resolves the template given a set of variables + + :param variables: mapping of variable names to values + :return: cleaned string + """ + clean_template = self.fill_known(variables).remove_optionals() + if len(clean_template.required_variables()) > 0: + raise KeyError("Variables %s not defined" % clean_template.required_variables()) + return str(clean_template) + + def get_all(self, variables, glob_vars=()) -> Tuple[Dict[str, str]]: + """ + Gets all variables for files on disk matching the templates + + :param variables: (incomplete) mapping of variable names to values + :param glob_vars: sequence of undefined variables that can take any possible values when looking for matches on the disk + """ + filled = self.fill_known(variables) + if glob_vars == 'all': + glob_vars = set.union(self.required_variables(), self.optional_variables()) + if len(filled.required_variables().difference(glob_vars)) > 0: + raise KeyError("Required variables {} were not defined".format( + filled.required_variables().difference(glob_vars) + )) + cleaned = filled.remove_optionals(filled.optional_variables().difference(glob_vars)) + return cleaned._get_all_helper(glob_vars) + + def _get_all_helper(self, glob_vars): + params = set() + optionals = self.optional_variables() + for to_fill in self.optional_subsets(): + pattern = str(to_fill.fill_known({var: '*' for var in glob_vars})) + while '//' in pattern: + pattern = pattern.replace('//', '/') + + for filename in sorted(glob.glob(pattern)): + try: + extracted_vars = to_fill.extract_variables(filename) + for name in optionals: + if name not in extracted_vars: + extracted_vars[name] = None + params.add(tuple(sorted(extracted_vars.items(), key=lambda item: item[0]))) + except ValueError: + pass + return tuple([dict(p) for p in params]) + + def optional_subsets(self, ) -> Iterator["Template"]: + """ + Yields template sub-sets 
with every combination of optional variables + """ + optionals = self.optional_variables() + for n_optional in range(len(optionals) + 1): + for exclude_optional in itertools.combinations(optionals, n_optional): + yield self.remove_optionals(exclude_optional) + + def extract_variables(self, filename, known_vars=None): + """ + Extracts the variable values from the filename + + :param filename: filename + :param known_vars: already known variables + :return: dictionary from variable names to string representations (unused variables set to None) + """ + if known_vars is not None: + template = self.fill_known(known_vars) + else: + template = self + while '//' in filename: + filename = filename.replace('//', '/') + + required = template.required_variables() + optional = template.optional_variables() + results = [] + for to_fill in template.optional_subsets(): + sub_re = str(to_fill.fill_known( + {var: r'(\S+)' for var in required.union(optional)}, + )) + while '//' in sub_re: + sub_re = sub_re.replace('//', '/') + sub_re = sub_re.replace('.', r'\.') + match = re.match(sub_re, filename) + if match is None: continue - res.add(filename) - return sorted(res) + extracted_value = {} + ordered_vars = to_fill.ordered_variables() + assert len(ordered_vars) == len(match.groups()) + + failed = False + for var, value in zip(ordered_vars, match.groups()): + if var in extracted_value: + if value != extracted_value[var]: + failed = True + break + else: + extracted_value[var] = value + if failed or any('/' in value for value in extracted_value.values()): + continue + for name in template.optional_variables(): + if name not in extracted_value: + extracted_value[name] = None + if known_vars is not None: + extracted_value.update(known_vars) + results.append(extracted_value) + if len(results) == 0: + raise ValueError("{} did not match {}".format(filename, template)) -def fill_known(template, variables): + def score(variables): + """ + The highest score is given to the set of variables that: +
+ 1. has used the largest amount of optional variables + 2. has the shortest text within the variables (only used if equal at 1) + """ + number_used = len([v for v in variables.values() if v is not None]) + length_hint = sum([len(v) for v in variables.values() if v is not None]) + return number_used * 1000 - length_hint + + best = max(results, key=score) + for var in results: + if best != var and score(best) == score(var): + raise KeyError("Multiple equivalent ways found to parse {} using {}".format(filename, template)) + return best + + +def resolve(template, variables): """ - Fills in the known variables filling the other variables with {<variable_name>} + Resolves the template given a set of variables :param template: template - :param variables: mapping of variable names to values (ignoring any None) + :param variables: mapping of variable names to values :return: cleaned string """ - prev = '' - while prev != template: - prev = template - settings = {} - for name in set(find_variables(template)): - if name in variables and variables[name] is not None: - settings[name] = variables[name] - else: - settings[name] = '{' + name + '}' - template = template.format(**settings) - return template + return Template.parse(template).resolve(variables) -def resolve_optionals(text): +def get_all(template, variables, glob_vars=()): """ - Resolves the optional sections + Gets all variables matching the templates given the variables - :param text: template after filling in the known variables - :return: cleaned string + :param template: template + :param variables: (incomplete) mapping of variable names to values + :param glob_vars: sequence of undefined variables that can take any possible values when looking for matches on the disk + If `glob_vars` contains any defined variables, it will be ignored.
+ :return: sequence of variables """ - def resolve_single_optional(part): - if len(part) == 0: - return part - if part[0] != '[' or part[-1] != ']': - return part - elif len(find_variables(part)) == 0: - return part[1:-1] - else: - return '' - - res = [resolve_single_optional(text) for text in re.split(r'(\[.*?\])', text)] - return ''.join(res) + return Template.parse(template).get_all(variables, glob_vars) def find_variables(template): @@ -109,7 +377,7 @@ def find_variables(template): :param template: full template :return: sequence of variables """ - return tuple(var.split(':')[0] for var in re.findall(r"\{(.*?)\}", template)) + return Template.parse(template).ordered_variables() def optional_variables(template): @@ -119,17 +387,7 @@ def optional_variables(template): :param template: full template :return: set of variables that are only present in optional parts of the string """ - include = set() - exclude = set() - for text in re.split(r'(\[.*?\])', template): - if len(text) == 0: - continue - variables = find_variables(text) - if text[0] == '[' and text[-1] == ']': - include.update(variables) - else: - exclude.update(variables) - return include.difference(exclude) + return Template.parse(template).optional_variables() def extract_variables(template, filename, known_vars=None): @@ -141,41 +399,4 @@ def extract_variables(template, filename, known_vars=None): :param known_vars: already known variables :return: dictionary from variable names to string representations (unused variables set to None) """ - if known_vars is None: - known_vars = {} - template = fill_known(template, known_vars) - while '//' in filename: - filename = filename.replace('//', '/') - remaining = set(find_variables(template)) - optional = optional_variables(template) - for keep in itertools.product(*[(True, False) for _ in optional]): - sub_re = resolve_optionals(fill_known( - template, - dict( - **{var: r'(\S+)' for k, var in zip(keep, optional) if k}, - **{var: r'(\S+)' for var in 
remaining.difference(optional)} - ) - )) - while '//' in sub_re: - sub_re = sub_re.replace('//', '/') - sub_re = sub_re.replace('.', r'\.') - if re.match(sub_re, filename) is None: - continue - - extracted_value = {} - kept_vars = [var for var in find_variables(template) - if var not in optional or keep[list(optional).index(var)]] - for var, value in zip(kept_vars, re.match(sub_re, filename).groups()): - if var in extracted_value: - if value != extracted_value[var]: - raise ValueError('Multiple values found for {}'.format(var)) - else: - extracted_value[var] = value - if any('/' in value for value in extracted_value.values()): - continue - for name in find_variables(template): - if name not in extracted_value: - extracted_value[name] = None - extracted_value.update(known_vars) - return extracted_value - raise ValueError("{} did not match {}".format(filename, template)) + return Template.parse(template).extract_variables(filename, known_vars) diff --git a/tests/test_filetree/test_template.py b/tests/test_filetree/test_template.py index c4042078df8a762c3affb5b2d98e1c63cb9a4b81..ba51229214d1acc31adf7967d547a8036b9802e6 100644 --- a/tests/test_filetree/test_template.py +++ b/tests/test_filetree/test_template.py @@ -25,3 +25,12 @@ def test_get_variables(): assert {'subject': '01', 'session': 'A'} == utils.extract_variables('sub-{subject}/[ses-{session}]/T1w.nii.gz', 'sub-01/ses-A/T1w.nii.gz') with pytest.raises(ValueError): utils.extract_variables('sub-{subject}/[ses-{session}]/T1w.nii.gz', 'sub-01/other/T1w.nii.gz') + + +def test_multiple_optionals(): + with pytest.raises(KeyError): + utils.extract_variables('{var}[_{opt1}][_{opt2}]', 'test_foo') + assert {'var': 'test', 'opt1': None, 'opt2': None} == utils.extract_variables('{var}[_{opt1}][_{opt2}]', 'test') + assert {'var': 'test', 'opt1': 'oo', 'opt2': None} == utils.extract_variables('{var}[_f{opt1}][_{opt2}]', 'test_foo') + +