aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/schema/serialize.py
blob: a566d65ca3e1a18d5dea51aebbceb9028cd28447 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
"""

Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
import itertools
import typing

# external imports
import rdflib

# bsfs imports
from bsfs.namespace import ns
from bsfs.utils import errors, URI, typename

# inner-module imports
from . import types
from . import schema

# exports: the names made available by `from bsfs.schema.serialize import *`.
__all__: typing.Sequence[str] = (
    'to_string',
    'from_string',
    )


## code ##

def from_string(schema_str: str) -> schema.Schema:
    """Build a `bsfs.schema.Schema` from its turtle representation.

    The string is parsed into an rdf graph. Node and literal types are collected
    by following rdfs:subClassOf links downwards from the respective root type.
    Predicates are collected likewise and additionally receive their domain,
    range, and unique flag (features: also dimension, dtype, and distance).

    Raises an `errors.ConsistencyError` if the hierarchy is circular, if several
    nodes (or literals) share one uri, if a predicate refers to an undefined
    domain or range, or if a predicate property has more than one value.
    """
    # load the turtle document into an rdf graph
    graph = rdflib.Graph()
    graph.parse(data=schema_str, format='turtle')

    ## helpers ##

    def _to_native(term):
        """Translate an rdflib object term into its bsfs native counterpart."""
        if isinstance(term, rdflib.Literal):
            return term.value
        if isinstance(term, rdflib.URIRef):
            return URI(term)
        raise errors.UnreachableError(f'expected Literal or URIRef, found {typename(term)}')

    def _walk(factory, curr):
        """Emit *curr*, then all of its rdfs:subClassOf descendants (depth-first).
        Each descendant is created by calling `factory(uri, parent, **annotations)`.
        """
        yield curr
        for subject in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(curr.uri)):
            # every statement about the subject, except its parent link, is an annotation
            annotations = {}
            for pred, value in graph.predicate_objects(subject): # FIXME: preserve datatype of value?!
                key = URI(pred)
                if key != ns.rdfs.subClassOf:
                    annotations[key] = _to_native(value)
            # switch from the rdflib term to the native uri
            child_uri = URI(subject)
            # a type must neither be its own parent nor one of its ancestors
            if child_uri == curr.uri:
                raise errors.ConsistencyError('circular dependency')
            if child_uri in {ancestor.uri for ancestor in curr.parents()}:
                raise errors.ConsistencyError('circular dependency')
            # create the symbol and descend into its own children
            symbol = factory(child_uri, curr, **annotations)
            yield from _walk(factory, symbol)

    # FIXME: type annotation
    def _single_value(subject: URI, predicate: rdflib.URIRef, value_factory) -> typing.Optional[typing.Any]:
        """Return the converted object of (*subject*, *predicate*), or None if there is none.
        Raises a `errors.ConsistencyError` if multiple objects match.
        """
        found = list(graph.objects(rdflib.URIRef(subject), predicate))
        if len(found) > 1:
            raise errors.ConsistencyError(
                f'{subject} has multiple values for predicate {str(predicate)}, expected zero or one')
        return value_factory(found[0]) if found else None

    def _make_predicate(uri, parent, **annotations):
        """Factory for predicates and features, used while walking the predicate hierarchy."""
        # the root feature type is predefined, nothing to build
        if uri == types.ROOT_FEATURE.uri:
            return types.ROOT_FEATURE
        # these properties are passed explicitly and must not show up as annotations
        for key in (ns.rdfs.domain, ns.rdfs.range, ns.bsfs.unique):
            annotations.pop(key, None)
        # resolve the domain; it has to be a known node
        dom_uri = _single_value(uri, rdflib.RDFS.domain, URI)
        dom = None
        if dom_uri is not None:
            if dom_uri not in node_by_uri:
                raise errors.ConsistencyError(f'predicate {uri} has undefined domain {dom_uri}')
            dom = node_by_uri[dom_uri]
        # resolve the range; it has to be a known node or literal (nodes take precedence)
        rng_uri = _single_value(uri, rdflib.RDFS.range, URI)
        rng = None
        if rng_uri is not None:
            if rng_uri not in node_by_uri and rng_uri not in literal_by_uri:
                raise errors.ConsistencyError(f'predicate {uri} has undefined range {rng_uri}')
            rng = node_by_uri.get(rng_uri, literal_by_uri.get(rng_uri))
        # resolve the unique flag
        unique = _single_value(uri, rdflib.URIRef(ns.bsfs.unique), bool)
        # plain predicates are complete at this point
        if not isinstance(parent, types.Feature):
            return parent.child(URI(uri), domain=dom, range=rng, unique=unique, **annotations)
        # features carry three more explicit properties
        for key in (ns.bsfs.dimension, ns.bsfs.dtype, ns.bsfs.distance):
            annotations.pop(key, None)
        dimension = _single_value(uri, rdflib.URIRef(ns.bsfs.dimension), int)
        dtype = _single_value(uri, rdflib.URIRef(ns.bsfs.dtype), URI)
        distance = _single_value(uri, rdflib.URIRef(ns.bsfs.distance), URI)
        return parent.child(URI(uri), domain=dom, range=rng, unique=unique,
            dtype=dtype, dimension=dimension, distance=distance, **annotations)

    ## assembly ##

    # collect nodes; a size mismatch means that several nodes share one uri
    nodes = set(_walk(types.Node, types.ROOT_NODE))
    node_by_uri = {node.uri: node for node in nodes}
    if len(node_by_uri) != len(nodes):
        raise errors.ConsistencyError('inconsistent nodes')

    # collect literals; same uniqueness check as for nodes
    literals = set(_walk(types.Literal, types.ROOT_LITERAL))
    literal_by_uri = {lit.uri: lit for lit in literals}
    if len(literal_by_uri) != len(literals):
        raise errors.ConsistencyError('inconsistent literals')

    # predicates are handed over as a generator, i.e. they are built while the Schema consumes them
    predicates = _walk(_make_predicate, types.ROOT_PREDICATE)

    return schema.Schema(predicates, nodes, literals)



def to_string(schema_inst: schema.Schema, fmt: str = 'turtle') -> str:
    """Render a `bsfs.schema.Schema` as a string.

    All node, literal, and predicate types of *schema_inst* are translated into
    triples, collected in an `rdflib.Graph`, and serialized in the format *fmt*.
    See `rdflib.Graph.serialize` for viable formats (default: turtle).
    """

    # type of the triple streams produced by the emitters below.
    T_TRIPLES = typing.Iterator[typing.Tuple[rdflib.URIRef, rdflib.URIRef, rdflib.term.Identifier]]

    def _emit_type(item: types._Type) -> T_TRIPLES:
        """Yield the parent link (if any) and the annotations of a type."""
        subject = rdflib.URIRef(item.uri)
        # parent link
        ancestor = item.parent
        if ancestor is not None:
            yield subject, rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(ancestor.uri)
        # annotations
        for key, val in item.annotations.items():
            # FIXME: datatype?!
            yield subject, rdflib.URIRef(key), rdflib.Literal(val)

    def _emit_predicate(item: types.Predicate) -> T_TRIPLES:
        """Yield domain, range, and unique; each only if it differs from the parent's value."""
        # the root predicate has nothing to emit
        if item == types.ROOT_PREDICATE:
            return
        subject = rdflib.URIRef(item.uri)
        ancestor = item.parent
        # domain
        if item.domain != getattr(ancestor, 'domain', None):
            yield subject, rdflib.URIRef(ns.rdfs.domain), rdflib.URIRef(item.domain.uri)
        # range
        if item.range != getattr(ancestor, 'range', None):
            yield subject, rdflib.URIRef(ns.rdfs.range), rdflib.URIRef(item.range.uri)
        # cardinality
        if item.unique != getattr(ancestor, 'unique', None):
            yield (
                subject,
                rdflib.URIRef(ns.bsfs.unique),
                rdflib.Literal(item.unique, datatype=rdflib.XSD.boolean),
                )

    def _emit_feature(item: types.Feature) -> T_TRIPLES:
        """Yield dimension, dtype, and distance; each only if it differs from the parent's value."""
        subject = rdflib.URIRef(item.uri)
        ancestor = item.parent
        # size
        if item.dimension != getattr(ancestor, 'dimension', None):
            yield (
                subject,
                rdflib.URIRef(ns.bsfs.dimension),
                rdflib.Literal(item.dimension, datatype=rdflib.XSD.integer),
                )
        # dtype
        if item.dtype != getattr(ancestor, 'dtype', None):
            yield subject, rdflib.URIRef(ns.bsfs.dtype), rdflib.URIRef(item.dtype)
        # distance
        if item.distance != getattr(ancestor, 'distance', None):
            yield subject, rdflib.URIRef(ns.bsfs.distance), rdflib.URIRef(item.distance)

    # emitters to run on an item, in this order, whenever the item is an instance of the given type.
    # NOTE: all items are _Type
    emitters = (
        (types._Type, _emit_type), # pylint: disable=protected-access
        (types.Predicate, _emit_predicate),
        (types.Feature, _emit_feature),
        )

    # collect the triples of all schema types in a graph
    graph = rdflib.Graph()
    everything = itertools.chain(
        schema_inst.nodes(),
        schema_inst.literals(),
        schema_inst.predicates())
    for item in everything:
        for kind, emitter in emitters:
            if isinstance(item, kind):
                for triple in emitter(item):
                    graph.add(triple)
    # register known namespaces so that the output is easier to read
    # FIXME: more systematically (e.g. for all in ns?)
    graph.bind('bsfs', rdflib.URIRef('http://bsfs.ai/schema/'))
    graph.bind('bse', rdflib.URIRef('http://bsfs.ai/schema/Entity#'))
    # serialize in the requested format
    return graph.serialize(format=fmt)

## EOF ##