1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
"""
Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
from collections import abc
import itertools
import typing
# external imports
import rdflib
# bsfs imports
from bsfs.namespace import ns
from bsfs.utils import errors, URI, typename
# inner-module imports
from . import types
from . import schema
# exports
# Names made available by `from <this module> import *`.
__all__: typing.Sequence[str] = (
'to_string',
'from_string',
)
## code ##
def from_string(schema_str: str) -> schema.Schema:
    """Build a Schema from its turtle-formatted string representation.

    The string is parsed into an rdflib graph. Nodes, literals, and
    predicates are then collected by following rdfs:subClassOf statements
    downwards, starting at the respective root symbol.

    Raises `errors.ConsistencyError` if a subclass relation is circular,
    if two distinct node (or literal) symbols share the same uri, if a
    predicate refers to an undefined domain or range, or if a predicate
    has more than one value for a single-valued property.
    Raises `errors.BackendError` if an annotation value is neither an rdflib
    Literal nor an rdflib URIRef.

    Note that the predicates are handed to the Schema as a generator, so
    predicate-related errors only surface once that generator is consumed.
    """
    # load the turtle document into an rdf graph
    graph = rdflib.Graph()
    graph.parse(data=schema_str, format='turtle')

    def _convert(value):
        """Translate an rdflib Literal or URIRef into its bsfs-native counterpart."""
        if not isinstance(value, (rdflib.Literal, rdflib.URIRef)):
            raise errors.BackendError(f'expected Literal or URIRef, found {typename(value)}')
        return value.value if isinstance(value, rdflib.Literal) else URI(value)

    def _fetch_hierarchically(factory, curr):
        """Yield `curr`, then every symbol derived from it via rdfs:subClassOf."""
        # the current symbol always comes first
        yield curr
        # visit all direct subclasses of the current symbol
        for child_ref in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(curr.uri)):
            # every statement about the child, except the subclass link itself,
            # becomes an annotation of the new symbol
            annotations = {}
            for pred, value in graph.predicate_objects(child_ref): # FIXME: preserve datatype of value?!
                key = URI(pred)
                if key != ns.rdfs.subClassOf:
                    annotations[key] = _convert(value)
            # switch from the rdflib term to a bsfs URI
            child_uri = URI(child_ref)
            # a child must neither be the current symbol nor one of its ancestors
            if child_uri == curr.uri or child_uri in {node.uri for node in curr.parents()}:
                raise errors.ConsistencyError('circular dependency')
            # create the child symbol, then descend into its own subclasses
            symbol = factory(child_uri, curr, **annotations)
            yield from _fetch_hierarchically(factory, symbol)

    def _index_by_uri(symbols, message):
        """Return a uri -> symbol lookup; fail with `message` if symbols share a uri."""
        lut = {symbol.uri: symbol for symbol in symbols}
        if len(lut) != len(symbols):
            raise errors.ConsistencyError(message)
        return lut

    # collect nodes
    nodes = set(_fetch_hierarchically(types.Node, types.ROOT_NODE))
    nodes_lut = _index_by_uri(nodes, 'inconsistent nodes')

    # collect literals
    literals = set(_fetch_hierarchically(types.Literal, types.ROOT_LITERAL))
    literals_lut = _index_by_uri(literals, 'inconsistent literals')

    # collect predicates
    # FIXME: narrow the type annotation (value and return type are still Any)
    def _fetch_value(
            subject: URI,
            predicate: rdflib.URIRef,
            value_factory: typing.Callable[[typing.Any], typing.Any],
            ) -> typing.Optional[typing.Any]:
        """Return the converted object of (subject, predicate), or None if there is none.

        Raises a `errors.ConsistencyError` if multiple objects match.
        """
        matches = list(graph.objects(rdflib.URIRef(subject), predicate))
        if len(matches) > 1:
            raise errors.ConsistencyError(f'{subject} has multiple values for predicate {str(predicate)}, expected zero or one')
        if not matches:
            return None
        return value_factory(matches[0])

    def _build_predicate(uri, parent, **annotations):
        """Create the predicate (or feature) `uri` as a child of `parent`."""
        # the root feature already exists and is returned as-is
        if uri == types.ROOT_FEATURE.uri:
            return types.ROOT_FEATURE
        # these properties are passed explicitly, hence they're no annotations
        for key in (ns.rdfs.domain, ns.rdfs.range, ns.bsfs.unique):
            annotations.pop(key, None)
        # the domain, if given, has to be a known node
        dom = _fetch_value(uri, rdflib.RDFS.domain, URI)
        if dom is not None:
            if dom not in nodes_lut:
                raise errors.ConsistencyError(f'predicate {uri} has undefined domain {dom}')
            dom = nodes_lut[dom]
        # the range, if given, has to be a known node or a known literal
        rng = _fetch_value(uri, rdflib.RDFS.range, URI)
        if rng is not None:
            if rng not in nodes_lut and rng not in literals_lut:
                raise errors.ConsistencyError(f'predicate {uri} has undefined range {rng}')
            rng = nodes_lut[rng] if rng in nodes_lut else literals_lut[rng]
        # the unique flag
        unique = _fetch_value(uri, rdflib.URIRef(ns.bsfs.unique), bool)
        # children of a feature carry additional, feature-specific properties
        feature_args = {}
        if isinstance(parent, types.Feature):
            # again, explicitly passed properties are no annotations
            for key in (ns.bsfs.dimension, ns.bsfs.dtype, ns.bsfs.distance):
                annotations.pop(key, None)
            dimension = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dimension), int)
            dtype = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dtype), URI)
            distance = _fetch_value(uri, rdflib.URIRef(ns.bsfs.distance), URI)
            feature_args = dict(dtype=dtype, dimension=dimension, distance=distance)
        # derive the new symbol from its parent
        return parent.child(URI(uri), domain=dom, range=rng, unique=unique,
            **feature_args, **annotations)

    # predicates are created lazily while the Schema consumes the generator
    return schema.Schema(
        _fetch_hierarchically(_build_predicate, types.ROOT_PREDICATE),
        nodes,
        literals,
        )
def to_string(schema_inst: schema.Schema) -> str:
    """Return a string representation of `schema_inst`.

    NOTE: Serialization is not implemented yet. Calling this function
    always raises a `NotImplementedError`, and `schema_inst` is not inspected.
    Presumably the result is meant to be readable by `from_string` --
    to be confirmed once the function is implemented.
    """
    # state the reason explicitly so that the traceback is self-explanatory
    raise NotImplementedError('to_string is not implemented yet')
## EOF ##
|