aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/schema/serialize.py
blob: 0eb6628502590e05873494cb98daeb7c358c4706 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""

Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
import itertools
import typing

# external imports
import rdflib

# bsfs imports
from bsfs.namespace import ns
from bsfs.utils import errors, URI, typename

# inner-module imports
from . import types
from . import schema

# exports: the names made available by a star-import of this module
__all__: typing.Sequence[str] = ('to_string', 'from_string')


## code ##

def from_string(schema_str: str) -> schema.Schema:
    """Parse a turtle document and build the Schema it describes.

    Nodes, literals and predicates are discovered by following
    ``rdfs:subClassOf`` statements downwards, starting at ``types.ROOT_NODE``,
    ``types.ROOT_LITERAL`` and ``types.ROOT_PREDICATE`` respectively. Every
    other statement about a discovered class is forwarded to the created
    symbol as an annotation (keyword argument).

    Nodes and literals are collected eagerly. Predicates are handed to
    ``schema.Schema`` as a generator, so predicate-related errors only
    surface once that generator is consumed.

    Raises `errors.ConsistencyError` if
    * a class is its own (transitive) parent,
    * two distinct nodes (or two distinct literals) share the same URI,
    * a predicate refers to a domain or range that was not discovered, or
    * a predicate has more than one value for a single-valued property.
    """
    # load the turtle document into an rdf graph
    graph = rdflib.Graph()
    graph.parse(data=schema_str, format='turtle')

    ## helpers ##

    def _to_native(term):
        """Translate an rdflib term (URIRef or Literal) into a bsfs native value."""
        if isinstance(term, rdflib.URIRef):
            return URI(term)
        if isinstance(term, rdflib.Literal):
            return term.value
        raise errors.UnreachableError(f'expected Literal or URIRef, found {typename(term)}')

    def _walk(factory, curr):
        """Yield *curr*, then recursively every symbol declared as its subclass.

        *factory* is called as ``factory(uri, parent, **annotations)`` for each
        subclass found in the graph and must return the new symbol.
        """
        # the current symbol comes first, its descendants follow depth-first
        yield curr
        for child_ref in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(curr.uri)):
            # gather everything said about the child, except the hierarchy itself
            # FIXME: preserve datatype of value?!
            annotations = {}
            for pred, value in graph.predicate_objects(child_ref):
                key = URI(pred)
                if key != ns.rdfs.subClassOf:
                    annotations[key] = _to_native(value)
            # switch from the rdflib term to a bsfs URI
            child_uri = URI(child_ref)
            # a child must be neither the current symbol nor one of its ancestors
            loops_back = child_uri == curr.uri or child_uri in {
                ancestor.uri for ancestor in curr.parents()}
            if loops_back:
                raise errors.ConsistencyError('circular dependency')
            # create the child symbol and descend into its own children
            symbol = factory(child_uri, curr, **annotations)
            yield from _walk(factory, symbol)

    def _single_value(
            subject: URI,
            predicate: rdflib.URIRef,
            value_factory: typing.Callable[[typing.Any], typing.Any],
            ) -> typing.Optional[typing.Any]:
        """Return ``value_factory(obj)`` for the only object of (subject, predicate).

        Returns None if there is no such object and raises an
        `errors.ConsistencyError` if there is more than one.
        """
        matches = list(graph.objects(rdflib.URIRef(subject), predicate))
        if len(matches) > 1:
            raise errors.ConsistencyError(
                f'{subject} has multiple values for predicate {str(predicate)}, expected zero or one')
        if not matches:
            return None
        return value_factory(matches[0])

    def _build_predicate(uri, parent, **annotations):
        """Factory for predicates and features, used while walking the predicate tree."""
        # the root feature is predefined and never rebuilt
        if uri == types.ROOT_FEATURE.uri:
            return types.ROOT_FEATURE
        # these properties become explicit arguments below, not annotations
        for key in (ns.rdfs.domain, ns.rdfs.range, ns.bsfs.unique):
            annotations.pop(key, None)
        # domain: must be one of the discovered nodes
        dom = _single_value(uri, rdflib.RDFS.domain, URI)
        if dom is not None:
            if dom not in nodes_lut:
                raise errors.ConsistencyError(f'predicate {uri} has undefined domain {dom}')
            dom = nodes_lut[dom]
        # range: must be a discovered node or literal; nodes take precedence
        rng = _single_value(uri, rdflib.RDFS.range, URI)
        if rng is not None:
            if rng not in nodes_lut and rng not in literals_lut:
                raise errors.ConsistencyError(f'predicate {uri} has undefined range {rng}')
            rng = nodes_lut[rng] if rng in nodes_lut else literals_lut[rng]
        # unique flag
        unique = _single_value(uri, rdflib.URIRef(ns.bsfs.unique), bool)
        # ordinary predicates are complete at this point
        if not isinstance(parent, types.Feature):
            return parent.child(URI(uri), domain=dom, range=rng, unique=unique, **annotations)
        # features additionally carry dimension, dtype and distance
        for key in (ns.bsfs.dimension, ns.bsfs.dtype, ns.bsfs.distance):
            annotations.pop(key, None)
        dimension = _single_value(uri, rdflib.URIRef(ns.bsfs.dimension), int)
        dtype = _single_value(uri, rdflib.URIRef(ns.bsfs.dtype), URI)
        distance = _single_value(uri, rdflib.URIRef(ns.bsfs.distance), URI)
        return parent.child(URI(uri), domain=dom, range=rng, unique=unique,
            dtype=dtype, dimension=dimension, distance=distance, **annotations)

    ## schema assembly ##

    # nodes, plus a uri-to-node lookup; fewer lookup entries than nodes means
    # that distinct nodes share a uri
    nodes = set(_walk(types.Node, types.ROOT_NODE))
    nodes_lut = {node.uri: node for node in nodes}
    if len(nodes) != len(nodes_lut):
        raise errors.ConsistencyError('inconsistent nodes')

    # literals, checked the same way as nodes
    literals = set(_walk(types.Literal, types.ROOT_LITERAL))
    literals_lut = {lit.uri: lit for lit in literals}
    if len(literals) != len(literals_lut):
        raise errors.ConsistencyError('inconsistent literals')

    # predicates stay a generator; the schema consumes it
    predicates = _walk(_build_predicate, types.ROOT_PREDICATE)

    return schema.Schema(predicates, nodes, literals)



def to_string(schema_inst: schema.Schema) -> str:
    """Placeholder for turning a Schema into a string; not implemented.

    Presumably intended as the counterpart of `from_string` (Schema in,
    string out). Currently *schema_inst* is ignored and a
    `NotImplementedError` is raised unconditionally.
    """
    raise NotImplementedError

## EOF ##