aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs
diff options
context:
space:
mode:
authorMatthias Baumgartner <dev@igsor.net>2023-01-20 18:01:17 +0100
committerMatthias Baumgartner <dev@igsor.net>2023-01-20 18:01:17 +0100
commite2f08efc0d8a3c875994bdb69623c30cce5079d9 (patch)
tree0870ac597a55431e63a20e05bb11cf913a5f3e3d /bsfs
parenta4789394e40aaa3152ad6009955709a6c7d277c2 (diff)
downloadbsfs-e2f08efc0d8a3c875994bdb69623c30cce5079d9.tar.gz
bsfs-e2f08efc0d8a3c875994bdb69623c30cce5079d9.tar.bz2
bsfs-e2f08efc0d8a3c875994bdb69623c30cce5079d9.zip
fetch AST validation
Diffstat (limited to 'bsfs')
-rw-r--r--bsfs/query/validator.py123
1 files changed, 122 insertions, 1 deletions
diff --git a/bsfs/query/validator.py b/bsfs/query/validator.py
index 904ac14..9fbff12 100644
--- a/bsfs/query/validator.py
+++ b/bsfs/query/validator.py
@@ -49,7 +49,7 @@ class Filter():
"""
# root_type must be a schema.Node
if not isinstance(root_type, bsc.Node):
- raise TypeError(f'Expected a node, found {typename(root_type)}')
+ raise TypeError(f'expected a node, found {typename(root_type)}')
# root_type must exist in the schema
if root_type not in self.schema.nodes():
raise errors.ConsistencyError(f'{root_type} is not defined in the schema')
@@ -223,4 +223,125 @@ class Filter():
# FIXME: Check if node.value corresponds to type_
+class Fetch():
+ """Validate a `bsfs.query.ast.fetch` query's structure and schema compliance.
+
+ * Value can only be applied on literals
+ * Node can only be applied on nodes
+ * Names must be non-empty
+ * Branching nodes' predicates must match the type
+ * Symbols must be in the schema
+ * Predicates must follow the schema
+
+ """
+
+ # schema to validate against.
+ schema: bsc.Schema
+
+ def __init__(self, schema: bsc.Schema):
+ self.schema = schema
+
+ def __call__(self, root_type: bsc.Node, query: ast.fetch.FetchExpression):
+ """Validate a fetch *query*, assuming the subject having *root_type*.
+
+ Raises a `bsfs.utils.errors.ConsistencyError` if the query violates the schema.
+ Raises a `bsfs.utils.errors.BackendError` if the query structure is invalid.
+
+ """
+ # root_type must be a schema.Node
+ if not isinstance(root_type, bsc.Node):
+ raise TypeError(f'expected a node, found {typename(root_type)}')
+ # root_type must exist in the schema
+ if root_type not in self.schema.nodes():
+ raise errors.ConsistencyError(f'{root_type} is not defined in the schema')
+ # query must be a FetchExpression
+ if not isinstance(query, ast.fetch.FetchExpression):
+ raise TypeError(f'expected a fetch expression, found {typename(query)}')
+ # check root expression
+ self._parse_fetch_expression(root_type, query)
+ # all tests passed
+ return True
+
+ def _parse_fetch_expression(self, type_: bsc.Vertex, node: ast.fetch.FetchExpression):
+ """Route *node* to the handler of the respective FetchExpression subclass."""
+ if isinstance(node, (ast.fetch.Fetch, ast.fetch.Value, ast.fetch.Node)):
+ # NOTE: don't return so that checks below are executed
+ self._branch(type_, node)
+ if isinstance(node, (ast.fetch.Value, ast.fetch.Node)):
+ # NOTE: don't return so that checks below are executed
+ self._named(type_, node)
+ if isinstance(node, ast.fetch.All):
+ return self._all(type_, node)
+ if isinstance(node, ast.fetch.Fetch):
+ return self._fetch(type_, node)
+ if isinstance(node, ast.fetch.Value):
+ return self._value(type_, node)
+ if isinstance(node, ast.fetch.Node):
+ return self._node(type_, node)
+ if isinstance(node, ast.fetch.This):
+ return self._this(type_, node)
+ # invalid node
+ raise errors.BackendError(f'expected fetch expression, found {node}')
+
+ def _all(self, type_: bsc.Vertex, node: ast.fetch.All):
+ # check child expressions
+ for expr in node:
+ self._parse_fetch_expression(type_, expr)
+
+ def _branch(self, type_: bsc.Vertex, node: ast.fetch._Branch):
+ # type is a node
+ if not isinstance(type_, bsc.Node):
+ raise errors.ConsistencyError(f'expected a Node, found {type_}')
+ # node exists in the schema
+ if type_ not in self.schema.nodes():
+ raise errors.ConsistencyError(f'node {type_} is not in the schema')
+ # predicate exists in the schema
+ if not self.schema.has_predicate(node.predicate):
+ raise errors.ConsistencyError(f'predicate {node.predicate} is not in the schema')
+ pred = self.schema.predicate(node.predicate)
+ # type_ must be a subclass of domain
+ if not type_ <= pred.domain:
+ raise errors.ConsistencyError(
+ f'expected type {pred.domain} or subtype thereof, found {type_}')
+
+ def _fetch(self, type_: bsc.Vertex, node: ast.fetch.Fetch): # pylint: disable=unused-argument # type_ was considered in _branch
+ # range must be a node
+ rng = self.schema.predicate(node.predicate).range
+ if not isinstance(rng, bsc.Node):
+ raise errors.ConsistencyError(
+ f'expected the predicate\'s range to be a Node, found {rng}')
+ # child expression must be valid
+ self._parse_fetch_expression(rng, node.expr)
+
+ def _named(self, type_: bsc.Vertex, node: ast.fetch._Named): # pylint: disable=unused-argument # type_ was considered in _branch
+ # name must be set
+ if node.name.strip() == '':
+ raise errors.BackendError('node name cannot be empty')
+ # FIXME: check for double name use?
+
+ def _node(self, type_: bsc.Vertex, node: ast.fetch.Node): # pylint: disable=unused-argument # type_ was considered in _branch
+ # range must be a node
+ rng = self.schema.predicate(node.predicate).range
+ if not isinstance(rng, bsc.Node):
+ raise errors.ConsistencyError(
+ f'expected the predicate\'s range to be a Node, found {rng}')
+
+ def _value(self, type_: bsc.Vertex, node: ast.fetch.Value): # pylint: disable=unused-argument # type_ was considered in _branch
+ # range must be a literal
+ rng = self.schema.predicate(node.predicate).range
+ if not isinstance(rng, bsc.Literal):
+ raise errors.ConsistencyError(
+ f'expected the predicate\'s range to be a Literal, found {rng}')
+
+ def _this(self, type_: bsc.Vertex, node: ast.fetch.This):
+ # type is a node
+ if not isinstance(type_, bsc.Node):
+ raise errors.ConsistencyError(f'expected a Node, found {type_}')
+ # node exists in the schema
+ if type_ not in self.schema.nodes():
+ raise errors.ConsistencyError(f'node {type_} is not in the schema')
+ # name must be set
+ if node.name.strip() == '':
+ raise errors.BackendError('node name cannot be empty')
+
## EOF ##