1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
"""
Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
import typing
# bsfs imports
from bsfs import schema as bsc
from bsfs.query import ast
from bsfs.utils import errors
# inner-module imports
from .utils import GenHopName, Query
# exports
# Only the translator class is part of this module's public interface.
__all__: typing.Sequence[str] = ('Fetch',)
## code ##
class Fetch():
    """Translate `bsfs.query.ast.fetch` structures into Sparql queries.

    An instance is bound to a schema and owns a name generator (`self.ngen`)
    that supplies the intermediate query variables. Calling the instance with
    a root node type and a fetch expression returns a `Query`.
    """

    def __init__(self, schema):
        """Bind the translator to *schema* and set up the variable name generator."""
        self.schema = schema
        # Generated variable names share this prefix; user-supplied names that
        # start with it are rejected by the handlers below.
        self.ngen = GenHopName(prefix='?fch')

    def __call__(
            self,
            root_type: bsc.Node,
            root: ast.fetch.FetchExpression,
            ) -> Query:
        """Translate the fetch expression *root*, rooted at *root_type*, into a Query.

        The root variable of the produced query is always `?ent`.

        Raises `errors.BackendError` if *root_type* is not a `bsc.Node` or if
        *root* contains something that is not a known fetch expression.
        Raises `errors.ConsistencyError` if *root_type* is not part of the schema.
        """
        # check root_type
        if not isinstance(root_type, bsc.Node):
            raise errors.BackendError(f'expected Node, found {root_type}')
        if root_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'node {root_type} is not in the schema')
        # parse root
        terms, expr = self._parse_fetch_expression(root_type, root, '?ent')
        # assemble query
        return Query(
            root_type=root_type.uri,
            root_head='?ent',
            select=terms,
            where=expr,
            )

    def _parse_fetch_expression(
            self,
            node_type: bsc.Vertex,
            node: ast.fetch.FetchExpression,
            head: str,
            ):
        """Route *node* to the handler of the respective FetchExpression subclass.

        Every handler returns a pair `(terms, expr)`: *terms* is a set of
        `(variable, name)` tuples to select, *expr* is the graph pattern text
        (possibly empty). Raises `errors.BackendError` for unknown node types.
        """
        if isinstance(node, ast.fetch.All):
            return self._all(node_type, node, head)
        if isinstance(node, ast.fetch.Fetch):
            return self._fetch(node_type, node, head)
        if isinstance(node, ast.fetch.Node):
            return self._node(node_type, node, head)
        if isinstance(node, ast.fetch.Value):
            return self._value(node_type, node, head)
        if isinstance(node, ast.fetch.This):
            return self._this(node_type, node, head)
        # invalid node
        raise errors.BackendError(f'expected fetch expression, found {node}')

    def _check_name(self, kind: str, name) -> None:
        """Raise `errors.BackendError` if *name* collides with generated variable names.

        *kind* is only used to label the error message ('Node', 'Value', 'This').
        """
        if f'?{name}'.startswith(self.ngen.prefix):
            # The name is rejected *because* it starts with the reserved prefix.
            raise errors.BackendError(f'{kind} name must not start with {self.ngen.prefix}')

    def _all(self, node_type: bsc.Vertex, node: ast.fetch.All, head: str):
        """Translate every child of *node* against the same *head* and merge the results.

        Terms are merged into one set; non-blank child patterns are
        de-duplicated and joined with ' .\\n'. A node without children
        yields an empty term set and an empty pattern.
        """
        # child expressions
        parsed = [self._parse_fetch_expression(node_type, expr, head) for expr in node]
        terms = {term for sub_terms, _ in parsed for term in sub_terms}
        # dict.fromkeys drops duplicates but keeps first-seen order, so the
        # generated query text is the same on every run.
        exprs = dict.fromkeys(expr for _, expr in parsed if expr.strip())
        return terms, ' .\n'.join(exprs)

    def _fetch(self, node_type: bsc.Vertex, node: ast.fetch.Fetch, head: str): # pylint: disable=unused-argument # (node_type)
        """Follow *node*'s predicate from *head* to a fresh variable and translate the child there.

        The child expression is parsed with the predicate's range (taken from
        the schema) as its node type; the whole hop is wrapped in OPTIONAL.
        """
        # child expressions
        rng = self.schema.predicate(node.predicate).range
        nexthead = next(self.ngen)
        terms, expr = self._parse_fetch_expression(rng, node.expr, nexthead)
        return terms, f'OPTIONAL{{ {head} <{node.predicate}> {nexthead} .\n {expr} }}'

    def _node(self, node_type: bsc.Vertex, node: ast.fetch.Node, head: str): # pylint: disable=unused-argument # (node_type)
        """Select the target of *node*'s predicate from *head* under the name *node.name*.

        Raises `errors.BackendError` if the name starts with the generator prefix.
        """
        self._check_name('Node', node.name)
        # compose and return statement
        term = next(self.ngen)
        return {(term, node.name)}, f'OPTIONAL{{ {head} <{node.predicate}> {term} }}'

    def _value(self, node_type: bsc.Vertex, node: ast.fetch.Value, head: str): # pylint: disable=unused-argument # (node_type)
        """Select the target of *node*'s predicate from *head* under the name *node.name*.

        Raises `errors.BackendError` if the name starts with the generator prefix.
        """
        self._check_name('Value', node.name)
        # compose and return statement
        term = next(self.ngen)
        return {(term, node.name)}, f'OPTIONAL{{ {head} <{node.predicate}> {term} }}'

    def _this(self, node_type: bsc.Vertex, node: ast.fetch.This, head: str): # pylint: disable=unused-argument # (node_type)
        """Select the current *head* variable itself under the name *node.name*.

        No graph pattern is needed, so the returned pattern is empty.
        Raises `errors.BackendError` if the name starts with the generator prefix.
        """
        self._check_name('This', node.name)
        # compose and return statement
        return {(head, node.name)}, ''
## EOF ##
|