""" Part of the BlackStar filesystem (bsfs) module. A copy of the license is provided with the project. Author: Matthias Baumgartner, 2022 """ # imports import typing # bsfs imports from bsfs import schema as bsc from bsfs.namespace import ns from bsfs.utils import errors, typename # inner-module imports from . import ast # exports __all__ : typing.Sequence[str] = ( 'Filter', ) ## code ## class Filter(): """Validate a `bsfs.query.ast.filter` query's structure and schema compliance. * Conditions (Bounded, Value) can only be applied on literals * Branches, Id, and Has can only be applied on nodes * Predicates' domain and range must match * Predicate paths must follow the schema * Referenced types are present in the schema """ # schema to validate against. schema: bsc.Schema def __init__(self, schema: bsc.Schema): self.schema = schema def __call__(self, root_type: bsc.Node, query: ast.filter.FilterExpression): """Validate a filter *query*, assuming the subject having *root_type*. Raises a `bsfs.utils.errors.ConsistencyError` if the query violates the schema. Raises a `bsfs.utils.errors.BackendError` if the query structure is invalid. """ # root_type must be a schema.Node if not isinstance(root_type, bsc.Node): raise TypeError(f'Expected a node, found {typename(root_type)}') # root_type must exist in the schema if root_type not in self.schema.nodes(): raise errors.ConsistencyError(f'{root_type} is not defined in the schema') # check root expression self._parse_filter_expression(root_type, query) # all tests passed return True ## routing methods def _parse_filter_expression(self, type_: bsc.Vertex, node: ast.filter.FilterExpression): """Route *node* to the handler of the respective FilterExpression subclass.""" if isinstance(node, ast.filter.Is): return self._is(type_, node) if isinstance(node, ast.filter.Not): return self._not(type_, node) if isinstance(node, ast.filter.Has): return self._has(type_, node) if isinstance(node, (ast.filter.Any, ast.filter.All)): return self._branch(type_, node) if isinstance(node, (ast.filter.And, ast.filter.Or)): return self._agg(type_, node) if isinstance(node, (ast.filter.Equals, ast.filter.Substring, ast.filter.StartsWith, ast.filter.EndsWith)): return self._value(type_, node) if isinstance(node, (ast.filter.LessThan, ast.filter.GreaterThan)): return self._bounded(type_, node) # invalid node raise errors.BackendError(f'expected filter expression, found {node}') def _parse_predicate_expression(self, node: ast.filter.PredicateExpression) -> typing.Tuple[bsc.Vertex, bsc.Vertex]: """Route *node* to the handler of the respective PredicateExpression subclass.""" if isinstance(node, ast.filter.Predicate): return self._predicate(node) if isinstance(node, ast.filter.OneOf): return self._one_of(node) # invalid node raise errors.BackendError(f'expected predicate expression, found {node}') ## predicate expressions def _predicate(self, node: ast.filter.Predicate) -> typing.Tuple[bsc.Vertex, bsc.Vertex]: # predicate exists in the schema if not self.schema.has_predicate(node.predicate): raise errors.ConsistencyError(f'predicate {node.predicate} is not in the schema') # determine domain and range pred = self.schema.predicate(node.predicate) dom, rng = pred.domain, pred.range if rng is None: # FIXME: It is a design error that Predicates can have a None range... raise errors.BackendError(f'predicate {pred} has no range') if node.reverse: dom, rng = rng, dom # type: ignore [assignment] # variable re-use confuses mypy # return domain and range return dom, rng def _one_of(self, node: ast.filter.OneOf) -> typing.Tuple[bsc.Vertex, bsc.Vertex]: # determine domain and range types # NOTE: select the most specific domain and the most generic range dom, rng = None, None for pred in node: # parse child expression subdom, subrng = self._parse_predicate_expression(pred) try: # determine overall domain if dom is None or subdom < dom: # pick most specific domain dom = subdom # domains must be related across all child expressions if not subdom <= dom and not subdom >= dom: raise errors.ConsistencyError(f'domains {subdom} and {dom} are not related') except TypeError as err: # compared literal vs. node raise errors.ConsistencyError(f'domains {subdom} and {dom} are not of the same type') from err try: # determine overall range if rng is None or subrng > rng: # pick most generic range rng = subrng # ranges must be related across all child expressions if not subrng <= rng and not subrng >= rng: raise errors.ConsistencyError(f'ranges {subrng} and {rng} are not related') except TypeError as err: # compared literal vs. node raise errors.ConsistencyError(f'ranges {subrng} and {rng} are not of the same type') from err # check domain and range if dom is None or rng is None: # OneOf guarantees at least one expression, these two cases cannot happen raise errors.UnreachableError() # return domain and range return dom, rng ## intermediates def _branch(self, type_: bsc.Vertex, node: ast.filter._Branch): # type is a Node if not isinstance(type_, bsc.Node): raise errors.ConsistencyError(f'expected a Node, found {type_}') # type exists in the schema # FIXME: Isn't it actually guaranteed that the type (except the root type) is part of the schema? # all types can be traced back to (a) root_type, (b) predicate, or (c) manually set (e.g. in _is). # For (a), we do (and have to) perform a check. For (c), the code base should be consistent throughout # the module, so this is an assumption that has to be ensured in schema.Schema. For (b), we know (and # check) that the predicate is in the schema, hence all node/literals derived from it are also in the # schema by construction of the schema.Schema class. So, why do we check this every time? if type_ not in self.schema.nodes(): raise errors.ConsistencyError(f'node {type_} is not in the schema') # predicate is valid dom, rng = self._parse_predicate_expression(node.predicate) # type_ is a subtype of the predicate's domain if not type_ <= dom: raise errors.ConsistencyError(f'expected type {dom} or subtype thereof, found {type_}') # child expression is valid self._parse_filter_expression(rng, node.expr) def _agg(self, type_: bsc.Vertex, node: ast.filter._Agg): for expr in node: # child expression is valid self._parse_filter_expression(type_, expr) def _not(self, type_: bsc.Vertex, node: ast.filter.Not): # child expression is valid self._parse_filter_expression(type_, node.expr) def _has(self, type_: bsc.Vertex, node: ast.filter.Has): # type is a Node if not isinstance(type_, bsc.Node): raise errors.ConsistencyError(f'expected a Node, found {type_}') # type exists in the schema if type_ not in self.schema.nodes(): raise errors.ConsistencyError(f'node {type_} is not in the schema') # predicate is valid dom, _= self._parse_predicate_expression(node.predicate) # type_ is a subtype of the predicate's domain if not type_ <= dom: raise errors.ConsistencyError(f'expected type {dom}, found {type_}') # node.count is a numerical expression # FIXME: We have to ensure that ns.xsd.integer is always known in the schema! self._parse_filter_expression(self.schema.literal(ns.xsd.integer), node.count) ## conditions def _is(self, type_: bsc.Vertex, node: ast.filter.Is): # pylint: disable=unused-argument # (node) if not isinstance(type_, bsc.Node): raise errors.ConsistencyError(f'expected a Node, found {type_}') if type_ not in self.schema.nodes(): raise errors.ConsistencyError(f'node {type_} is not in the schema') def _value(self, type_: bsc.Vertex, node: ast.filter._Value): # pylint: disable=unused-argument # (node) # type is a literal if not isinstance(type_, bsc.Literal): raise errors.ConsistencyError(f'expected a Literal, found {type_}') # type exists in the schema if type_ not in self.schema.literals(): raise errors.ConsistencyError(f'literal {type_} is not in the schema') # FIXME: Check if node.value corresponds to type_ # FIXME: A specific literal might be requested (i.e., a numeric type when used in Has) def _bounded(self, type_: bsc.Vertex, node: ast.filter._Bounded): # pylint: disable=unused-argument # (node) # type is a literal if not isinstance(type_, bsc.Literal): raise errors.ConsistencyError(f'expected a Literal, found {type_}') # type exists in the schema if type_ not in self.schema.literals(): raise errors.ConsistencyError(f'literal {type_} is not in the schema') # FIXME: Check if node.value corresponds to type_ ## EOF ##