aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/query/validator.py
blob: 75b51cac8f632c7fb979de0233fcf5e90adbca55 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
"""

Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
import typing

# bsfs imports
from bsfs import schema as bsc
from bsfs.namespace import ns
from bsfs.utils import errors, typename

# inner-module imports
from . import ast

# exports
__all__ : typing.Sequence[str] = (
    'Filter',
    )


## code ##

class Filter():
    """Validate a `bsfs.query.ast.filter` query's structure and schema compliance.

    * Conditions (Bounded, Value) can only be applied on literals
    * Branches, Id, and Has can only be applied on nodes
    * Predicates' domain and range must match
    * Predicate paths must follow the schema
    * Referenced types are present in the schema

    """

    # schema to validate against.
    schema: bsc.Schema

    def __init__(self, schema: bsc.Schema):
        self.schema = schema

    def __call__(self, root_type: bsc.Node, query: ast.filter.FilterExpression):
        """Validate a filter *query*, assuming the subject having *root_type*.

        Raises a `bsfs.utils.errors.ConsistencyError` if the query violates the schema.
        Raises a `bsfs.utils.errors.BackendError` if the query structure is invalid.

        """
        # root_type must be a schema.Node
        if not isinstance(root_type, bsc.Node):
            raise TypeError(f'Expected a node, found {typename(root_type)}')
        # root_type must exist in the schema
        if root_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'{root_type} is not defined in the schema')
        # check root expression
        self._parse_filter_expression(root_type, query)
        # all tests passed
        return True


    ## routing methods

    def _parse_filter_expression(self, type_: bsc.Vertex, node: ast.filter.FilterExpression):
        """Route *node* to the handler of the respective FilterExpression subclass."""
        if isinstance(node, ast.filter.Is):
            return self._is(type_, node)
        if isinstance(node, ast.filter.Not):
            return self._not(type_, node)
        if isinstance(node, ast.filter.Has):
            return self._has(type_, node)
        if isinstance(node, (ast.filter.Any, ast.filter.All)):
            return self._branch(type_, node)
        if isinstance(node, (ast.filter.And, ast.filter.Or)):
            return self._agg(type_, node)
        if isinstance(node, (ast.filter.Equals, ast.filter.Substring, ast.filter.StartsWith, ast.filter.EndsWith)):
            return self._value(type_, node)
        if isinstance(node, (ast.filter.LessThan, ast.filter.GreaterThan)):
            return self._bounded(type_, node)
        # invalid node
        raise errors.BackendError(f'expected filter expression, found {node}')

    def _parse_predicate_expression(self, node: ast.filter.PredicateExpression) -> typing.Tuple[bsc.Vertex, bsc.Vertex]:
        """Route *node* to the handler of the respective PredicateExpression subclass."""
        if isinstance(node, ast.filter.Predicate):
            return self._predicate(node)
        if isinstance(node, ast.filter.OneOf):
            return self._one_of(node)
        # invalid node
        raise errors.BackendError(f'expected predicate expression, found {node}')


    ## predicate expressions

    def _predicate(self, node: ast.filter.Predicate) -> typing.Tuple[bsc.Vertex, bsc.Vertex]:
        # predicate exists in the schema
        if not self.schema.has_predicate(node.predicate):
            raise errors.ConsistencyError(f'predicate {node.predicate} is not in the schema')
        # determine domain and range
        pred = self.schema.predicate(node.predicate)
        if not isinstance(pred.range, (bsc.Node, bsc.Literal)):
            raise errors.BackendError(f'the range of predicate {pred} is undefined')
        dom, rng = pred.domain, pred.range
        if node.reverse:
            dom, rng = rng, dom # type: ignore [assignment] # variable re-use confuses mypy
        # return domain and range
        return dom, rng

    def _one_of(self, node: ast.filter.OneOf) -> typing.Tuple[bsc.Vertex, bsc.Vertex]:
        # determine domain and range types
        # NOTE: select the most specific domain and the most generic range
        dom, rng = None, None
        for pred in node:
            # parse child expression
            subdom, subrng = self._parse_predicate_expression(pred)
            try:
                # determine overall domain
                if dom is None or subdom < dom: # pick most specific domain
                    dom = subdom
                # domains must be related across all child expressions
                if not subdom <= dom and not subdom >= dom:
                    raise errors.ConsistencyError(f'domains {subdom} and {dom} are not related')
            except TypeError as err: # compared literal vs. node
                raise errors.ConsistencyError(f'domains {subdom} and {dom} are not of the same type') from err

            try:
                # determine overall range
                if rng is None or subrng > rng: # pick most generic range
                    rng = subrng
                # ranges must be related across all child expressions
                if not subrng <= rng and not subrng >= rng:
                    raise errors.ConsistencyError(f'ranges {subrng} and {rng} are not related')
            except TypeError as err: # compared literal vs. node
                raise errors.ConsistencyError(f'ranges {subrng} and {rng} are not of the same type') from err
        # OneOf guarantees at least one expression, dom and rng are always bsc.Vertex.
        # mypy does not realize this, hence we ignore the warning.
        return dom, rng # type: ignore [return-value]


    ## intermediates

    def _branch(self, type_: bsc.Vertex, node: ast.filter._Branch):
        # type is a Node
        if not isinstance(type_, bsc.Node):
            raise errors.ConsistencyError(f'expected a Node, found {type_}')
        # type exists in the schema
        # FIXME: Isn't it actually guaranteed that the type (except the root type) is part of the schema?
        # all types can be traced back to (a) root_type, (b) predicate, or (c) manually set (e.g. in _is).
        # For (a), we do (and have to) perform a check. For (c), the code base should be consistent throughout
        # the module, so this is an assumption that has to be ensured in schema.Schema. For (b), we know (and
        # check) that the predicate is in the schema, hence all node/literals derived from it are also in the
        # schema by construction of the schema.Schema class. So, why do we check this every time?
        if type_ not in self.schema.nodes():
            raise errors.ConsistencyError(f'node {type_} is not in the schema')
        # predicate is valid
        dom, rng = self._parse_predicate_expression(node.predicate)
        # type_ is a subtype of the predicate's domain
        if not type_ <= dom:
            raise errors.ConsistencyError(f'expected type {dom} or subtype thereof, found {type_}')
        # child expression is valid
        self._parse_filter_expression(rng, node.expr)

    def _agg(self, type_: bsc.Vertex, node: ast.filter._Agg):
        for expr in node:
            # child expression is valid
            self._parse_filter_expression(type_, expr)

    def _not(self, type_: bsc.Vertex, node: ast.filter.Not):
        # child expression is valid
        self._parse_filter_expression(type_, node.expr)

    def _has(self, type_: bsc.Vertex, node: ast.filter.Has):
        # type is a Node
        if not isinstance(type_, bsc.Node):
            raise errors.ConsistencyError(f'expected a Node, found {type_}')
        # type exists in the schema
        if type_ not in self.schema.nodes():
            raise errors.ConsistencyError(f'node {type_} is not in the schema')
        # predicate is valid
        dom, _= self._parse_predicate_expression(node.predicate)
        # type_ is a subtype of the predicate's domain
        if not type_ <= dom:
            raise errors.ConsistencyError(f'expected type {dom}, found {type_}')
        # node.count is a numerical expression
        self._parse_filter_expression(self.schema.literal(ns.bsfs.Number), node.count)


    ## conditions

    def _is(self, type_: bsc.Vertex, node: ast.filter.Is): # pylint: disable=unused-argument # (node)
        if not isinstance(type_, bsc.Node):
            raise errors.ConsistencyError(f'expected a Node, found {type_}')
        if type_ not in self.schema.nodes():
            raise errors.ConsistencyError(f'node {type_} is not in the schema')

    def _value(self, type_: bsc.Vertex, node: ast.filter._Value): # pylint: disable=unused-argument # (node)
        # type is a literal
        if not isinstance(type_, bsc.Literal):
            raise errors.ConsistencyError(f'expected a Literal, found {type_}')
        # type exists in the schema
        if type_ not in self.schema.literals():
            raise errors.ConsistencyError(f'literal {type_} is not in the schema')
        # FIXME: Check if node.value corresponds to type_
        # FIXME: A specific literal might be requested (i.e., a numeric type when used in Has)

    def _bounded(self, type_: bsc.Vertex, node: ast.filter._Bounded): # pylint: disable=unused-argument # (node)
        # type is a literal
        if not isinstance(type_, bsc.Literal):
            raise errors.ConsistencyError(f'expected a Literal, found {type_}')
        # type exists in the schema
        if type_ not in self.schema.literals():
            raise errors.ConsistencyError(f'literal {type_} is not in the schema')
        # type must be a numerical
        if not type_ <= self.schema.literal(ns.bsfs.Number):
            raise errors.ConsistencyError(f'expected a number type, found {type_}')
        # FIXME: Check if node.value corresponds to type_


## EOF ##