diff options
Diffstat (limited to 'bsfs/graph/resolve.py')
-rw-r--r-- | bsfs/graph/resolve.py | 47 |
1 files changed, 25 insertions, 22 deletions
diff --git a/bsfs/graph/resolve.py b/bsfs/graph/resolve.py index feb0855..00b778b 100644 --- a/bsfs/graph/resolve.py +++ b/bsfs/graph/resolve.py @@ -37,8 +37,6 @@ class Filter(): """ - T_VERTEX = typing.Union[bsc.Node, bsc.Literal] - def __init__(self, schema): self.schema = schema @@ -47,7 +45,7 @@ class Filter(): def _parse_filter_expression( self, - type_: T_VERTEX, + type_: bsc.Vertex, node: ast.filter.FilterExpression, ) -> ast.filter.FilterExpression: """Route *node* to the handler of the respective FilterExpression subclass.""" @@ -65,6 +63,8 @@ class Filter(): return self._and(type_, node) if isinstance(node, ast.filter.Or): return self._or(type_, node) + if isinstance(node, ast.filter.Distance): + return self._distance(type_, node) if isinstance(node, (ast.filter.Equals, ast.filter.Substring, \ ast.filter.StartsWith, ast.filter.EndsWith)): return self._value(type_, node) @@ -73,7 +73,7 @@ class Filter(): # invalid node raise errors.BackendError(f'expected filter expression, found {node}') - def _parse_predicate_expression(self, node: ast.filter.PredicateExpression) -> T_VERTEX: + def _parse_predicate_expression(self, node: ast.filter.PredicateExpression) -> bsc.Vertex: """Route *node* to the handler of the respective PredicateExpression subclass.""" if isinstance(node, ast.filter.Predicate): return self._predicate(node) @@ -82,7 +82,7 @@ class Filter(): # invalid node raise errors.BackendError(f'expected predicate expression, found {node}') - def _predicate(self, node: ast.filter.Predicate) -> T_VERTEX: + def _predicate(self, node: ast.filter.Predicate) -> bsc.Vertex: if not self.schema.has_predicate(node.predicate): raise errors.ConsistencyError(f'predicate {node.predicate} is not in the schema') pred = self.schema.predicate(node.predicate) @@ -91,49 +91,52 @@ class Filter(): dom, rng = rng, dom return rng - def _one_of(self, node: ast.filter.OneOf) -> T_VERTEX: + def _one_of(self, node: ast.filter.OneOf) -> bsc.Vertex: # determine domain and range types rng = None for pred in node: # parse child expression subrng = self._parse_predicate_expression(pred) # determine the next type - try: - if rng is None or subrng > rng: # pick most generic range - rng = subrng - except TypeError as err: - raise errors.ConsistencyError(f'ranges {subrng} and {rng} are not related') from err - if rng is None: - raise errors.UnreachableError() + if rng is None or subrng > rng: # pick most generic range + rng = subrng + # check range consistency + if not subrng <= rng and not subrng >= rng: + raise errors.ConsistencyError(f'ranges {subrng} and {rng} are not related') + if not isinstance(rng, (bsc.Node, bsc.Literal)): + raise errors.BackendError(f'the range of node {node} is undefined') return rng - def _any(self, type_: T_VERTEX, node: ast.filter.Any) -> ast.filter.Any: # pylint: disable=unused-argument + def _any(self, type_: bsc.Vertex, node: ast.filter.Any) -> ast.filter.Any: # pylint: disable=unused-argument next_type = self._parse_predicate_expression(node.predicate) return ast.filter.Any(node.predicate, self._parse_filter_expression(next_type, node.expr)) - def _all(self, type_: T_VERTEX, node: ast.filter.All) -> ast.filter.All: # pylint: disable=unused-argument + def _all(self, type_: bsc.Vertex, node: ast.filter.All) -> ast.filter.All: # pylint: disable=unused-argument next_type = self._parse_predicate_expression(node.predicate) return ast.filter.All(node.predicate, self._parse_filter_expression(next_type, node.expr)) - def _and(self, type_: T_VERTEX, node: ast.filter.And) -> ast.filter.And: + def _and(self, type_: bsc.Vertex, node: ast.filter.And) -> ast.filter.And: return ast.filter.And({self._parse_filter_expression(type_, expr) for expr in node}) - def _or(self, type_: T_VERTEX, node: ast.filter.Or) -> ast.filter.Or: + def _or(self, type_: bsc.Vertex, node: ast.filter.Or) -> ast.filter.Or: return ast.filter.Or({self._parse_filter_expression(type_, expr) for expr in node}) - def _not(self, type_: T_VERTEX, node: ast.filter.Not) -> ast.filter.Not: + def _not(self, type_: bsc.Vertex, node: ast.filter.Not) -> ast.filter.Not: return ast.filter.Not(self._parse_filter_expression(type_, node.expr)) - def _has(self, type_: T_VERTEX, node: ast.filter.Has) -> ast.filter.Has: # pylint: disable=unused-argument + def _has(self, type_: bsc.Vertex, node: ast.filter.Has) -> ast.filter.Has: # pylint: disable=unused-argument + return node + + def _distance(self, type_: bsc.Vertex, node: ast.filter.Distance): # pylint: disable=unused-argument return node - def _value(self, type_: T_VERTEX, node: ast.filter._Value) -> ast.filter._Value: # pylint: disable=unused-argument + def _value(self, type_: bsc.Vertex, node: ast.filter._Value) -> ast.filter._Value: # pylint: disable=unused-argument return node - def _bounded(self, type_: T_VERTEX, node: ast.filter._Bounded) -> ast.filter._Bounded: # pylint: disable=unused-argument + def _bounded(self, type_: bsc.Vertex, node: ast.filter._Bounded) -> ast.filter._Bounded: # pylint: disable=unused-argument return node - def _is(self, type_: T_VERTEX, node: ast.filter.Is) -> typing.Union[ast.filter.Or, ast.filter.Is]: + def _is(self, type_: bsc.Vertex, node: ast.filter.Is) -> typing.Union[ast.filter.Or, ast.filter.Is]: # check if action is needed if not isinstance(node.value, nodes.Nodes): return node |