Newer
Older
import json
import os.path as op
import pickle
from pathlib import Path, PurePath
from typing import Tuple, Optional, Dict, Any, Set

from . import utils
# NOTE(review): `parse` (search_tree, extra_tree_dirs, read_subtree_line, read_line) is used by
# FileTree.read but its import is not visible in this view - confirm it is imported.
class MissingVariable(KeyError):
    """
    Raised when neither a tree nor any of its parents provide a value for a requested variable
    """
class FileTree(object):
    """
    Contains the input/output filename tree

    Properties

    - ``templates``: dictionary mapping short names to filename templates
    - ``variables``: dictionary mapping variables in the templates to specific values
      (variables set to None are explicitly unset)
    - ``sub_trees``: filename trees describing specific sub-directories
    - ``parent``: parent FileTree, of which this sub-tree is a sub-directory
    - ``name``: descriptive name of the tree
    """
    def __init__(self,
                 templates: Dict[str, str],
                 variables: Dict[str, Any],
                 sub_trees: Optional[Dict[str, "FileTree"]] = None,
                 parent: Optional["FileTree"] = None,
                 name: Optional[str] = None):
        """
        Creates a new filename tree.

        :param templates: mapping from short names to filename templates
        :param variables: mapping from variable names to their values
        :param sub_trees: mapping from sub-tree names to sub-trees (defaults to no sub-trees)
        :param parent: tree of which this tree is a sub-tree (None for a top-level tree)
        :param name: descriptive name of the tree
        """
        self.templates = templates
        self.variables = variables
        if sub_trees is None:
            sub_trees = {}
        self.sub_trees = sub_trees
        self._parent = parent
        self._name = name

    @property
    def parent(self):
        """
        Parent FileTree, of which this sub-tree is a sub-directory
        """
        return self._parent

    @property
    def name(self):
        """
        Name of this ``FileTree``, or ``None`` if it has no name.
        """
        return self._name

    @property
    def all_variables(self):
        """
        All tree variables including those inherited from the parent tree

        A new dictionary is returned on every access, so callers may modify it freely.
        """
        if self.parent is None:
            return dict(self.variables)
        # NOTE(review): values set on this tree take precedence over the parent's - confirm
        merged = self.parent.all_variables
        merged.update(self.variables)
        return merged

    def get_variable(self, name: str, default=None) -> str:
        """
        Gets a variable used to fill out the template

        :param name: variable name
        :param default: value returned when the variable is missing or set to None
            (if not set a MissingVariable error is raised instead)
        :return: value of the variable
        :raise: MissingVariable if the variable is not available and no default was given
        """
        variables = self.all_variables
        if name in variables and variables[name] is not None:
            return variables[name]
        if default is None:
            raise MissingVariable('Variable {} not found in sub-tree or parents'.format(name))
        return default

    def _get_template_tree(self, short_name: str) -> Tuple["FileTree", str]:
        """
        Retrieves template text from this tree, parent tree or sub_tree

        :param short_name: filename reference name
        :return: tuple with the containing tree and the template text
        :raise: KeyError if the template, a sub-tree or the parent does not exist
        """
        if '/' in short_name:
            sub_tree, sub_name = short_name.split('/', maxsplit=1)
            if sub_tree == '..':
                if self.parent is None:
                    raise KeyError("Tried to access the parent of the top-level tree")
                return self.parent._get_template_tree(sub_name)
            return self.sub_trees[sub_tree]._get_template_tree(sub_name)
        return self, self.templates[short_name]

    def get_template(self, short_name: str) -> Tuple[str, Dict[str, str]]:
        """
        Returns the template for a given short name together with the variables of the tree defining it

        - '/' characters in short_name refer to sub-trees
        - '../' characters in short_name refer to parents

        For example:

        - "eddy/output" refers to the "output" in the "eddy" sub_tree
          (i.e. ``self.sub_trees['eddy'].templates['output']``)
        - "../other/name" refers to the "other" sub-tree of the parent tree
          (i.e., ``self.parent.sub_trees['other'].templates['name']``)

        :param short_name: name of the template
        :return: tuple with the template and the variables corresponding to the template
        """
        tree, text = self._get_template_tree(short_name)
        return text, tree.all_variables

    def template_variables(self, short_name: Optional[str] = None, optional=True, required=True) -> Set[str]:
        """
        Returns the variables needed to define a template

        :param short_name: name of the template (defaults to all)
        :param optional: if set to False don't include the optional variables
        :param required: if set to False don't include the required variables
        :return: set of variable names
        """
        if not optional and not required:
            return set()

        if short_name is None:
            # Collect over every template of this tree and, recursively, of its sub-trees
            all_vars = set()
            required_vars = set()
            for template_name in self.templates:
                all_vars.update(self.template_variables(template_name))
                required_vars.update(self.template_variables(template_name, optional=False))
            for sub_tree in self.sub_trees.values():
                all_vars.update(sub_tree.template_variables())
                required_vars.update(sub_tree.template_variables(optional=False))
            if optional and required:
                return all_vars
            if required:
                return required_vars
            # only optional: anything that is not required by at least one template
            return all_vars.difference(required_vars)

        _, text = self._get_template_tree(short_name)
        all_vars = set(utils.find_variables(text))
        if optional and required:
            return all_vars
        opt_vars = set(utils.optional_variables(text))
        if optional:
            return opt_vars
        return all_vars.difference(opt_vars)

    def get(self, short_name, make_dir=False) -> str:
        """
        Gets a full filename based on its short name

        :param short_name: identifier in the tree
        :param make_dir: if True make sure that the directory leading to this file exists
        :return: full filename
        """
        text, variables = self.get_template(short_name)
        res = Path(utils.resolve(text, variables))
        if make_dir:
            res.parents[0].mkdir(parents=True, exist_ok=True)
        return str(res)

    def get_all(self, short_name: str, glob_vars=()) -> Tuple[str, ...]:
        """
        Gets all existing directory/file names matching a specific pattern

        :param short_name: short name of the path template
        :param glob_vars: sequence of undefined variables that can take any possible values
            when looking for matches on the disk.
            Any defined variables in `glob_vars` will be ignored.
            If glob_vars is set to 'all', all undefined variables will be used to look up matches.
        :return: tuple of filenames, one for each variable setting found by ``get_all_vars``
        """
        return tuple([self.update(**matched).get(short_name)
                      for matched in self.get_all_vars(short_name, glob_vars=glob_vars)])

    def get_all_vars(self, short_name: str, glob_vars=()) -> Tuple[Dict[str, str], ...]:
        """
        Gets all the parameters that generate existing filenames

        :param short_name: short name of the path template
        :param glob_vars: sequence of undefined variables that can take any possible values
            when looking for matches on the disk.
            Any defined variables in `glob_vars` will be ignored.
            If glob_vars is set to 'all', all undefined variables will be used to look up matches.
        :return: sequence of dictionaries with the variables settings used to generate each filename
        """
        text, variables = self.get_template(short_name)
        return utils.get_all(text, variables, glob_vars=glob_vars)

    def get_all_trees(self, short_name: str, glob_vars=(), set_parent=True) -> Tuple["FileTree", ...]:
        """
        Gets all the trees that generate the existing files matching the pattern

        tree.get_all(short_name) == tuple(tree.get(short_name) for tree in tree.get_all_trees(short_name))

        :param short_name: short name of the path template
        :param glob_vars: sequence of undefined variables that can take any possible values
            when looking for matches on the disk.
            Any defined variables in `glob_vars` will be ignored.
            If glob_vars is set to 'all', all undefined variables will be used to look up matches.
        :param set_parent: Update the variables of the top-level rather than current tree if True.
            Only relevant if `self` is a sub-tree.
        :return: sequence of FileTrees used to generate each file on disk matching the pattern of `short_name`
        """
        return tuple(self.update(set_parent=set_parent, **matched)
                     for matched in self.get_all_vars(short_name, glob_vars=glob_vars))

    def update(self, set_parent=True, **variables) -> "FileTree":
        """
        Creates a new FileTree with updated variables

        :param set_parent: Update the variables of the top-level rather than current tree if True.
            Only relevant if `self` is a sub-tree.
        :param variables: new values for the variables
            Setting a variable to None will cause the variable to be unset
        :return: New FileTree with same templates for directory names and filenames, but updated variables
        """
        new_tree = self.copy()
        set_tree = new_tree
        while set_parent and set_tree.parent is not None:
            set_tree = set_tree.parent
        set_tree.variables.update(variables)
        # None means "unset": drop the key rather than storing None
        for key, value in variables.items():
            if value is None:
                del set_tree.variables[key]
        return new_tree

    def extract_variables(self, short_name: str, filename: str) -> Dict[str, str]:
        """
        Extracts the variables from the given filename

        :param short_name: short name of the path template
        :param filename: filename matching the template
        :return: variables needed to get to the given filename
            Variables with None value are optional variables in the template that were not used
        """
        text, _ = self.get_template(short_name)
        return utils.extract_variables(text, filename, self.variables)

    def save_pickle(self, filename):
        """
        Saves the Filetree to a pickle file

        :param filename: filename to store the file tree (usually ending with .pck)
        """
        with open(filename, 'wb') as f:
            pickle.dump(self, f)

    def save_json(self, filename):
        """
        Saves the Filetree to a JSON file

        :param filename: filename to store the file tree in
        """
        def default(obj):
            # A tree is stored as its attribute dictionary; the parent link is dropped
            # (``load_json`` restores it from the nesting of the sub-trees)
            if isinstance(obj, FileTree):
                res = dict(obj.__dict__)
                del res['_parent']
                return res
            return obj

        with open(filename, 'w') as f:
            json.dump(self, f, default=default, indent=2)

    @classmethod
    def load_pickle(cls, filename):
        """
        Loads the Filetree from a pickle file

        :param filename: filename produced from Filetree.save_pickle
        :return: stored Filetree
        :raise: IOError if the file does not contain an object of this class
        """
        # NOTE: unpickling can execute arbitrary code; only load files from trusted sources
        with open(filename, 'rb') as f:
            res = pickle.load(f)
        if not isinstance(res, cls):
            raise IOError("Pickle file did not contain %s object" % cls)
        return res

    @classmethod
    def load_json(cls, filename):
        """
        Loads the FileTree from a JSON file

        :param filename: filename produced by FileTree.save_json
        :return: stored FileTree
        """
        def from_dict(input_dict):
            res_tree = FileTree(
                templates=input_dict['templates'],
                variables=input_dict['variables'],
                sub_trees={name: from_dict(value) for name, value in input_dict['sub_trees'].items()},
                name=input_dict['_name'],
            )
            # parent links are not stored in the file, so re-attach them here
            for sub_tree in res_tree.sub_trees.values():
                sub_tree._parent = res_tree
            return res_tree

        with open(filename, 'r') as f:
            as_dict = json.load(f)
        return from_dict(as_dict)

    def defines(self, short_names, error=False):
        """
        Checks whether templates are defined for all the `short_names`

        :param short_names: sequence of expected short names to exist in the tree
        :param error: if True raises ValueError if any `short_names` are undefined
        :return: True if all are defined, False otherwise
        :raise: ValueError if `error` is set to True and any template is missing
        """
        if isinstance(short_names, str):
            short_names = (short_names, )

        def is_missing(short_name):
            try:
                self._get_template_tree(short_name)
            except KeyError:
                return True
            return False

        missing = tuple(name for name in short_names if is_missing(name))
        if len(missing) > 0:
            if error:
                raise ValueError("Provided Filetree is missing template definitions for {}".format(missing))
            return False
        return True

    def on_disk(self, short_names, error=False, glob_vars=()):
        """
        Checks whether at least one file exists for every file in `short_names`

        :param short_names: list of expected short names to exist in the tree
        :param error: if True raises a helpful error when the check fails
        :param glob_vars: sequence of undefined variables that can take any possible values
            when looking for matches on the disk
            If `glob_vars` contains any defined variables, it will be ignored.
        :return: True if short names exist and optionally exist on disk (False otherwise)
        :raise:
            - ValueError if error is set and the tree is incomplete
            - IOError if error is set and any files are missing from the disk
        """
        # Only called for its ValueError when `error` is set; an undefined short name
        # without `error` is caught as a KeyError below
        self.defines(short_names, error=error)
        if isinstance(short_names, str):
            short_names = (short_names, )
        try:
            missing = tuple(name for name in short_names if len(self.get_all(name, glob_vars=glob_vars)) == 0)
        except KeyError:
            if error:
                raise
            return False
        if len(missing) > 0:
            if error:
                raise IOError("Failed to find any files on disk for {}".format(missing))
            return False
        return True

    def partial_fill(self) -> "FileTree":
        """
        Fills in known variables into the templates

        :return: The resulting tree will have empty `variables` dictionaries and updated templates
        """
        new_tree = self.copy()
        # the fill is applied from the top-level tree downwards
        to_update = new_tree
        while to_update.parent is not None:
            to_update = to_update.parent
        to_update._update_partial_fill()
        return new_tree

    def _update_partial_fill(self):
        """
        Helper function for `partial_fill` that updates the templates in place
        """
        new_templates = {}
        for short_name in self.templates:
            template, variables = self.get_template(short_name)
            new_templates[short_name] = str(utils.Template.parse(template).fill_known(variables))
        self.templates = new_templates

        # sub-trees are processed before the variables are cleared, so that they still
        # see the variables inherited from this tree
        for tree in self.sub_trees.values():
            tree._update_partial_fill()
        self.variables = {}

    def copy(self):
        """
        Copies the FileTree

        Copies the templates, variables, sub_trees, and parent

        :return: a copy of the FileTree
        """
        return self._copy()

    def _copy(self, new_parent=None, new_sub_tree=None):
        """
        Helper function for copying a FileTree

        :param new_parent: already copied parent to attach the copy to
        :param new_sub_tree: ``(name, tree)`` tuple of an already copied sub-tree,
            which is re-used rather than copied again
        :return: copy of this tree
        :raise: ValueError if this tree has a parent that does not list it as a sub-tree
        """
        if new_sub_tree is None:
            new_sub_tree = (None, None)
        new_copy = type(self)(
            templates=self.templates.copy(),
            variables=self.variables.copy(),
            name=self.name,
            parent=new_parent
        )
        new_copy.sub_trees = {name: new_sub_tree[1] if new_sub_tree[0] == name else tree._copy(new_parent=new_copy)
                              for name, tree in self.sub_trees.items()}
        if self.parent is not None and new_parent is None:
            # The copy was started from a sub-tree: copy upwards too, handing the parent
            # the copy that was just made so that it is not duplicated
            for my_key, ref_tree in self.parent.sub_trees.items():
                if self is ref_tree:
                    break
            else:
                raise ValueError(f"Sub-tree {self} not found in parent tree")
            new_copy._parent = self.parent._copy(new_sub_tree=(my_key, new_copy))
        return new_copy

    @classmethod
    def read(cls, tree_name: str, directory='.', partial_fill=False, **variables) -> "FileTree":
        """
        Reads a FileTree from a specific file

        The return type is ``cls`` unless the tree_name has been previously registered.
        The return type of any sub-tree is ``FileTree`` unless the tree_name has been previously registered.

        :param tree_name: file containing the filename tree.
            Can provide the filename of a tree file or the name for a tree in the ``filetree.tree_directories``.
        :param directory: parent directory of the full tree (defaults to current directory)
        :param partial_fill: if True any known `variables` are filled into the `template` immediately
        :param variables: variable settings
        :return: the tree described by the file
        :raise: ValueError if the file can not be parsed
        """
        if op.exists(tree_name):
            filename = tree_name
        elif op.exists(tree_name + '.tree'):
            filename = tree_name + '.tree'
        else:
            filename = parse.search_tree(tree_name)
        tree_name = op.splitext(op.basename(filename))[0]
        # `filename` is a plain string in the first two cases above, so wrap it before using `.parent`
        tree_path = Path(filename)
        dirname = str(tree_path.parent)

        templates = {}
        nspaces_level = []
        sub_trees = {}
        file_variables = {}
        with open(str(tree_path), 'r') as f:
            for full_line in f:
                # ignore anything behind the first #-character
                line = full_line.split('#')[0]
                if len(line.strip()) == 0:
                    continue

                if line.strip()[:2] == '->':
                    # sub-tree line: work out the directory it lives in from its indentation
                    nspaces = line.index('->')
                    if len(nspaces_level) == 0:
                        sub_dir = directory
                    elif nspaces > nspaces_level[-1]:
                        sub_dir = current_filename
                    elif nspaces < nspaces_level[-1]:
                        if nspaces not in nspaces_level:
                            raise ValueError('line %s dropped to a non-existent level' % line)
                        new_level = nspaces_level.index(nspaces)
                        # NOTE(review): `filename` here is the name parsed from the most recent
                        # template line - confirm this is intended
                        current_filename = current_filename.parents[len(nspaces_level) - new_level - 1] / filename
                        nspaces_level = nspaces_level[:new_level + 1]
                        sub_dir = current_filename.parents[0]
                    else:
                        sub_dir = current_filename.parents[0]

                    with parse.extra_tree_dirs([dirname]):
                        _, sub_tree, short_name = parse.read_subtree_line(line, sub_dir)
                    if short_name in sub_trees:
                        raise ValueError(f"Name of sub_tree {short_name} used multiple times in {tree_name}.tree")

                    sub_trees[short_name] = sub_tree
                    sub_tree._name = short_name
                elif '=' in line:
                    # variable assignment
                    key, value = line.split('=')
                    if len(key.split()) != 1:
                        raise ValueError(f"Variable assignment could not be parsed: {line}")
                    file_variables[key.strip()] = value.strip()
                else:
                    # template line; from here on `filename` is the name parsed from the line
                    nspaces, filename, short_name = parse.read_line(line)
                    if short_name in templates:
                        raise ValueError(
                            f"Name of directory/file {short_name} used multiple times in {tree_name}.tree")

                    if len(nspaces_level) == 0:
                        current_filename = PurePath(directory) / filename
                        nspaces_level.append(nspaces)
                    elif nspaces > nspaces_level[-1]:
                        # increase the level
                        current_filename = current_filename / filename
                        nspaces_level.append(nspaces)
                    elif nspaces < nspaces_level[-1]:
                        # decreased the level
                        if nspaces not in nspaces_level:
                            raise ValueError('line %s dropped to a non-existent level' % full_line)
                        new_level = nspaces_level.index(nspaces)
                        current_filename = current_filename.parents[len(nspaces_level) - new_level - 1] / filename
                        nspaces_level = nspaces_level[:new_level + 1]
                    else:
                        current_filename = current_filename.parents[0] / filename

                    templates[short_name] = str(current_filename)

        # variables passed by the caller take precedence over those set in the file
        file_variables.update(variables)
        res = get_registered(tree_name, cls)(templates, variables=file_variables, sub_trees=sub_trees, name=tree_name)
        for tree in sub_trees.values():
            tree._parent = res
        if partial_fill:
            res = res.partial_fill()
        return res
# Maps a tree name to the FileTree sub-class that should be used when reading that tree
_registered_subtypes = {}


def register_tree(name: str, tree_subtype: type):
    """
    Registers a tree_subtype under name

    Loading a tree with given name will lead to the `tree_subtype` rather than FileTree to be returned

    :param name: name of tree filename
    :param tree_subtype: tree subtype
    :raise: ValueError if `tree_subtype` is not a sub-class of FileTree
    """
    is_tree_type = issubclass(tree_subtype, FileTree)
    if not is_tree_type:
        raise ValueError("Only sub-classes of FileTree can be registered")
    # the registry is only mutated, never rebound, so no `global` statement is needed
    _registered_subtypes[name] = tree_subtype
def get_registered(name, default=FileTree) -> type:
    """
    Get the previously registered subtype for ``name``

    The lookup tries, in order: ``name`` itself, ``name`` without its leading directories and
    finally that base name with any trailing ``.tree`` extension(s) removed.

    :param name: name of the sub-tree
    :param default: type to return if the name has not been registered
    :return: FileTree or sub-type thereof
    """
    def candidates():
        # generated lazily, so later forms are only computed if the earlier ones are not registered
        yield name
        base_name = op.split(name)[1]
        yield base_name
        while base_name.endswith('.tree'):
            base_name = base_name[:-5]
        yield base_name

    for candidate in candidates():
        if candidate in _registered_subtypes:
            return _registered_subtypes[candidate]
    return default