aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/query/validator.py
blob: 10ca492ba904cde9b99e99b69b3008b577137574 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351

# imports
import typing

# bsfs imports
from bsfs import schema as bsc
from bsfs.namespace import ns
from bsfs.utils import errors, typename

# inner-module imports
from . import ast

# exports
# Both validator classes defined in this module are public, hence both are exported.
__all__ : typing.Sequence[str] = (
    'Filter',
    'Fetch',
    )

# FIXME: Split into a submodule and the two classes into their own respective files.

## code ##

class Filter():
    """Check a `bsfs.query.ast.filter` query for structural and schema consistency.

    The following rules are enforced:

    * Value and Bounded conditions are only applicable to literals
    * Bounded conditions additionally require a numerical literal
    * Distance is only applicable to features and needs a reference of matching dimension
    * Branches (Any, All), Is, and Has are only applicable to nodes
    * The type at hand must be the predicate's domain or a subtype thereof
    * Referenced predicates and types must be present in the schema

    """

    # the schema that queries are validated against.
    schema: bsc.Schema

    def __init__(self, schema: bsc.Schema):
        self.schema = schema

    def __call__(self, root_type: bsc.Node, query: ast.filter.FilterExpression) -> bool:
        """Shortcut for `Filter.validate`."""
        return self.validate(root_type, query)

    def validate(self, root_type: bsc.Node, query: ast.filter.FilterExpression) -> bool:
        """Check the filter *query* under the assumption that its subject is of *root_type*.

        Returns True if no check fails.

        Raises a `TypeError` if *root_type* is not a `bsfs.schema.Node`.
        Raises a `bsfs.utils.errors.ConsistencyError` if the query violates the schema.
        Raises a `bsfs.utils.errors.BackendError` if the query structure is invalid.

        """
        # only schema nodes qualify as root type ...
        if not isinstance(root_type, bsc.Node):
            raise TypeError(f'expected a node, found {typename(root_type)}')
        # ... and the node has to be known to the schema
        if root_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'{root_type} is not defined in the schema')
        # descend into the query; any violation surfaces as an exception
        self._parse_filter_expression(root_type, query)
        return True


    ## shared checks

    def _check_node(self, type_: bsc.Vertex):
        """Raise a ConsistencyError unless *type_* is a Node that is part of the schema."""
        if not isinstance(type_, bsc.Node):
            raise errors.ConsistencyError(f'expected a Node, found {type_}')
        # FIXME: Is this membership test needed for anything but the root type? Types either stem from
        # (a) the root type (checked in validate), (b) a predicate that was confirmed to be in the schema,
        # or (c) were set manually within this module. For (b) the type should be in the schema by
        # construction of schema.Schema, for (c) the module itself has to be consistent.
        if type_ not in self.schema.nodes():
            raise errors.ConsistencyError(f'node {type_} is not in the schema')

    def _check_literal(self, type_: bsc.Vertex):
        """Raise a ConsistencyError unless *type_* is a Literal that is part of the schema."""
        if not isinstance(type_, bsc.Literal):
            raise errors.ConsistencyError(f'expected a Literal, found {type_}')
        if type_ not in self.schema.literals():
            raise errors.ConsistencyError(f'literal {type_} is not in the schema')


    ## routing methods

    def _parse_filter_expression(self, type_: bsc.Vertex, node: ast.filter.FilterExpression):
        """Dispatch *node* to the handler that matches its FilterExpression subclass."""
        # NOTE: The first matching entry wins, hence the order of the table matters.
        routes = (
            (ast.filter.Is, self._is),
            (ast.filter.Not, self._not),
            (ast.filter.Has, self._has),
            (ast.filter.Distance, self._distance),
            ((ast.filter.Any, ast.filter.All), self._branch),
            ((ast.filter.And, ast.filter.Or), self._agg),
            ((ast.filter.Equals, ast.filter.Substring, ast.filter.StartsWith, ast.filter.EndsWith), self._value),
            ((ast.filter.LessThan, ast.filter.GreaterThan), self._bounded),
            )
        for classes, handler in routes:
            if isinstance(node, classes):
                return handler(type_, node)
        # no handler is responsible for this node
        raise errors.BackendError(f'expected filter expression, found {node}')

    def _parse_predicate_expression(self, node: ast.filter.PredicateExpression) -> typing.Tuple[bsc.Vertex, bsc.Vertex]:
        """Dispatch *node* to the handler that matches its PredicateExpression subclass."""
        routes = (
            (ast.filter.Predicate, self._predicate),
            (ast.filter.OneOf, self._one_of),
            )
        for classes, handler in routes:
            if isinstance(node, classes):
                return handler(node)
        # no handler is responsible for this node
        raise errors.BackendError(f'expected predicate expression, found {node}')


    ## predicate expressions

    def _predicate(self, node: ast.filter.Predicate) -> typing.Tuple[bsc.Vertex, bsc.Vertex]:
        """Return the (domain, range) of the predicate; the two are swapped if *node* is reversed."""
        # the schema has to know the predicate
        if not self.schema.has_predicate(node.predicate):
            raise errors.ConsistencyError(f'predicate {node.predicate} is not in the schema')
        pred = self.schema.predicate(node.predicate)
        # a range that is neither a node nor a literal cannot be validated against
        if not isinstance(pred.range, (bsc.Node, bsc.Literal)):
            raise errors.BackendError(f'the range of predicate {pred} is undefined')
        # a reversed predicate is walked from its range to its domain
        if node.reverse:
            return pred.range, pred.domain
        return pred.domain, pred.range

    def _one_of(self, node: ast.filter.OneOf) -> typing.Tuple[bsc.Vertex, bsc.Vertex]:
        """Return the most specific domain and the most generic range across all child predicates."""
        # NOTE(review): Each child is only compared to the running domain/range, not to every other
        # child. Presumably two unrelated sibling ranges pass if their common parent was seen before
        # them but fail otherwise - confirm whether this order dependence is intended.
        domain, range_ = None, None
        for child in node:
            child_dom, child_rng = self._parse_predicate_expression(child)
            # keep the most specific domain seen so far
            if domain is None or child_dom < domain:
                domain = child_dom
            # the child's domain has to be a sub- or supertype of the running domain
            if not (child_dom <= domain or child_dom >= domain):
                raise errors.ConsistencyError(f'domains {child_dom} and {domain} are not related')
            # keep the most generic range seen so far
            if range_ is None or child_rng > range_:
                range_ = child_rng
            # the child's range has to be a sub- or supertype of the running range
            if not (child_rng <= range_ or child_rng >= range_):
                raise errors.ConsistencyError(f'ranges {child_rng} and {range_} are not related')
        # OneOf guarantees at least one child expression, so neither value is None at this point.
        # mypy cannot infer this, hence the warning is silenced.
        return domain, range_ # type: ignore [return-value]


    ## intermediates

    def _branch(self, type_: bsc.Vertex, node: ast.filter._Branch):
        """Check a branch: *type_* must fit the predicate's domain, the child is checked on its range."""
        # branching is only possible from a node of the schema
        self._check_node(type_)
        # the predicate expression itself has to be valid
        domain, range_ = self._parse_predicate_expression(node.predicate)
        # the predicate must be applicable to type_, i.e. type_ is the domain or a subtype of it
        if not type_ <= domain:
            raise errors.ConsistencyError(f'expected type {domain} or subtype thereof, found {type_}')
        # continue with the child expression at the predicate's range
        self._parse_filter_expression(range_, node.expr)

    def _agg(self, type_: bsc.Vertex, node: ast.filter._Agg):
        """Check every child expression of an aggregate against the same *type_*."""
        for child in node:
            self._parse_filter_expression(type_, child)

    def _not(self, type_: bsc.Vertex, node: ast.filter.Not):
        """Check the negated child expression against the same *type_*."""
        self._parse_filter_expression(type_, node.expr)

    def _has(self, type_: bsc.Vertex, node: ast.filter.Has):
        """Check a Has expression: valid predicate on a node, and a numerical count expression."""
        # Has is only possible on a node of the schema
        self._check_node(type_)
        # the predicate expression itself has to be valid; its range is irrelevant here
        domain, _ = self._parse_predicate_expression(node.predicate)
        # the predicate must be applicable to type_, i.e. type_ is the domain or a subtype of it
        if not type_ <= domain:
            raise errors.ConsistencyError(f'expected type {domain}, found {type_}')
        # the count is validated as an expression over the schema's number literal
        number = self.schema.literal(ns.bsl.Number)
        self._parse_filter_expression(number, node.count)

    def _distance(self, type_: bsc.Vertex, node: ast.filter.Distance):
        """Check a Distance expression: *type_* is a schema feature matching the reference's dimension."""
        # distances are only defined on features
        if not isinstance(type_, bsc.Feature):
            raise errors.ConsistencyError(f'expected a Feature, found {type_}')
        # the feature has to be among the schema's literals
        if type_ not in self.schema.literals():
            raise errors.ConsistencyError(f'literal {type_} is not in the schema')
        # the reference needs as many entries as the feature has dimensions
        dimension = len(node.reference)
        if dimension != type_.dimension:
            raise errors.ConsistencyError(f'reference has dimension {dimension}, expected {type_.dimension}')
        # FIXME: test dtype


    ## conditions

    def _is(self, type_: bsc.Vertex, node: ast.filter.Is): # pylint: disable=unused-argument # (node)
        """Check an Is condition, which requires *type_* to be a node of the schema."""
        self._check_node(type_)

    def _value(self, type_: bsc.Vertex, node: ast.filter._Value): # pylint: disable=unused-argument # (node)
        """Check a value condition, which requires *type_* to be a literal of the schema."""
        self._check_literal(type_)
        # FIXME: Check if node.value corresponds to type_
        # FIXME: A specific literal might be requested (i.e., a numeric type when used in Has)

    def _bounded(self, type_: bsc.Vertex, node: ast.filter._Bounded): # pylint: disable=unused-argument # (node)
        """Check a bounded condition, which requires *type_* to be a numerical literal of the schema."""
        self._check_literal(type_)
        # bounds only make sense on the schema's number literal or a subtype of it
        number = self.schema.literal(ns.bsl.Number)
        if not type_ <= number:
            raise errors.ConsistencyError(f'expected a number type, found {type_}')
        # FIXME: Check if node.value corresponds to type_


class Fetch():
    """Check a `bsfs.query.ast.fetch` query for structural and schema consistency.

    The following rules are enforced:

    * Value requires a predicate whose range is a literal
    * Node and Fetch require a predicate whose range is a node
    * Names (of Value, Node, and This) must not be empty
    * Branching expressions and This are only applicable to nodes
    * The type at hand must be the predicate's domain or a subtype thereof
    * Referenced predicates and types must be present in the schema

    """

    # the schema that queries are validated against.
    schema: bsc.Schema

    def __init__(self, schema: bsc.Schema):
        self.schema = schema

    def __call__(self, root_type: bsc.Node, query: ast.fetch.FetchExpression) -> bool:
        """Shortcut for `Fetch.validate`."""
        return self.validate(root_type, query)

    def validate(self, root_type: bsc.Node, query: ast.fetch.FetchExpression) -> bool:
        """Check the fetch *query* under the assumption that its subject is of *root_type*.

        Returns True if no check fails.

        Raises a `TypeError` if *root_type* is not a `bsfs.schema.Node` or if
        *query* is not a fetch expression.
        Raises a `bsfs.utils.errors.ConsistencyError` if the query violates the schema.
        Raises a `bsfs.utils.errors.BackendError` if the query structure is invalid.

        """
        # only schema nodes qualify as root type ...
        if not isinstance(root_type, bsc.Node):
            raise TypeError(f'expected a node, found {typename(root_type)}')
        # ... and the node has to be known to the schema
        if root_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'{root_type} is not defined in the schema')
        # anything but a fetch expression is rejected up front
        if not isinstance(query, ast.fetch.FetchExpression):
            raise TypeError(f'expected a fetch expression, found {typename(query)}')
        # descend into the query; any violation surfaces as an exception
        self._parse_fetch_expression(root_type, query)
        return True

    def _check_node(self, type_: bsc.Vertex):
        """Raise a ConsistencyError unless *type_* is a Node that is part of the schema."""
        if not isinstance(type_, bsc.Node):
            raise errors.ConsistencyError(f'expected a Node, found {type_}')
        if type_ not in self.schema.nodes():
            raise errors.ConsistencyError(f'node {type_} is not in the schema')

    def _parse_fetch_expression(self, type_: bsc.Vertex, node: ast.fetch.FetchExpression):
        """Run the shared checks for *node*, then dispatch it to its FetchExpression handler."""
        # Value and Node are both branching and named expressions, Fetch is branching only.
        # NOTE: These checks deliberately do not return, the specific handler still has to run.
        is_named = isinstance(node, (ast.fetch.Value, ast.fetch.Node))
        if is_named or isinstance(node, ast.fetch.Fetch):
            self._branch(type_, node)
        if is_named:
            self._named(type_, node)
        # NOTE: The first matching entry wins, hence the order of the table matters.
        routes = (
            (ast.fetch.All, self._all),
            (ast.fetch.Fetch, self._fetch),
            (ast.fetch.Value, self._value),
            (ast.fetch.Node, self._node),
            (ast.fetch.This, self._this),
            )
        for cls, handler in routes:
            if isinstance(node, cls):
                return handler(type_, node)
        # no handler is responsible for this node
        raise errors.BackendError(f'expected fetch expression, found {node}')

    def _all(self, type_: bsc.Vertex, node: ast.fetch.All):
        """Check every child expression against the same *type_*."""
        for child in node:
            self._parse_fetch_expression(type_, child)

    def _branch(self, type_: bsc.Vertex, node: ast.fetch._Branch):
        """Check that *type_* is a schema node to which the predicate of *node* can be applied."""
        # branching is only possible from a node of the schema
        self._check_node(type_)
        # the schema has to know the predicate
        if not self.schema.has_predicate(node.predicate):
            raise errors.ConsistencyError(f'predicate {node.predicate} is not in the schema')
        domain = self.schema.predicate(node.predicate).domain
        # the predicate must be applicable to type_, i.e. type_ is the domain or a subtype of it
        if not type_ <= domain:
            raise errors.ConsistencyError(
                f'expected type {domain} or subtype thereof, found {type_}')

    def _fetch(self, type_: bsc.Vertex, node: ast.fetch.Fetch): # pylint: disable=unused-argument # type_ was considered in _branch
        """Check that the predicate leads to a node and validate the child expression on that node."""
        target = self.schema.predicate(node.predicate).range
        if not isinstance(target, bsc.Node):
            raise errors.ConsistencyError(
                f"expected the predicate's range to be a Node, found {target}")
        # continue with the child expression at the predicate's range
        self._parse_fetch_expression(target, node.expr)

    def _named(self, type_: bsc.Vertex, node: ast.fetch._Named): # pylint: disable=unused-argument # type_ was considered in _branch
        """Check that the name of *node* is more than just whitespace."""
        stripped = node.name.strip()
        if stripped == '':
            raise errors.BackendError('node name cannot be empty')
        # FIXME: check for double name use?

    def _node(self, type_: bsc.Vertex, node: ast.fetch.Node): # pylint: disable=unused-argument # type_ was considered in _branch
        """Check that the predicate of *node* leads to a node."""
        target = self.schema.predicate(node.predicate).range
        if not isinstance(target, bsc.Node):
            raise errors.ConsistencyError(
                f"expected the predicate's range to be a Node, found {target}")

    def _value(self, type_: bsc.Vertex, node: ast.fetch.Value): # pylint: disable=unused-argument # type_ was considered in _branch
        """Check that the predicate of *node* leads to a literal."""
        target = self.schema.predicate(node.predicate).range
        if not isinstance(target, bsc.Literal):
            raise errors.ConsistencyError(
                f"expected the predicate's range to be a Literal, found {target}")

    def _this(self, type_: bsc.Vertex, node: ast.fetch.This):
        """Check that *type_* is a schema node and that the name of *node* is not empty."""
        # This refers to the current subject, which has to be a node of the schema
        self._check_node(type_)
        # the name must be more than just whitespace
        stripped = node.name.strip()
        if stripped == '':
            raise errors.BackendError('node name cannot be empty')

## EOF ##