""" Part of the BlackStar filesystem (bsfs) module. A copy of the license is provided with the project. Author: Matthias Baumgartner, 2022 """ # imports import typing # bsfs imports from bsfs import schema as bsc # inner-module imports from . import ast # exports __all__ : typing.Sequence[str] = ( 'Filter', ) ## code ## class Filter(): # schema to validate against. schema: bsc.Schema def __init__(self, schema: bsc.Schema): self.schema = schema def parse(self, node: ast.filter.FilterExpression, subject: bsc.types._Vertex): # subject is a node type if not isinstance(subject, bsc.Node): raise errors.ConsistencyError(f'Expected a node, found {subject}') # subject exists in the schema if subject not in self.schema.nodes: raise errors.ConsistencyError(f'Invalid node type {subject}') # root expression is valid self._parse(node, subject) # all tests passed return True def _parse_numerical_expression(self, node: ast.filter.FilterExpression, subject: bsc.types._Vertex): if isinstance(node, ast.filter.And): return self._and(node, subject) elif isinstance(node, ast.filter.Or): return self._or(node, subject) elif isinstance(node, ast.filter.LessThan): return self._lessThan(node, subject) elif isinstance(node, ast.filter.GreaterThan): return self._greaterThan(node, subject) elif isinstance(node, ast.filter.Equals): return self._equals(node, subject, numerical=True) else: raise errors.ConsistencyError(f'Expected a numerical expression, found {node}') def __branch(self, node: typing.Union[ast.filter.Any, ast.filter.And], subject: bsc.types._Vertex): # subject is a node type if not isinstance(subject, bsc.Node): raise errors.ConsistencyError(f'Expected a node, found {subject}') # subject exists in the schema if subject not in self.schema.nodes: raise errors.ConsistencyError(f'Invalid node type {subject}') # predicate is valid dom, rng = self._parse_predicate_expression(node.predicate) # subject is a subtype of the predicate's domain if not subject <= dom: raise errors.ConsistencyError(f'Expected type {dom}, found {subject}') # child expression is valid self._parse_filter_expression(node.expr, rng) def _any(self, node: ast.filter.Any, subject: bsc.types._Vertex): return self.__branch(node, subject) def _all(self, node: ast.filter.All, subject: bsc.types._Vertex): return self.__branch(node, subject) def __agg(self, node: typing.Union[ast.filter.And, ast.filter.Or], subject: bsc.types._Vertex): for expr in node: # child expression is valid self._parse_filter_expression(expr, subject) def _and(self, node: ast.filter.And, subject: bsc.types._Vertex): return self.__agg(node, subject) def _or(self, node: ast.filter.Or, subject: bsc.types._Vertex): return self.__agg(node, subject) def _not(self, node: ast.filter.Not, subject: bsc.types._Vertex): # child expression is valid self._parse_filter_expression(node.expr, subject) def _has(self, node: ast.filter.Has, subject: bsc.types._Vertex): # subject is a node type if not isinstance(subject, bsc.Node): raise errors.ConsistencyError(f'Expected a node, found {subject}') # subject exists in the schema if subject not in self.schema.nodes: raise errors.ConsistencyError(f'Invalid node type {subject}') # predicate is valid dom, rng = self._parse_predicate_expression(node.predicate) # subject is a subtype of the predicate's domain if not subject <= dom: raise errors.ConsistencyError(f'Expected type {dom}, found {subject}') # node.count is a numerical expression self._parse_numerical_expression(node.count, self.schema.literal(ns.xsd.numerical)) def _equals(self, node: ast.filter.Equals, subject: bsc.types._Vertex, numerical: bool = False): # subject is a literal #if not isinstance(subject, bsc.Literal): # raise errors.ConsistencyError(f'Expected a literal, found {subject}') if isinstance(subject, bsc.Node): # FIXME: How to handle this case? # FIXME: How to check if a NodeType is acceptable? # FIXME: Maybe use flags to control what is expected as node identifiers? from bsfs.graph.nodes import Nodes # FIXME if not isinstance(node.value, Nodes) and not isinstance(node.value, URI): raise errors.ConsistencyError(f'Expected a Nodes or URI, found {node.value}') elif isinstance(subject, bsc.Literal): # literal exists in the schema if subject not in self.schema.literals: raise errors.ConsistencyError(f'Invalid literal type {subject}') else: # FIXME: raise errors.ConsistencyError(f'Expected a literal, found {subject}') # node.value is numeric (if requested) if numerical and not isinstance(node.value, float) and not isinstance(node.value, int): raise errors.ConsistencyError(f'Expected a numerical value (int or float), found {node.value}') # NOTE: We cannot check if node.value agrees with the subject since we don't know # all literal types, their hierarchy, and how the backend converts datatypes. def _substring(self, node: ast.filter.Substring, subject: bsc.types._Vertex): # subject is a literal if not isinstance(subject, bsc.Literal): raise errors.ConsistencyError(f'Expected a literal, found {subject}') # literal exists in the schema if subject not in self.schema.literals: raise errors.ConsistencyError(f'Invalid literal type {subject}') # node.value matches literal datatype if not subject.is_a(ns.xsd.string): raise errors.ConsistencyError(f'Expected a string literal, found {subject}') def _lessThan(self, node: ast.filter.LessThan, subject: bsc.types._Vertex): # subject is a literal if not isinstance(subject, bsc.Literal): raise errors.ConsistencyError(f'Expected a literal, found {subject}') # literal exists in the schema if subject not in self.schema.literals: raise errors.ConsistencyError(f'Invalid literal type {subject}') # subject is numerical if not subject.is_a(ns.xsd.numerical): raise errors.ConsistencyError(f'Expected a numerical literal, found {subject}') def _greaterThan(self, node: ast.filter.GreaterThan, subject: bsc.types._Vertex): # subject is a literal if not isinstance(subject, bsc.Literal): raise errors.ConsistencyError(f'Expected a literal, found {subject}') # literal exists in the schema if subject not in self.schema.literals: raise errors.ConsistencyError(f'Invalid literal type {subject}') # subject is numerical if not subject.is_a(ns.xsd.numerical): raise errors.ConsistencyError(f'Expected a numerical literal, found {subject}') def _predicate(self, node: ast.filter.Predicate): try: # predicate exists in the schema pred = self.schema.predicate(node.predicate) except KeyError: raise errors.ConsistencyError(f'') # FIXME if node.reverse: return pred.range, pred.domain else: return pred.domain, pred.range def _oneOf(self, node: ast.filter.OneOf): dom, rng = None, None for pred in node: try: # parse child expression subdom, subrng = self._parse_predicate_expression(pred) # domain and range must be related across all child expressions if not subdom <= dom and not subdom >= dom: raise errors.ConsistencyError(f'') # FIXME if not subrng <= rng and not subrng >= rng: raise errors.ConsistencyError(f'') # FIXME # determine overall domain and range if dom is None or subdom < dom: # pick most specific domain dom = subdom if rng is None or subrng > rng: # pick most generic range rng = subrng except KeyError: raise errors.ConsistencyError(f'') return dom, rng ## EOF ##