aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/graph/walk.py
diff options
context:
space:
mode:
Diffstat (limited to 'bsfs/graph/walk.py')
-rw-r--r--bsfs/graph/walk.py115
1 files changed, 115 insertions, 0 deletions
diff --git a/bsfs/graph/walk.py b/bsfs/graph/walk.py
new file mode 100644
index 0000000..6415c9b
--- /dev/null
+++ b/bsfs/graph/walk.py
@@ -0,0 +1,115 @@
+
+# imports
+from collections import abc
+import typing
+
+# bsfs imports
+from bsfs import schema as bsc
+
+# inner-module imports
+# NOTE: circular import! OK as long as only used for type annotations.
+from . import nodes # pylint: disable=cyclic-import
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Walk',
+ )
+
+
+## code ##
+
+class Walk(abc.Hashable, abc.Callable): # type: ignore [misc] # invalid base class (Callable)
+ """Syntactic sugar for `Nodes` to build and act on predicate paths via members."""
+
+ # Link to Nodes instance.
+ _root: 'nodes.Nodes'
+
+ # Current predicate path.
+ _path: typing.Tuple[bsc.Predicate, ...]
+
+ def __init__(
+ self,
+ root: 'nodes.Nodes',
+ path: typing.Sequence[bsc.Predicate],
+ ):
+ self._root = root
+ self._path = tuple(path)
+
+ @property
+ def tail(self):
+ """Return the node type at the end of the path."""
+ return self._path[-1].range
+
+
+ ## comparison
+
+ def __hash__(self) -> int:
+ """Return an integer hash that identifies the instance."""
+ return hash((type(self), self._root, self._path))
+
+ def __eq__(self, other) -> bool:
+ """Compare against *other* backend."""
+ return isinstance(other, type(self)) \
+ and self._root == other._root \
+ and self._path == other._path
+
+
+ ## representation
+
+ def __repr__(self) -> str:
+ """Return a formal string representation."""
+ path = ', '.join(pred.uri for pred in self._path)
+ return f'Walk({self._root.node_type.uri}, ({path}))'
+
+ def __str__(self) -> str:
+ """Return an informal string representation."""
+ path = ', '.join(pred.uri for pred in self._path)
+ return f'Walk(@{self._root.node_type.uri}: {path})'
+
+
+ ## walk
+
+ @staticmethod
+ def step(
+ schema: bsc.Schema,
+ node: bsc.Node,
+ name: str,
+ ) -> typing.Tuple[bsc.Predicate]:
+ """Get an predicate at *node* whose fragment matches *name*."""
+ predicates = tuple(
+ pred
+ for pred
+ in schema.predicates_at(node)
+ if pred.uri.get('fragment', None) == name
+ )
+ if len(predicates) == 0: # no fragment found for name
+ raise ValueError(f'no available predicate matches {name}') # FIXME: Custom exception
+ if len(predicates) > 1: # ambiguous name
+ raise ValueError(f'{name} matches multiple predicates') # FIXME: Custom exception
+ # append predicate to walk
+ return predicates # type: ignore [return-value] # size is one
+
+ def __getattr__(self, name: str) -> 'Walk':
+ """Alias for `Walk.step(name)`."""
+ try:
+ return super().__getattr__(name)
+ except AttributeError:
+ pass
+ # get predicate
+ pred = self.step(self._root.schema, self.tail, name)
+ # append predicate to walk
+ return Walk(self._root, self._path + pred)
+
+
+ ## get paths ##
+
+ def get(self, **kwargs) -> typing.Any:
+ """Alias for `Nodes.get(..)`."""
+ return self._root.get(tuple(pred.uri for pred in self._path), **kwargs)
+
+ def __call__(self, **kwargs) -> typing.Any: # pylint: disable=arguments-differ
+ """Alias for `Walk.get(...)`."""
+ return self.get(**kwargs)
+
+
+## EOF ##