# imports
import typing

# bsfs imports
from bsfs import schema as bsc
from bsfs.query import ast
from bsfs.utils import errors

# inner-module imports
from . import nodes

# exports
__all__: typing.Sequence[str] = (
    'Filter',
    )


## code ##

class Filter():
    """Rewrite a filter query so that `bsfs.graph.nodes.Nodes` instances
    are replaced by the URIs (guids) they stand for.

    Only limited type checking and schema validation is performed here.
    Use `bsfs.schema.validate.Filter` for a full validation.

    Example:
        input: Any(ns.bse.tag, Is(Nodes(...)))
        output: Any(ns.bse.tag, Or(Is(...), Is(...), ...))

    >>> tags = graph.node(ns.bsfs.Tag, 'http://example.com/me/tag#1234')
    >>> graph.get(ns.bsfs.Entity, ast.filter.Any(ns.bse.tag, ast.filter.Is(tags)))

    """

    def __init__(self, schema):
        # The schema is consulted for predicate lookups and node type checks.
        self.schema = schema

    def __call__(
            self,
            root_type: bsc.Node,
            node: typing.Optional[ast.filter.FilterExpression],
            ):
        """Resolve the filter *node* relative to *root_type*.

        A missing filter (None) is passed through as None.
        """
        return None if node is None else self._parse_filter_expression(root_type, node)

    def _parse_filter_expression(
            self,
            type_: bsc.Vertex,
            node: ast.filter.FilterExpression,
            ) -> ast.filter.FilterExpression:
        """Dispatch *node* to the handler of its FilterExpression subclass.

        Raises a BackendError if *node* matches none of the known
        filter expression classes.
        """
        # Ordered (expression class(es), handler) pairs; the first match wins.
        handlers = (
            (ast.filter.Is, self._is),
            (ast.filter.Not, self._not),
            (ast.filter.Has, self._has),
            (ast.filter.Any, self._any),
            (ast.filter.All, self._all),
            (ast.filter.And, self._and),
            (ast.filter.Or, self._or),
            (ast.filter.Distance, self._distance),
            ((ast.filter.Equals, ast.filter.Substring,
              ast.filter.StartsWith, ast.filter.EndsWith), self._value),
            ((ast.filter.LessThan, ast.filter.GreaterThan), self._bounded),
            )
        for expr_class, handler in handlers:
            if isinstance(node, expr_class):
                return handler(type_, node)
        # invalid node
        raise errors.BackendError(f'expected filter expression, found {node}')

    def _parse_predicate_expression(self, node: ast.filter.PredicateExpression) -> bsc.Vertex:
        """Dispatch *node* to the handler of its PredicateExpression subclass.

        Raises a BackendError if *node* is neither a Predicate nor a OneOf.
        """
        handlers = (
            (ast.filter.Predicate, self._predicate),
            (ast.filter.OneOf, self._one_of),
            )
        for expr_class, handler in handlers:
            if isinstance(node, expr_class):
                return handler(node)
        # invalid node
        raise errors.BackendError(f'expected predicate expression, found {node}')

    def _predicate(self, node: ast.filter.Predicate) -> bsc.Vertex:
        """Return the type reached by following the predicate of *node*.

        This is the predicate's range, or its domain if *node* is reversed.
        Raises a ConsistencyError if the predicate is not in the schema.
        """
        uri = node.predicate
        if not self.schema.has_predicate(uri):
            raise errors.ConsistencyError(f'predicate {node.predicate} is not in the schema')
        predicate = self.schema.predicate(uri)
        source, target = predicate.domain, predicate.range
        # a reversed predicate is walked from its range to its domain
        return source if node.reverse else target

    def _one_of(self, node: ast.filter.OneOf) -> bsc.Vertex:
        """Return the most generic range among the predicates of *node*.

        Raises a ConsistencyError if two ranges are unrelated, and a
        BackendError if no Node or Literal range could be determined.
        """
        widest = None
        for child in node:
            # resolve the child expression to its range
            candidate = self._parse_predicate_expression(child)
            # keep the most generic range seen so far
            if widest is None or candidate > widest:
                widest = candidate
            # the candidate must be comparable to the retained range
            if not (candidate <= widest or candidate >= widest):
                raise errors.ConsistencyError(f'ranges {candidate} and {widest} are not related')
        if not isinstance(widest, (bsc.Node, bsc.Literal)):
            raise errors.BackendError(f'the range of node {node} is undefined')
        return widest

    def _any(self, type_: bsc.Vertex, node: ast.filter.Any) -> ast.filter.Any: # pylint: disable=unused-argument
        """Rebuild an Any expression with its child resolved against the predicate's target type."""
        target_type = self._parse_predicate_expression(node.predicate)
        resolved = self._parse_filter_expression(target_type, node.expr)
        return ast.filter.Any(node.predicate, resolved)

    def _all(self, type_: bsc.Vertex, node: ast.filter.All) -> ast.filter.All: # pylint: disable=unused-argument
        """Rebuild an All expression with its child resolved against the predicate's target type."""
        target_type = self._parse_predicate_expression(node.predicate)
        resolved = self._parse_filter_expression(target_type, node.expr)
        return ast.filter.All(node.predicate, resolved)

    def _and(self, type_: bsc.Vertex, node: ast.filter.And) -> ast.filter.And:
        """Rebuild an And expression from its resolved children."""
        resolved = {self._parse_filter_expression(type_, child) for child in node}
        return ast.filter.And(resolved)

    def _or(self, type_: bsc.Vertex, node: ast.filter.Or) -> ast.filter.Or:
        """Rebuild an Or expression from its resolved children."""
        resolved = {self._parse_filter_expression(type_, child) for child in node}
        return ast.filter.Or(resolved)

    def _not(self, type_: bsc.Vertex, node: ast.filter.Not) -> ast.filter.Not:
        """Rebuild a Not expression around its resolved child."""
        resolved = self._parse_filter_expression(type_, node.expr)
        return ast.filter.Not(resolved)

    def _has(self, type_: bsc.Vertex, node: ast.filter.Has) -> ast.filter.Has: # pylint: disable=unused-argument
        """Return a Has expression unchanged."""
        return node

    def _distance(self, type_: bsc.Vertex, node: ast.filter.Distance): # pylint: disable=unused-argument
        """Return a Distance expression unchanged."""
        return node

    def _value(self, type_: bsc.Vertex, node: ast.filter._Value) -> ast.filter._Value: # pylint: disable=unused-argument
        """Return a value expression (Equals, Substring, StartsWith, EndsWith) unchanged."""
        return node

    def _bounded(self, type_: bsc.Vertex, node: ast.filter._Bounded) -> ast.filter._Bounded: # pylint: disable=unused-argument
        """Return a bounded expression (LessThan, GreaterThan) unchanged."""
        return node

    def _is(self, type_: bsc.Vertex, node: ast.filter.Is) -> typing.Union[ast.filter.Or, ast.filter.Is]:
        """Replace an Is over a Nodes instance by an Or of Is over its guids.

        An Is whose value is not a Nodes instance is returned unchanged.
        Raises a ConsistencyError if the node type is not in the schema,
        if *type_* is not a node type, or if the node type is not *type_*
        or a subtype thereof.
        """
        value = node.value
        # nothing to do unless the value is a Nodes instance
        if not isinstance(value, nodes.Nodes):
            return node
        node_type = value.node_type
        # check schema consistency
        if node_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'node {node_type} is not in the schema')
        # check type compatibility
        if not isinstance(type_, bsc.Node):
            raise errors.ConsistencyError(f'expected a node, found {type_}')
        if not node_type <= type_:
            raise errors.ConsistencyError(f'expected type {type_} or subtype thereof, found {node_type}')
        # NOTE: We assume that the node type is checked when writing to the backend.
        # Links to any of the guids can therefore only exist if the type matches.
        # Hence, we don't add a type check/constrain here.
        return ast.filter.Or(ast.filter.Is(guid) for guid in value.guids)
        # optimized code, removing unnecessary ast.filter.Or
        #guids = set(node.value.guids)
        #if len(guids) == 0:
        #    raise errors.BackendError(f'')
        #if len(guids) == 1:
        #    return ast.filter.Nodeid(next(iter(guids)))
        #return ast.filter.Or(ast.filter.Is(guid) for guid in guids)

## EOF ##