diff options
Diffstat (limited to 'bsfs/query/validator.py')
-rw-r--r-- | bsfs/query/validator.py | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/bsfs/query/validator.py b/bsfs/query/validator.py new file mode 100644 index 0000000..352203a --- /dev/null +++ b/bsfs/query/validator.py @@ -0,0 +1,224 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import typing + +# bsfs imports +from bsfs import schema as bsc +from bsfs.namespace import ns +from bsfs.utils import errors, typename + +# inner-module imports +from . import ast + +# exports +__all__ : typing.Sequence[str] = ( + 'Filter', + ) + + +## code ## + +class Filter(): + """Validate a `bsfs.query.ast.filter` query's structure and schema compliance. + + * Conditions (Bounded, Value) can only be applied on literals + * Branches, Id, and Has can only be applied on nodes + * Predicates' domain and range must match + * Predicate paths must follow the schema + * Referenced types are present in the schema + + """ + + # vertex types + T_VERTEX = typing.Union[bsc.Node, bsc.Literal] # FIXME: Shouldn't this be in the schema? + + # schema to validate against. + schema: bsc.Schema + + def __init__(self, schema: bsc.Schema): + self.schema = schema + + def __call__(self, root_type: bsc.Node, query: ast.filter.FilterExpression): + """Validate a filter *query*, assuming the subject having *root_type*. + + Raises a `bsfs.utils.errors.ConsistencyError` if the query violates the schema. + Raises a `bsfs.utils.errors.BackendError` if the query structure is invalid. + + """ + # root_type must be a schema.Node + if not isinstance(root_type, bsc.Node): + raise TypeError(f'Expected a node, found {typename(root_type)}') + # root_type must exist in the schema + if root_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{root_type} is not defined in the schema') + # check root expression + self._parse_filter_expression(root_type, query) + # all tests passed + return True + + + ## routing methods + + def _parse_filter_expression(self, type_: T_VERTEX, node: ast.filter.FilterExpression): + """Route *node* to the handler of the respective FilterExpression subclass.""" + if isinstance(node, ast.filter.Is): + return self._is(type_, node) + if isinstance(node, ast.filter.Not): + return self._not(type_, node) + if isinstance(node, ast.filter.Has): + return self._has(type_, node) + if isinstance(node, (ast.filter.Any, ast.filter.All)): + return self._branch(type_, node) + if isinstance(node, (ast.filter.And, ast.filter.Or)): + return self._agg(type_, node) + if isinstance(node, (ast.filter.Equals, ast.filter.Substring, ast.filter.StartsWith, ast.filter.EndsWith)): + return self._value(type_, node) + if isinstance(node, (ast.filter.LessThan, ast.filter.GreaterThan)): + return self._bounded(type_, node) + # invalid node + raise errors.BackendError(f'expected filter expression, found {node}') + + def _parse_predicate_expression(self, node: ast.filter.PredicateExpression) -> typing.Tuple[T_VERTEX, T_VERTEX]: + """Route *node* to the handler of the respective PredicateExpression subclass.""" + if isinstance(node, ast.filter.Predicate): + return self._predicate(node) + if isinstance(node, ast.filter.OneOf): + return self._one_of(node) + # invalid node + raise errors.BackendError(f'expected predicate expression, found {node}') + + + ## predicate expressions + + def _predicate(self, node: ast.filter.Predicate) -> typing.Tuple[T_VERTEX, T_VERTEX]: + # predicate exists in the schema + if not self.schema.has_predicate(node.predicate): + raise errors.ConsistencyError(f'predicate {node.predicate} is not in the schema') + # determine domain and range + pred = self.schema.predicate(node.predicate) + dom, rng = pred.domain, pred.range + if rng is None: + # FIXME: It is a design error that Predicates can have a None range... + raise errors.BackendError(f'predicate {pred} has no range') + if node.reverse: + dom, rng = rng, dom # type: ignore [assignment] # variable re-use confuses mypy + # return domain and range + return dom, rng + + def _one_of(self, node: ast.filter.OneOf) -> typing.Tuple[T_VERTEX, T_VERTEX]: + # determine domain and range types + # NOTE: select the most specific domain and the most generic range + dom, rng = None, None + for pred in node: + # parse child expression + subdom, subrng = self._parse_predicate_expression(pred) + try: + # determine overall domain + if dom is None or subdom < dom: # pick most specific domain + dom = subdom + # domains must be related across all child expressions + if not subdom <= dom and not subdom >= dom: + raise errors.ConsistencyError(f'domains {subdom} and {dom} are not related') + except TypeError as err: # compared literal vs. node + raise errors.ConsistencyError(f'domains {subdom} and {dom} are not of the same type') from err + + try: + # determine overall range + if rng is None or subrng > rng: # pick most generic range + rng = subrng + # ranges must be related across all child expressions + if not subrng <= rng and not subrng >= rng: + raise errors.ConsistencyError(f'ranges {subrng} and {rng} are not related') + except TypeError as err: # compared literal vs. node + raise errors.ConsistencyError(f'ranges {subrng} and {rng} are not of the same type') from err + # check domain and range + if dom is None or rng is None: + # OneOf guarantees at least one expression, these two cases cannot happen + raise errors.UnreachableError() + # return domain and range + return dom, rng + + + ## intermediates + + def _branch(self, type_: T_VERTEX, node: ast.filter._Branch): + # type is a Node + if not isinstance(type_, bsc.Node): + raise errors.ConsistencyError(f'expected a Node, found {type_}') + # type exists in the schema + # FIXME: Isn't it actually guaranteed that the type (except the root type) is part of the schema? + # all types can be traced back to (a) root_type, (b) predicate, or (c) manually set (e.g. in _is). + # For (a), we do (and have to) perform a check. For (c), the code base should be consistent throughout + # the module, so this is an assumption that has to be ensured in schema.Schema. For (b), we know (and + # check) that the predicate is in the schema, hence all node/literals derived from it are also in the + # schema by construction of the schema.Schema class. So, why do we check this every time? + if type_ not in self.schema.nodes(): + raise errors.ConsistencyError(f'node {type_} is not in the schema') + # predicate is valid + dom, rng = self._parse_predicate_expression(node.predicate) + # type_ is a subtype of the predicate's domain + if not type_ <= dom: + raise errors.ConsistencyError(f'expected type {dom} or subtype thereof, found {type_}') + # child expression is valid + self._parse_filter_expression(rng, node.expr) + + def _agg(self, type_: T_VERTEX, node: ast.filter._Agg): + for expr in node: + # child expression is valid + self._parse_filter_expression(type_, expr) + + def _not(self, type_: T_VERTEX, node: ast.filter.Not): + # child expression is valid + self._parse_filter_expression(type_, node.expr) + + def _has(self, type_: T_VERTEX, node: ast.filter.Has): + # type is a Node + if not isinstance(type_, bsc.Node): + raise errors.ConsistencyError(f'expected a Node, found {type_}') + # type exists in the schema + if type_ not in self.schema.nodes(): + raise errors.ConsistencyError(f'node {type_} is not in the schema') + # predicate is valid + dom, _= self._parse_predicate_expression(node.predicate) + # type_ is a subtype of the predicate's domain + if not type_ <= dom: + raise errors.ConsistencyError(f'expected type {dom}, found {type_}') + # node.count is a numerical expression + # FIXME: We have to ensure that ns.xsd.integer is always known in the schema! + self._parse_filter_expression(self.schema.literal(ns.xsd.integer), node.count) + + + ## conditions + + def _is(self, type_: T_VERTEX, node: ast.filter.Is): # pylint: disable=unused-argument # (node) + if not isinstance(type_, bsc.Node): + raise errors.ConsistencyError(f'expected a Node, found {type_}') + if type_ not in self.schema.nodes(): + raise errors.ConsistencyError(f'node {type_} is not in the schema') + + def _value(self, type_: T_VERTEX, node: ast.filter._Value): # pylint: disable=unused-argument # (node) + # type is a literal + if not isinstance(type_, bsc.Literal): + raise errors.ConsistencyError(f'expected a Literal, found {type_}') + # type exists in the schema + if type_ not in self.schema.literals(): + raise errors.ConsistencyError(f'literal {type_} is not in the schema') + # FIXME: Check if node.value corresponds to type_ + # FIXME: A specific literal might be requested (i.e., a numeric type when used in Has) + + def _bounded(self, type_: T_VERTEX, node: ast.filter._Bounded): # pylint: disable=unused-argument # (node) + # type is a literal + if not isinstance(type_, bsc.Literal): + raise errors.ConsistencyError(f'expected a Literal, found {type_}') + # type exists in the schema + if type_ not in self.schema.literals(): + raise errors.ConsistencyError(f'literal {type_} is not in the schema') + # FIXME: Check if node.value corresponds to type_ + + +## EOF ## |