1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# imports
import os
import typing
# bsfs imports
from bsfs.query import ast, validate
from bsfs import schema as bsc
from bsfs.triple_store import TripleStoreBase
from bsfs.utils import URI, typename
# inner-module imports
from . import ac
from . import nodes as _nodes
from . import resolve
# exports
__all__: typing.Sequence[str] = (
'Graph',
)
## code ##
class Graph():
"""The Graph class provides a convenient interface to query and access a graph.
Since it logically builds on the concept of graphs it is easier to
navigate than raw triple stores. Naturally, it uses a triple store
as *backend*. It also controls actions via access permissions to a *user*.
"""
# link to the triple storage backend.
_backend: TripleStoreBase
# access controls.
_ac: ac.AccessControlBase
# query resolver.
_resolver: resolve.Filter
# query validator.
_validate: validate.Filter
def __init__(
self,
backend: TripleStoreBase,
access_control: ac.AccessControlBase,
):
# store members
self._backend = backend
self._ac = access_control
# helper classes
self._resolver = resolve.Filter(self._backend.schema)
self._validate = validate.Filter(self._backend.schema)
# ensure Graph schema requirements
self.migrate(self._backend.schema)
def __hash__(self) -> int:
return hash((type(self), self._backend, self._ac))
def __eq__(self, other) -> bool:
return isinstance(other, type(self)) \
and self._backend == other._backend \
and self._ac == other._ac
def __repr__(self) -> str:
return f'{typename(self)}({repr(self._backend)}, {self._ac})'
def __str__(self) -> str:
return f'{typename(self)}({str(self._backend)})'
@property
def schema(self) -> bsc.Schema:
"""Return the store's local schema."""
return self._backend.schema
def migrate(self, schema: bsc.Schema, append: bool = True) -> 'Graph':
"""Migrate the current schema to a new *schema*.
Appends to the current schema by default; control this via *append*.
The `Graph` may add additional classes to the schema that are required for its interals.
"""
# check args
if not isinstance(schema, bsc.Schema):
raise TypeError(schema)
# append to current schema
if append:
schema = schema + self._backend.schema
# add Graph schema requirements
with open(os.path.join(os.path.dirname(__file__), 'schema.nt'), mode='rt', encoding='UTF-8') as ifile:
schema = schema + bsc.from_string(ifile.read())
# migrate schema in backend
# FIXME: consult access controls!
self._backend.schema = schema
# re-initialize members
self._resolver.schema = self.schema
self._validate.schema = self.schema
# return self
return self
def nodes(self, node_type: URI, guids: typing.Iterable[URI]) -> _nodes.Nodes:
"""Return nodes *guids* of type *node_type* as a `bsfs.graph.Nodes` instance.
Note that the *guids* need not to exist (however, the *node_type* has
to be part of the schema). Inexistent guids will be created (using
*node_type*) once some data is assigned to them.
"""
# get node type
type_ = self.schema.node(node_type)
# NOTE: Nodes constructor materializes guids.
return _nodes.Nodes(self._backend, self._ac, type_, guids)
def node(self, node_type: URI, guid: URI) -> _nodes.Nodes:
"""Return node *guid* of type *node_type* as a `bsfs.graph.Nodes` instance.
Note that the *guid* need not to exist (however, the *node_type* has
to be part of the schema). An inexistent guid will be created (using
*node_type*) once some data is assigned to them.
"""
return self.nodes(node_type, {guid})
def empty(self, node_type: URI) -> _nodes.Nodes:
"""Return a `Nodes` instance with type *node_type* but no nodes."""
return self.nodes(node_type, set())
def get(
self,
node_type: URI,
query: typing.Optional[ast.filter.FilterExpression],
) -> _nodes.Nodes:
"""Return a `Nodes` instance over all nodes of type *node_type* that match the *query*."""
# return Nodes instance
type_ = self.schema.node(node_type)
return _nodes.Nodes(self._backend, self._ac, type_, self.__get(node_type, query))
def sorted(
self,
node_type: URI,
query: typing.Optional[ast.filter.FilterExpression],
# FIXME: sort ast
) -> typing.Iterator[_nodes.Nodes]:
"""Return a iterator over `Nodes` instances over all nodes of type *node_type* that match the *query*."""
# FIXME: Order should be a parameter
# return iterator over Nodes instances
type_ = self.schema.node(node_type)
for guid in self.__get(node_type, query):
yield _nodes.Nodes(self._backend, self._ac, type_, {guid})
def all(self, node_type: URI) -> _nodes.Nodes:
"""Return all instances of type *node_type*."""
type_ = self.schema.node(node_type)
return _nodes.Nodes(self._backend, self._ac, type_, self.__get(node_type, None))
def __get(
self,
node_type: URI,
query: typing.Optional[ast.filter.FilterExpression],
) -> typing.Iterator[URI]:
"""Build and execute a get query."""
# get node type
type_ = self.schema.node(node_type)
# resolve Nodes instances
query = self._resolver(type_, query)
# add access controls to query
query = self._ac.filter_read(type_, query)
# validate query
if query is not None:
self._validate(type_, query)
# query the backend and return the (non-materialized) result
return self._backend.get(type_, query)
## EOF ##
|