1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
|
"""
Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
import itertools
import time
import typing
# bsfs imports
from bsfs import schema as _schema
from bsfs.namespace import ns
from bsfs.triple_store import TripleStoreBase
from bsfs.utils import errors, URI, typename
# inner-module imports
from . import ac
# exports
__all__: typing.Sequence[str] = (
'Nodes',
)
## code ##
class Nodes():
"""
NOTE: guids may or may not exist. This is not verified as nodes are created on demand.
"""
# triple store backend.
_backend: TripleStoreBase
# user uri.
_user: URI
# node type.
_node_type: _schema.Node
# guids of nodes. Can be empty.
_guids: typing.Set[URI]
def __init__(
self,
backend: TripleStoreBase,
user: URI,
node_type: _schema.Node,
guids: typing.Iterable[URI],
):
self._backend = backend
self._user = user
self._node_type = node_type
self._guids = set(guids)
self.__ac = ac.NullAC(self._backend, self._user)
def __eq__(self, other: typing.Any) -> bool:
return isinstance(other, Nodes) \
and self._backend == other._backend \
and self._user == other._user \
and self._node_type == other._node_type \
and self._guids == other._guids
def __hash__(self) -> int:
return hash((type(self), self._backend, self._user, self._node_type, tuple(sorted(self._guids))))
def __repr__(self) -> str:
return f'{typename(self)}({self._backend}, {self._user}, {self._node_type}, {self._guids})'
def __str__(self) -> str:
return f'{typename(self)}({self._node_type}, {self._guids})'
@property
def node_type(self) -> _schema.Node:
"""Return the node's type."""
return self._node_type
@property
def guids(self) -> typing.Iterator[URI]:
"""Return all node guids."""
return iter(self._guids)
def set(
self,
pred: URI, # FIXME: URI or _schema.Predicate?
value: typing.Any,
) -> 'Nodes':
"""
"""
try:
# insert triples
self.__set(pred, value)
# save changes
self._backend.commit()
except (
errors.PermissionDeniedError, # tried to set a protected predicate (ns.bsm.t_created)
errors.ConsistencyError, # node types are not in the schema or don't match the predicate
errors.InstanceError, # guids/values don't have the correct type
TypeError, # value is supposed to be a Nodes instance
ValueError, # multiple values passed to unique predicate
):
# revert changes
self._backend.rollback()
# notify the client
raise
return self
def set_from_iterable(
self,
predicate_values: typing.Iterable[typing.Tuple[URI, typing.Any]], # FIXME: URI or _schema.Predicate?
) -> 'Nodes':
"""
"""
# TODO: Could group predicate_values by predicate to gain some efficiency
# TODO: ignore errors on some predicates; For now this could leave residual
# data (e.g. some nodes were created, some not).
try:
# insert triples
for pred, value in predicate_values:
self.__set(pred, value)
# save changes
self._backend.commit()
except (
errors.PermissionDeniedError, # tried to set a protected predicate (ns.bsm.t_created)
errors.ConsistencyError, # node types are not in the schema or don't match the predicate
errors.InstanceError, # guids/values don't have the correct type
TypeError, # value is supposed to be a Nodes instance
ValueError, # multiple values passed to unique predicate
):
# revert changes
self._backend.rollback()
# notify the client
raise
return self
def __set(
self,
predicate: URI,
value: typing.Any,
#on_error: str = 'ignore', # ignore, rollback
):
"""
"""
# get normalized predicate. Raises KeyError if *pred* not in the schema.
pred = self._backend.schema.predicate(predicate)
# node_type must be a subclass of the predicate's domain
node_type = self.node_type
if not node_type <= pred.domain:
raise errors.ConsistencyError(f'{node_type} must be a subclass of {pred.domain}')
# check reserved predicates (access controls, metadata, internal structures)
# FIXME: Needed? Could be integrated into other AC methods (by passing the predicate!)
# This could allow more fine-grained predicate control (e.g. based on ownership)
# rather than a global approach like this.
if self.__ac.is_protected_predicate(pred):
raise errors.PermissionDeniedError(pred)
# set operation affects all nodes (if possible)
guids = set(self.guids)
# ensure subject node existence; create nodes if need be
guids = set(self._ensure_nodes(node_type, guids))
# check value
if isinstance(pred.range, _schema.Literal):
# check write permissions on existing nodes
# As long as the user has write permissions, we don't restrict
# the creation or modification of literal values.
guids = set(self.__ac.write_literal(node_type, guids))
# insert literals
# TODO: Support passing iterators as values for non-unique predicates
self._backend.set(
node_type,
guids,
pred,
[value],
)
elif isinstance(pred.range, _schema.Node):
# check value type
if not isinstance(value, Nodes):
raise TypeError(value)
# value's node_type must be a subclass of the predicate's range
if not value.node_type <= pred.range:
raise errors.ConsistencyError(f'{value.node_type} must be a subclass of {pred.range}')
# check link permissions on source nodes
# Link permissions cover adding and removing links on the source node.
# Specifically, link permissions also allow to remove links to other
# nodes if needed (e.g. for unique predicates).
guids = set(self.__ac.link_from_node(node_type, guids))
# get link targets
targets = set(value.guids)
# ensure existence of value nodes; create nodes if need be
targets = set(self._ensure_nodes(value.node_type, targets))
# check link permissions on target nodes
targets = set(self.__ac.link_to_node(value.node_type, targets))
# insert node links
self._backend.set(
node_type,
guids,
pred,
targets,
)
else:
raise errors.UnreachableError()
def _ensure_nodes(
self,
node_type: _schema.Node,
guids: typing.Iterable[URI],
):
# check node existence
guids = set(guids)
existing = set(self._backend.exists(node_type, guids))
# get nodes to be created
missing = guids - existing
# create nodes if need be
if len(missing) > 0:
# check which missing nodes can be created
missing = set(self.__ac.createable(node_type, missing))
# create nodes
self._backend.create(node_type, missing)
# add bookkeeping triples
self._backend.set(node_type, missing,
self._backend.schema.predicate(ns.bsm.t_created), [time.time()])
# add permission triples
self.__ac.create(node_type, missing)
# return available nodes
return existing | missing
## EOF ##
|