1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
|
# standard imports
import itertools
import typing
# external imports
import rdflib
# bsfs imports
from bsfs.namespace import ns
from bsfs.utils import errors, URI, typename
# inner-module imports
from . import types
from . import schema
# exports: the public names of this module, i.e. what a star-import provides.
__all__: typing.Sequence[str] = (
'to_string',
'from_string',
)
## code ##
def from_string(schema_str: str) -> schema.Schema:
    """Build a Schema from its turtle-formatted string representation.

    Nodes, literals (features included) and predicates are discovered by
    following rdfs:subClassOf links downwards from the respective root type.

    Raises an `errors.ConsistencyError` if the class hierarchy is circular or
    ambiguous, if a property that must be unique has several values, or if a
    predicate refers to an undefined domain or range.
    Raises an `errors.BackendError` if an annotation value is neither a
    literal nor an URI (e.g. a blank node).
    """
    # load the string into an rdf graph
    graph = rdflib.Graph()
    graph.parse(data=schema_str, format='turtle')

    def _single_value(
            subject: URI,
            predicate: rdflib.URIRef,
            value_factory: typing.Callable[[typing.Any], typing.Any],
            ) -> typing.Optional[typing.Any]:
        """Return the converted object of (subject, predicate), or None if there is none.

        Raises a `errors.ConsistencyError` if more than one object matches.
        """
        found = list(graph.objects(rdflib.URIRef(subject), predicate))
        if len(found) > 1:
            raise errors.ConsistencyError(
                f'{subject} has multiple values for predicate {str(predicate)}, expected zero or one')
        if not found:
            return None
        return value_factory(found[0])

    def _to_native(term):
        """Translate an rdflib term into its bsfs native counterpart."""
        if isinstance(term, rdflib.Literal):
            return term.value
        if isinstance(term, rdflib.URIRef):
            return URI(term)
        # anything else (e.g. a blank node) cannot be represented
        raise errors.BackendError(f'expected Literal or URIRef, found {typename(term)}')

    def _walk(factory, current):
        """Yield `current` and, depth-first, every symbol derived from it via rdfs:subClassOf."""
        yield current
        # visit all direct subclasses of the current symbol
        for child_ref in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(current.uri)):
            # every statement about the child, except its parent link, is an annotation
            # FIXME: preserve datatype of value?!
            annotations = {}
            for pred, obj in graph.predicate_objects(child_ref):
                key = URI(pred)
                if key != ns.rdfs.subClassOf:
                    annotations[key] = _to_native(obj)
            # switch to the native URI type
            child_uri = URI(child_ref)
            # a child must not be identical to the current symbol or any of its ancestors
            if child_uri == current.uri or child_uri in {ancestor.uri for ancestor in current.parents()}:
                raise errors.ConsistencyError('circular dependency')
            # create the child symbol and descend into its own subclasses
            yield from _walk(factory, factory(child_uri, current, **annotations))

    # collect nodes; an uri must map to exactly one node
    nodes = set(_walk(types.Node, types.ROOT_NODE))
    nodes_lut = {node.uri: node for node in nodes}
    if len(nodes_lut) != len(nodes):
        raise errors.ConsistencyError('inconsistent nodes')

    def _make_literal(uri, parent, **annotations):
        """Create a literal (or feature) symbol as child of `parent`."""
        # the root feature already exists and is returned as-is
        if uri == types.ROOT_FEATURE.uri:
            return types.ROOT_FEATURE
        # plain literals carry no special properties
        if not isinstance(parent, types.Feature):
            return parent.child(URI(uri), **annotations)
        # feature properties are passed explicitly, hence must not remain in the annotations
        for key in (ns.bsfs.dimension, ns.bsfs.dtype, ns.bsfs.distance):
            annotations.pop(key, None)
        # read the feature properties from the graph
        dimension = _single_value(uri, rdflib.URIRef(ns.bsfs.dimension), int)
        dtype = _single_value(uri, rdflib.URIRef(ns.bsfs.dtype), URI)
        distance = _single_value(uri, rdflib.URIRef(ns.bsfs.distance), URI)
        return parent.child(URI(uri), dtype=dtype, dimension=dimension, distance=distance, **annotations)

    # collect literals; an uri must map to exactly one literal
    literals = set(_walk(_make_literal, types.ROOT_LITERAL))
    literals_lut = {lit.uri: lit for lit in literals}
    if len(literals_lut) != len(literals):
        raise errors.ConsistencyError('inconsistent literals')

    def _make_predicate(uri, parent, **annotations):
        """Create a predicate symbol as child of `parent`."""
        # predicate properties are passed explicitly, hence must not remain in the annotations
        for key in (ns.rdfs.domain, ns.rdfs.range, ns.bsfs.unique):
            annotations.pop(key, None)
        # resolve the domain, which has to be a known node if specified
        dom = _single_value(uri, rdflib.RDFS.domain, URI)
        if dom is not None:
            if dom not in nodes_lut:
                raise errors.ConsistencyError(f'predicate {uri} has undefined domain {dom}')
            dom = nodes_lut[dom]
        # resolve the range, which has to be a known node or literal if specified
        rng = _single_value(uri, rdflib.RDFS.range, URI)
        if rng is not None:
            if rng not in nodes_lut and rng not in literals_lut:
                raise errors.ConsistencyError(f'predicate {uri} has undefined range {rng}')
            # nodes take precedence over literals
            rng = nodes_lut[rng] if rng in nodes_lut else literals_lut[rng]
        # read the cardinality flag
        unique = _single_value(uri, rdflib.URIRef(ns.bsfs.unique), bool)
        return parent.child(URI(uri), domain=dom, range=rng, unique=unique, **annotations)

    # predicates are produced lazily and handed to the schema as they are
    predicates = _walk(_make_predicate, types.ROOT_PREDICATE)
    return schema.Schema(predicates, nodes, literals)
def to_string(schema_inst: schema.Schema, fmt: str = 'turtle') -> str:
    """Return the string representation of a `bsfs.schema.Schema`.

    The schema's nodes, literals and predicates are translated into rdf
    triples and serialized in the format `fmt` (default: turtle).
    See `rdflib.Graph.serialize` for viable formats.
    """
    # stream of (subject, predicate, object) triples.
    T_TRIPLES = typing.Iterator[typing.Tuple[rdflib.URIRef, rdflib.URIRef, rdflib.term.Identifier]]

    def _type_triples(tpe: types._Type) -> T_TRIPLES:
        """Yield what all types have in common: the parent link and the annotations."""
        subject = rdflib.URIRef(tpe.uri)
        # parent link
        if tpe.parent is not None:
            yield subject, rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(tpe.parent.uri)
        # annotations
        for prop, value in tpe.annotations.items():
            yield subject, rdflib.URIRef(prop), rdflib.Literal(value)  # FIXME: datatype?!

    def _predicate_triples(pred: types.Predicate) -> T_TRIPLES:
        """Yield domain, range and cardinality of a predicate where they differ from its parent."""
        # the root predicate is implicit, nothing to be stated about it
        if pred == types.ROOT_PREDICATE:
            return
        subject = rdflib.URIRef(pred.uri)
        parent = pred.parent
        # domain
        if pred.domain != getattr(parent, 'domain', None):
            yield subject, rdflib.URIRef(ns.rdfs.domain), rdflib.URIRef(pred.domain.uri)
        # range
        if pred.range != getattr(parent, 'range', None):
            yield subject, rdflib.URIRef(ns.rdfs.range), rdflib.URIRef(pred.range.uri)
        # cardinality
        if pred.unique != getattr(parent, 'unique', None):
            yield (
                subject,
                rdflib.URIRef(ns.bsfs.unique),
                rdflib.Literal(pred.unique, datatype=rdflib.XSD.boolean),
                )

    def _feature_triples(feat: types.Feature) -> T_TRIPLES:
        """Yield dimension, dtype and distance of a feature where they differ from its parent."""
        subject = rdflib.URIRef(feat.uri)
        parent = feat.parent
        # dimension
        if feat.dimension != getattr(parent, 'dimension', None):
            yield (
                subject,
                rdflib.URIRef(ns.bsfs.dimension),
                rdflib.Literal(feat.dimension, datatype=rdflib.XSD.integer),
                )
        # dtype
        if feat.dtype != getattr(parent, 'dtype', None):
            yield subject, rdflib.URIRef(ns.bsfs.dtype), rdflib.URIRef(feat.dtype)
        # distance
        if feat.distance != getattr(parent, 'distance', None):
            yield subject, rdflib.URIRef(ns.bsfs.distance), rdflib.URIRef(feat.distance)

    def _triples(symbol: types._Type) -> T_TRIPLES:
        """Yield every triple that describes a schema symbol."""
        # only schema types can be serialized
        if not isinstance(symbol, types._Type): # pylint: disable=protected-access
            raise TypeError(symbol)
        # common part first
        yield from _type_triples(symbol)
        # followed by the parts specific to derived types
        if isinstance(symbol, types.Predicate):
            yield from _predicate_triples(symbol)
        if isinstance(symbol, types.Feature):
            yield from _feature_triples(symbol)

    # fill a fresh graph with the triples of all schema symbols
    graph = rdflib.Graph()
    symbols = itertools.chain(
        schema_inst.nodes(),
        schema_inst.literals(),
        schema_inst.predicates(),
        )
    for triple in itertools.chain.from_iterable(map(_triples, symbols)):
        graph.add(triple)

    # register prefixes of known namespaces for a more readable output
    # FIXME: more generically?
    known_namespaces = (
        ('bse', ns.bse),
        ('bsfs', ns.bsfs),
        ('bsm', ns.bsm),
        ('rdf', ns.rdf),
        ('rdfs', ns.rdfs),
        ('schema', ns.schema),
        ('xsd', ns.xsd),
        )
    for prefix, namespace in known_namespaces:
        graph.bind(prefix, rdflib.URIRef(namespace['']))

    # serialize in the requested format
    return graph.serialize(format=fmt)
## EOF ##
|