about | summary | refs | log | tree | commit | diff | stats
path: root/bsfs/schema/serialize.py
diff options
context:
space:
mode:
Diffstat (limited to 'bsfs/schema/serialize.py')
-rw-r--r-- bsfs/schema/serialize.py 259
1 files changed, 259 insertions, 0 deletions
diff --git a/bsfs/schema/serialize.py b/bsfs/schema/serialize.py
new file mode 100644
index 0000000..acc009a
--- /dev/null
+++ b/bsfs/schema/serialize.py
@@ -0,0 +1,259 @@
+"""Serialize bsfs schemas to, and load them from, RDF strings.
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# standard imports
+import itertools
+import typing
+
+# external imports
+import rdflib
+
+# bsfs imports
+from bsfs.namespace import ns
+from bsfs.utils import errors, URI, typename
+
+# inner-module imports
+from . import types
+from . import schema
+
+# exports: the public API consists of the two (de)serialization entry points
+__all__: typing.Sequence[str] = (
+ 'to_string',
+ 'from_string',
+ )
+
+
+## code ##
+
+def from_string(schema_str: str) -> schema.Schema:
+ """Load and return a Schema from a string."""
+ # parse string into rdf graph
+ graph = rdflib.Graph()
+ graph.parse(data=schema_str, format='turtle')
+
+ # helper functions
+ def _fetch_value(
+ subject: URI,
+ predicate: rdflib.URIRef,
+ value_factory: typing.Callable[[typing.Any], typing.Any],
+ ) -> typing.Optional[typing.Any]:
+ """Fetch the object of a given subject and predicate.
+ Raises a `errors.ConsistencyError` if multiple objects match.
+ """
+ values = list(graph.objects(rdflib.URIRef(subject), predicate))
+ if len(values) == 0:
+ return None
+ if len(values) == 1:
+ return value_factory(values[0])
+ raise errors.ConsistencyError(
+ f'{subject} has multiple values for predicate {str(predicate)}, expected zero or one')
+
+ def _convert(value):
+ """Convert the subject type from rdflib to a bsfs native type."""
+ if isinstance(value, rdflib.Literal):
+ return value.value
+ if isinstance(value, rdflib.URIRef):
+ return URI(value)
+ # value is neither a node nor a literal, but e.g. a blank node
+ raise errors.BackendError(f'expected Literal or URIRef, found {typename(value)}')
+
+ def _fetch_hierarchically(factory, curr):
+ """Walk through a rdfs:subClassOf hierarchy, creating symbols along the way."""
+ # emit current node
+ yield curr
+ # walk through childs
+ for child in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(curr.uri)):
+ # fetch annotations
+ annotations = {
+ URI(pred): _convert(value)
+ for pred, value # FIXME: preserve datatype of value?!
+ in graph.predicate_objects(child)
+ if URI(pred) != ns.rdfs.subClassOf
+ }
+ # convert child to URI
+ child = URI(child)
+ # check circular dependency
+ if child == curr.uri or child in {node.uri for node in curr.parents()}:
+ raise errors.ConsistencyError('circular dependency')
+ # recurse and emit (sub*)childs
+ yield from _fetch_hierarchically(factory, factory(child, curr, **annotations))
+
+ # fetch nodes
+ nodes = set(_fetch_hierarchically(types.Node, types.ROOT_NODE))
+ nodes_lut = {node.uri: node for node in nodes}
+ if len(nodes_lut) != len(nodes):
+ raise errors.ConsistencyError('inconsistent nodes')
+
+ # fetch literals
+ def _build_literal(uri, parent, **annotations):
+ """Literal factory."""
+ # break out on root feature type
+ if uri == types.ROOT_FEATURE.uri:
+ return types.ROOT_FEATURE
+ # handle feature types
+ if isinstance(parent, types.Feature):
+ # clean annotations
+ annotations.pop(ns.bsfs.dimension, None)
+ annotations.pop(ns.bsfs.dtype, None)
+ annotations.pop(ns.bsfs.distance, None)
+ # get dimension
+ dimension = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dimension), int)
+ # get dtype
+ dtype = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dtype), URI)
+ # get distance
+ distance = _fetch_value(uri, rdflib.URIRef(ns.bsfs.distance), URI)
+ # return feature
+ return parent.child(URI(uri), dtype=dtype, dimension=dimension, distance=distance, **annotations)
+ # handle non-feature types
+ return parent.child(URI(uri), **annotations)
+
+ literals = set(_fetch_hierarchically(_build_literal, types.ROOT_LITERAL))
+ literals_lut = {lit.uri: lit for lit in literals}
+ if len(literals_lut) != len(literals):
+ raise errors.ConsistencyError('inconsistent literals')
+
+ # fetch predicates
+ def _build_predicate(uri, parent, **annotations):
+ """Predicate factory."""
+ # clean annotations
+ annotations.pop(ns.rdfs.domain, None)
+ annotations.pop(ns.rdfs.range, None)
+ annotations.pop(ns.bsfs.unique, None)
+ # get domain
+ dom = _fetch_value(uri, rdflib.RDFS.domain, URI)
+ if dom is not None and dom not in nodes_lut:
+ raise errors.ConsistencyError(f'predicate {uri} has undefined domain {dom}')
+ if dom is not None:
+ dom = nodes_lut[dom]
+ # get range
+ rng = _fetch_value(uri, rdflib.RDFS.range, URI)
+ if rng is not None and rng not in nodes_lut and rng not in literals_lut:
+ raise errors.ConsistencyError(f'predicate {uri} has undefined range {rng}')
+ if rng is not None:
+ rng = nodes_lut.get(rng, literals_lut.get(rng))
+ # get unique
+ unique = _fetch_value(uri, rdflib.URIRef(ns.bsfs.unique), bool)
+ # build predicate
+ return parent.child(URI(uri), domain=dom, range=rng, unique=unique, **annotations)
+
+ predicates = _fetch_hierarchically(_build_predicate, types.ROOT_PREDICATE)
+
+ return schema.Schema(predicates, nodes, literals)
+
+
+
+def to_string(schema_inst: schema.Schema, fmt: str = 'turtle') -> str:
+ """Serialize a `bsfs.schema.Schema` to a string.
+ See `rdflib.Graph.serialize` for viable formats (default: turtle).
+ """
+
+ # type of emitted triples.
+ T_TRIPLE = typing.Iterator[typing.Tuple[rdflib.URIRef, rdflib.URIRef, rdflib.term.Identifier]]
+
+ def _type(tpe: types._Type) -> T_TRIPLE :
+ """Emit _Type properties (parent, annotations)."""
+ # emit parent
+ if tpe.parent is not None:
+ yield (
+ rdflib.URIRef(tpe.uri),
+ rdflib.URIRef(ns.rdfs.subClassOf),
+ rdflib.URIRef(tpe.parent.uri),
+ )
+ # emit annotations
+ for prop, value in tpe.annotations.items():
+ yield (
+ rdflib.URIRef(tpe.uri),
+ rdflib.URIRef(prop),
+ rdflib.Literal(value), # FIXME: datatype?!
+ )
+
+ def _predicate(pred: types.Predicate) -> T_TRIPLE:
+ """Emit Predicate properties (domain, range, unique)."""
+ # no need to emit anything for the root predicate
+ if pred == types.ROOT_PREDICATE:
+ return
+ # emit domain
+ if pred.domain != getattr(pred.parent, 'domain', None):
+ yield (
+ rdflib.URIRef(pred.uri),
+ rdflib.URIRef(ns.rdfs.domain),
+ rdflib.URIRef(pred.domain.uri),
+ )
+ # emit range
+ if pred.range != getattr(pred.parent, 'range', None):
+ yield (
+ rdflib.URIRef(pred.uri),
+ rdflib.URIRef(ns.rdfs.range),
+ rdflib.URIRef(pred.range.uri),
+ )
+ # emit cardinality
+ if pred.unique != getattr(pred.parent, 'unique', None):
+ yield (
+ rdflib.URIRef(pred.uri),
+ rdflib.URIRef(ns.bsfs.unique),
+ rdflib.Literal(pred.unique, datatype=rdflib.XSD.boolean),
+ )
+
+ def _feature(feat: types.Feature) -> T_TRIPLE:
+ """Emit Feature properties (dimension, dtype, distance)."""
+ # emit size
+ if feat.dimension != getattr(feat.parent, 'dimension', None):
+ yield (
+ rdflib.URIRef(feat.uri),
+ rdflib.URIRef(ns.bsfs.dimension),
+ rdflib.Literal(feat.dimension, datatype=rdflib.XSD.integer),
+ )
+ # emit dtype
+ if feat.dtype != getattr(feat.parent, 'dtype', None):
+ yield (
+ rdflib.URIRef(feat.uri),
+ rdflib.URIRef(ns.bsfs.dtype),
+ rdflib.URIRef(feat.dtype),
+ )
+ # emit distance
+ if feat.distance != getattr(feat.parent, 'distance', None):
+ yield (
+ rdflib.URIRef(feat.uri),
+ rdflib.URIRef(ns.bsfs.distance),
+ rdflib.URIRef(feat.distance),
+ )
+
+ def _parse(node: types._Type) -> T_TRIPLE:
+ """Emit all properties of a type."""
+ # check arg
+ if not isinstance(node, types._Type): # pylint: disable=protected-access
+ raise TypeError(node)
+ # emit _Type essentials
+ yield from _type(node)
+ # emit properties of derived types
+ if isinstance(node, types.Predicate):
+ yield from _predicate(node)
+ if isinstance(node, types.Feature):
+ yield from _feature(node)
+
+ # create graph
+ graph = rdflib.Graph()
+ # add triples to graph
+ nodes = itertools.chain(
+ schema_inst.nodes(),
+ schema_inst.literals(),
+ schema_inst.predicates())
+ for node in nodes:
+ for triple in _parse(node):
+ graph.add(triple)
+ # add known namespaces for readability
+ # FIXME: more generically?
+ graph.bind('bse', rdflib.URIRef(ns.bse['']))
+ graph.bind('bsfs', rdflib.URIRef(ns.bsfs['']))
+ graph.bind('bsm', rdflib.URIRef(ns.bsm['']))
+ graph.bind('rdf', rdflib.URIRef(ns.rdf['']))
+ graph.bind('rdfs', rdflib.URIRef(ns.rdfs['']))
+ graph.bind('schema', rdflib.URIRef(ns.schema['']))
+ graph.bind('xsd', rdflib.URIRef(ns.xsd['']))
+ # serialize to turtle
+ return graph.serialize(format=fmt)
+
+## EOF ##