aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/triple_store/sparql/utils.py
blob: 51de893a0a40beb0490ff71fc2f0d0b434a35768 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""

Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
import typing

# external imports
import rdflib

# bsfs imports
from bsfs.namespace import ns
from bsfs.utils import typename

# exports
__all__: typing.Sequence[str] = (
    'GenHopName',
    'Query',
    )


## code ##

class GenHopName():
    """Generator that produces a new unique symbol name with each iteration."""

    # Symbol name prefix.
    prefix: str

    # Current counter.
    curr: int

    def __init__(self, prefix: str = '?hop', start: int = 0):
        self.prefix = prefix
        self.curr = start - 1

    def __next__(self):
        """Generate and return the next unique name."""
        self.curr += 1
        return self.prefix + str(self.curr)


class Query():
    """Hold, manage, and complete partial Sparql queries."""

    # root node type URI.
    root_type: str

    # root node variable name.
    root_head: str

    # (head, name) tuples (w/o root)
    select: typing.Tuple[typing.Tuple[str, str], ...]

    # where statements.
    where: str

    def __init__(
            self,
            root_type: str,
            root_head: str = '?ent',
            select: typing.Optional[typing.Iterable[typing.Tuple[str, str]]] = None,
            where: typing.Optional[str] = None,
            ):
        # check arguments
        if select is None:
            select = []
        if where is None:
            where = ''
        # set members
        self.root_type = root_type
        self.root_head = root_head
        self.select = tuple(select) # tuple ensures presistent order
        self.where = where.strip()

    def __str__(self) -> str:
        return self.query

    def __repr__(self) -> str:
        return f'{typename(self)}({self.root_type}, {self.root_head}, {self.select}, {self.where})'

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, type(self)) \
           and self.root_type == other.root_type \
           and self.root_head == other.root_head \
           and self.select == other.select \
           and self.where == other.where

    def __hash__(self) -> int:
        return hash((type(self), self.root_type, self.root_head, self.select, self.where))

    def __add__(self, other: typing.Any) -> 'Query':
        # check other's type
        if not isinstance(other, type(self)):
            return NotImplemented
        # check query compatibility
        if not self.root_type == other.root_type:
            raise ValueError(other)
        if not self.root_head == other.root_head:
            raise ValueError(other)
        # combine selections
        select = self.select + other.select
        # combine conditions
        conds = []
        if self.where != '':
            conds.append(self.where)
        if other.where != '':
            conds.append(other.where)
        where = ' . '.join(conds)
        # return new query
        return Query(
            root_type=self.root_type,
            root_head=self.root_head,
            select=select,
            where=where,
            )

    @property
    def names(self) -> typing.Tuple[str, ...]:
        """Return a tuple of selected variable names, excluding the root."""
        return tuple(name for _, name in self.select)

    @property
    def query(self) -> str:
        """Return an executable sparql query."""
        select = ' '.join(f'({head} as ?{name})' for head, name in self.select)
        return f'''
            SELECT DISTINCT {self.root_head} {select}
            WHERE {{
                {self.root_head} <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <{self.root_type}> .
                {self.where}
            }}
            ORDER BY str({self.root_head})
            '''

    def __call__(self, graph: rdflib.Graph) -> rdflib.query.Result:
        """Execute the query on a *graph* and return the query result."""
        return graph.query(self.query)

## EOF ##