1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
"""
Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
import typing
# external imports
import rdflib
# bsfs imports
from bsfs.namespace import ns
from bsfs.utils import typename
# exports
# Names made available by a star-import of this module.
__all__: typing.Sequence[str] = (
'GenHopName',
'Query',
)
## code ##
class GenHopName():
    """Iterator that produces a new unique symbol name with each step.

    A name is the fixed *prefix* followed by an integer counter that grows
    by one per step. With the default arguments the produced sequence is
    ``?hop0``, ``?hop1``, ``?hop2``, ...

    The sequence never ends, so bound it explicitly (e.g. with
    ``itertools.islice``) when looping over an instance.

    """

    # Symbol name prefix.
    prefix: str

    # Counter value used for the most recently generated name.
    curr: int

    def __init__(self, prefix: str = '?hop', start: int = 0):
        """Create a name generator.

        *prefix* is prepended to every generated name, *start* is the counter
        value used for the first generated name.
        """
        self.prefix = prefix
        # One below *start* because __next__ increments before building the name.
        self.curr = start - 1

    def __iter__(self) -> 'GenHopName':
        """Return the instance itself, completing the iterator protocol."""
        return self

    def __next__(self) -> str:
        """Generate and return the next unique name."""
        self.curr += 1
        return self.prefix + str(self.curr)
class Query():
    """Hold, combine, and complete partial Sparql queries.

    A query is described by its root node (type URI and variable name), a
    sequence of additional (head, name) selections, and a string of where
    statements. Instances compare and hash by these four values, can be
    merged with ``+``, and are rendered to query text via :attr:`query`.

    """

    # Type URI of the root node.
    root_type: str

    # Variable name of the root node.
    root_head: str

    # Selected (head, name) pairs, not including the root.
    select: typing.Tuple[typing.Tuple[str, str], ...]

    # Where statements, without surrounding whitespace.
    where: str

    def __init__(
            self,
            root_type: str,
            root_head: str = '?ent',
            select: typing.Optional[typing.Iterable[typing.Tuple[str, str]]] = None,
            where: typing.Optional[str] = None,
            ):
        """Store the root description plus optional selections and conditions.

        A missing *select* becomes an empty tuple, a missing *where* becomes
        the empty string. *where* is stripped of surrounding whitespace.
        """
        self.root_type = root_type
        self.root_head = root_head
        # A tuple keeps the order of the selections persistent.
        self.select = () if select is None else tuple(select)
        self.where = '' if where is None else where.strip()

    def __str__(self) -> str:
        """Return the rendered query text."""
        return self.query

    def __repr__(self) -> str:
        return f'{typename(self)}({self.root_type}, {self.root_head}, {self.select}, {self.where})'

    def __eq__(self, other: typing.Any) -> bool:
        """Return True if *other* is of this instance's type and all members match."""
        if not isinstance(other, type(self)):
            return False
        return (
            self.root_type == other.root_type
            and self.root_head == other.root_head
            and self.select == other.select
            and self.where == other.where
        )

    def __hash__(self) -> int:
        members = (type(self), self.root_type, self.root_head, self.select, self.where)
        return hash(members)

    def __add__(self, other: typing.Any) -> 'Query':
        """Merge two queries over the same root into a new query.

        The selections are concatenated (own selections first) and the
        non-empty where statements are joined by ``' . '``.

        Returns NotImplemented if *other* is not an instance of this
        instance's type. Raises a ValueError, carrying *other*, if the root
        types or the root variable names differ.
        """
        if not isinstance(other, type(self)):
            return NotImplemented
        # Both operands must describe the same root node.
        same_root = self.root_type == other.root_type \
            and self.root_head == other.root_head
        if not same_root:
            raise ValueError(other)
        # Empty conditions are skipped so that no dangling separator is produced.
        merged_where = ' . '.join(
            part for part in (self.where, other.where) if part != '')
        return Query(
            root_type=self.root_type,
            root_head=self.root_head,
            select=self.select + other.select,
            where=merged_where,
        )

    @property
    def names(self) -> typing.Tuple[str, ...]:
        """Return the selected variable names as a tuple; the root is not part of it."""
        collected = [name for (_head, name) in self.select]
        return tuple(collected)

    @property
    def query(self) -> str:
        """Return the complete sparql query text.

        The query selects the distinct root variable and one
        ``(head as ?name)`` expression per selection. It requires the root to
        reach the root type via the ``ns.rdf.type``/``ns.rdfs.subClassOf*``
        path, appends the where statements, and orders by the string value of
        the root variable.
        """
        expressions = [f'({head} as ?{name})' for head, name in self.select]
        select = ' '.join(expressions)
        return f'''
SELECT DISTINCT {self.root_head} {select}
WHERE {{
{self.root_head} <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <{self.root_type}> .
{self.where}
}}
ORDER BY str({self.root_head})
'''

    def __call__(self, graph: rdflib.Graph) -> rdflib.query.Result:
        """Run the rendered query on *graph* and return the result of ``graph.query``."""
        sparql = self.query
        return graph.query(sparql)
## EOF ##
|