diff options
Diffstat (limited to 'bsfs/schema/serialize.py')
-rw-r--r-- | bsfs/schema/serialize.py | 255 |
1 files changed, 255 insertions, 0 deletions
diff --git a/bsfs/schema/serialize.py b/bsfs/schema/serialize.py new file mode 100644 index 0000000..ea8b2f4 --- /dev/null +++ b/bsfs/schema/serialize.py @@ -0,0 +1,255 @@ + +# standard imports +import itertools +import typing + +# external imports +import rdflib + +# bsfs imports +from bsfs.namespace import ns +from bsfs.utils import errors, URI, typename + +# inner-module imports +from . import types +from . import schema + +# exports +__all__: typing.Sequence[str] = ( + 'to_string', + 'from_string', + ) + + +## code ## + +def from_string(schema_str: str) -> schema.Schema: + """Load and return a Schema from a string.""" + # parse string into rdf graph + graph = rdflib.Graph() + graph.parse(data=schema_str, format='turtle') + + # helper functions + def _fetch_value( + subject: URI, + predicate: rdflib.URIRef, + value_factory: typing.Callable[[typing.Any], typing.Any], + ) -> typing.Optional[typing.Any]: + """Fetch the object of a given subject and predicate. + Raises a `errors.ConsistencyError` if multiple objects match. + """ + values = list(graph.objects(rdflib.URIRef(subject), predicate)) + if len(values) == 0: + return None + if len(values) == 1: + return value_factory(values[0]) + raise errors.ConsistencyError( + f'{subject} has multiple values for predicate {str(predicate)}, expected zero or one') + + def _convert(value): + """Convert the subject type from rdflib to a bsfs native type.""" + if isinstance(value, rdflib.Literal): + return value.value + if isinstance(value, rdflib.URIRef): + return URI(value) + # value is neither a node nor a literal, but e.g. a blank node + raise errors.BackendError(f'expected Literal or URIRef, found {typename(value)}') + + def _fetch_hierarchically(factory, curr): + """Walk through a rdfs:subClassOf hierarchy, creating symbols along the way.""" + # emit current node + yield curr + # walk through childs + for child in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(curr.uri)): + # fetch annotations + annotations = { + URI(pred): _convert(value) + for pred, value # FIXME: preserve datatype of value?! + in graph.predicate_objects(child) + if URI(pred) != ns.rdfs.subClassOf + } + # convert child to URI + child = URI(child) + # check circular dependency + if child == curr.uri or child in {node.uri for node in curr.parents()}: + raise errors.ConsistencyError('circular dependency') + # recurse and emit (sub*)childs + yield from _fetch_hierarchically(factory, factory(child, curr, **annotations)) + + # fetch nodes + nodes = set(_fetch_hierarchically(types.Node, types.ROOT_NODE)) + nodes_lut = {node.uri: node for node in nodes} + if len(nodes_lut) != len(nodes): + raise errors.ConsistencyError('inconsistent nodes') + + # fetch literals + def _build_literal(uri, parent, **annotations): + """Literal factory.""" + # break out on root feature type + if uri == types.ROOT_FEATURE.uri: + return types.ROOT_FEATURE + # handle feature types + if isinstance(parent, types.Feature): + # clean annotations + annotations.pop(ns.bsfs.dimension, None) + annotations.pop(ns.bsfs.dtype, None) + annotations.pop(ns.bsfs.distance, None) + # get dimension + dimension = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dimension), int) + # get dtype + dtype = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dtype), URI) + # get distance + distance = _fetch_value(uri, rdflib.URIRef(ns.bsfs.distance), URI) + # return feature + return parent.child(URI(uri), dtype=dtype, dimension=dimension, distance=distance, **annotations) + # handle non-feature types + return parent.child(URI(uri), **annotations) + + literals = set(_fetch_hierarchically(_build_literal, types.ROOT_LITERAL)) + literals_lut = {lit.uri: lit for lit in literals} + if len(literals_lut) != len(literals): + raise errors.ConsistencyError('inconsistent literals') + + # fetch predicates + def _build_predicate(uri, parent, **annotations): + """Predicate factory.""" + # clean annotations + annotations.pop(ns.rdfs.domain, None) + annotations.pop(ns.rdfs.range, None) + annotations.pop(ns.bsfs.unique, None) + # get domain + dom = _fetch_value(uri, rdflib.RDFS.domain, URI) + if dom is not None and dom not in nodes_lut: + raise errors.ConsistencyError(f'predicate {uri} has undefined domain {dom}') + if dom is not None: + dom = nodes_lut[dom] + # get range + rng = _fetch_value(uri, rdflib.RDFS.range, URI) + if rng is not None and rng not in nodes_lut and rng not in literals_lut: + raise errors.ConsistencyError(f'predicate {uri} has undefined range {rng}') + if rng is not None: + rng = nodes_lut.get(rng, literals_lut.get(rng)) + # get unique + unique = _fetch_value(uri, rdflib.URIRef(ns.bsfs.unique), bool) + # build predicate + return parent.child(URI(uri), domain=dom, range=rng, unique=unique, **annotations) + + predicates = _fetch_hierarchically(_build_predicate, types.ROOT_PREDICATE) + + return schema.Schema(predicates, nodes, literals) + + + +def to_string(schema_inst: schema.Schema, fmt: str = 'turtle') -> str: + """Serialize a `bsfs.schema.Schema` to a string. + See `rdflib.Graph.serialize` for viable formats (default: turtle). + """ + + # type of emitted triples. + T_TRIPLE = typing.Iterator[typing.Tuple[rdflib.URIRef, rdflib.URIRef, rdflib.term.Identifier]] + + def _type(tpe: types._Type) -> T_TRIPLE : + """Emit _Type properties (parent, annotations).""" + # emit parent + if tpe.parent is not None: + yield ( + rdflib.URIRef(tpe.uri), + rdflib.URIRef(ns.rdfs.subClassOf), + rdflib.URIRef(tpe.parent.uri), + ) + # emit annotations + for prop, value in tpe.annotations.items(): + yield ( + rdflib.URIRef(tpe.uri), + rdflib.URIRef(prop), + rdflib.Literal(value), # FIXME: datatype?! + ) + + def _predicate(pred: types.Predicate) -> T_TRIPLE: + """Emit Predicate properties (domain, range, unique).""" + # no need to emit anything for the root predicate + if pred == types.ROOT_PREDICATE: + return + # emit domain + if pred.domain != getattr(pred.parent, 'domain', None): + yield ( + rdflib.URIRef(pred.uri), + rdflib.URIRef(ns.rdfs.domain), + rdflib.URIRef(pred.domain.uri), + ) + # emit range + if pred.range != getattr(pred.parent, 'range', None): + yield ( + rdflib.URIRef(pred.uri), + rdflib.URIRef(ns.rdfs.range), + rdflib.URIRef(pred.range.uri), + ) + # emit cardinality + if pred.unique != getattr(pred.parent, 'unique', None): + yield ( + rdflib.URIRef(pred.uri), + rdflib.URIRef(ns.bsfs.unique), + rdflib.Literal(pred.unique, datatype=rdflib.XSD.boolean), + ) + + def _feature(feat: types.Feature) -> T_TRIPLE: + """Emit Feature properties (dimension, dtype, distance).""" + # emit size + if feat.dimension != getattr(feat.parent, 'dimension', None): + yield ( + rdflib.URIRef(feat.uri), + rdflib.URIRef(ns.bsfs.dimension), + rdflib.Literal(feat.dimension, datatype=rdflib.XSD.integer), + ) + # emit dtype + if feat.dtype != getattr(feat.parent, 'dtype', None): + yield ( + rdflib.URIRef(feat.uri), + rdflib.URIRef(ns.bsfs.dtype), + rdflib.URIRef(feat.dtype), + ) + # emit distance + if feat.distance != getattr(feat.parent, 'distance', None): + yield ( + rdflib.URIRef(feat.uri), + rdflib.URIRef(ns.bsfs.distance), + rdflib.URIRef(feat.distance), + ) + + def _parse(node: types._Type) -> T_TRIPLE: + """Emit all properties of a type.""" + # check arg + if not isinstance(node, types._Type): # pylint: disable=protected-access + raise TypeError(node) + # emit _Type essentials + yield from _type(node) + # emit properties of derived types + if isinstance(node, types.Predicate): + yield from _predicate(node) + if isinstance(node, types.Feature): + yield from _feature(node) + + # create graph + graph = rdflib.Graph() + # add triples to graph + nodes = itertools.chain( + schema_inst.nodes(), + schema_inst.literals(), + schema_inst.predicates()) + for node in nodes: + for triple in _parse(node): + graph.add(triple) + # add known namespaces for readability + # FIXME: more generically? + graph.bind('bsfs', rdflib.URIRef(ns.bsfs + '/')) + graph.bind('bsl', rdflib.URIRef(ns.bsl + '/')) + graph.bind('bsn', rdflib.URIRef(ns.bsn + '#')) + graph.bind('bse', rdflib.URIRef(ns.bsfs.Entity() + '#')) + graph.bind('rdf', rdflib.URIRef(ns.rdf)) + graph.bind('rdfs', rdflib.URIRef(ns.rdfs)) + graph.bind('schema', rdflib.URIRef(ns.schema)) + graph.bind('xsd', rdflib.URIRef(ns.xsd)) + # serialize to turtle + return graph.serialize(format=fmt) + +## EOF ## |