"""
Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
import itertools
import typing

# external imports
import rdflib

# bsfs imports
from bsfs.namespace import ns
from bsfs.utils import errors, URI, typename

# inner-module imports
from . import types
from . import schema

# exports
__all__: typing.Sequence[str] = (
    'to_string',
    'from_string',
    )


## code ##

def from_string(schema_str: str) -> schema.Schema:
    """Load and return a Schema from a string.

    The string is parsed as turtle into an rdf graph. Nodes, literals
    (including features) and predicates are then collected by walking the
    rdfs:subClassOf hierarchy downwards, starting at the respective root type.
    All triples of a type other than rdfs:subClassOf are treated as
    annotations, except for the properties that are interpreted explicitly
    (bsfs:dimension, bsfs:dtype, bsfs:distance for features;
    rdfs:domain, rdfs:range, bsfs:unique for predicates).

    Args:
        schema_str: The schema definition in turtle syntax.

    Returns:
        A `schema.Schema` built from the collected predicates, nodes, and literals.

    Raises:
        errors.ConsistencyError: If a single-valued property has multiple values,
            if a type is declared as subclass of itself or of one of its own
            descendants, if two nodes or two literals share a uri, or if a
            predicate refers to an undefined domain or range.
        errors.BackendError: If an annotation value is neither a Literal nor
            a URIRef (e.g., a blank node).

    """
    # parse string into rdf graph
    graph = rdflib.Graph()
    graph.parse(data=schema_str, format='turtle')

    # helper functions
    def _fetch_value(
            subject: URI,
            predicate: rdflib.URIRef,
            value_factory: typing.Callable[[typing.Any], typing.Any],
            ) -> typing.Optional[typing.Any]:
        """Fetch the object of a given subject and predicate.

        Returns None if there is no such object, and the object passed
        through *value_factory* if there is exactly one.
        Raises a `errors.ConsistencyError` if multiple objects match.
        """
        values = list(graph.objects(rdflib.URIRef(subject), predicate))
        if not values:
            return None
        if len(values) == 1:
            return value_factory(values[0])
        raise errors.ConsistencyError(
            f'{subject} has multiple values for predicate {str(predicate)}, expected zero or one')

    def _convert(value):
        """Convert the subject type from rdflib to a bsfs native type.

        Literals are reduced to their python value, URIRefs become `URI`s.
        Raises a `errors.BackendError` for any other term.
        """
        if isinstance(value, rdflib.Literal):
            return value.value
        if isinstance(value, rdflib.URIRef):
            return URI(value)
        # value is neither a node nor a literal, but e.g. a blank node
        raise errors.BackendError(f'expected Literal or URIRef, found {typename(value)}')

    def _fetch_hierarchically(factory, curr):
        """Walk through a rdfs:subClassOf hierarchy, creating symbols along the way.

        Yields *curr* first, followed by all of its (sub*)childs in depth-first
        order. Each child symbol is created via ``factory(uri, parent, **annotations)``.
        """
        # emit current node
        yield curr
        # uris of all ancestors; identical for every child of the current node
        ancestors = {node.uri for node in curr.parents()}
        # walk through childs
        subclass_of = rdflib.URIRef(ns.rdfs.subClassOf)
        for child_ref in graph.subjects(subclass_of, rdflib.URIRef(curr.uri)):
            # fetch annotations
            annotations = {
                URI(pred): _convert(value)
                for pred, value # FIXME: preserve datatype of value?!
                in graph.predicate_objects(child_ref)
                if URI(pred) != ns.rdfs.subClassOf
                }
            # convert child to URI
            child_uri = URI(child_ref)
            # check circular dependency
            if child_uri == curr.uri or child_uri in ancestors:
                raise errors.ConsistencyError('circular dependency')
            # recurse and emit (sub*)childs
            yield from _fetch_hierarchically(factory, factory(child_uri, curr, **annotations))

    # fetch nodes
    nodes = set(_fetch_hierarchically(types.Node, types.ROOT_NODE))
    nodes_lut = {node.uri: node for node in nodes}
    # distinct nodes must not share a uri
    if len(nodes_lut) != len(nodes):
        raise errors.ConsistencyError('inconsistent nodes')

    # fetch literals
    def _build_literal(uri, parent, **annotations):
        """Literal factory. Builds features if the parent is a feature."""
        # break out on root feature type
        if uri == types.ROOT_FEATURE.uri:
            return types.ROOT_FEATURE
        # handle feature types
        if isinstance(parent, types.Feature):
            # clean annotations: these properties are passed explicitly below
            annotations.pop(ns.bsfs.dimension, None)
            annotations.pop(ns.bsfs.dtype, None)
            annotations.pop(ns.bsfs.distance, None)
            # get dimension
            dimension = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dimension), int)
            # get dtype
            dtype = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dtype), URI)
            # get distance
            distance = _fetch_value(uri, rdflib.URIRef(ns.bsfs.distance), URI)
            # return feature
            return parent.child(URI(uri), dtype=dtype, dimension=dimension,
                distance=distance, **annotations)
        # handle non-feature types
        return parent.child(URI(uri), **annotations)

    literals = set(_fetch_hierarchically(_build_literal, types.ROOT_LITERAL))
    literals_lut = {lit.uri: lit for lit in literals}
    # distinct literals must not share a uri
    if len(literals_lut) != len(literals):
        raise errors.ConsistencyError('inconsistent literals')

    # fetch predicates
    def _build_predicate(uri, parent, **annotations):
        """Predicate factory. Resolves domain and range to the fetched symbols."""
        # clean annotations: these properties are passed explicitly below
        annotations.pop(ns.rdfs.domain, None)
        annotations.pop(ns.rdfs.range, None)
        annotations.pop(ns.bsfs.unique, None)
        # get domain; must be a known node
        dom = _fetch_value(uri, rdflib.RDFS.domain, URI)
        if dom is not None:
            if dom not in nodes_lut:
                raise errors.ConsistencyError(f'predicate {uri} has undefined domain {dom}')
            dom = nodes_lut[dom]
        # get range; must be a known node or literal (nodes take precedence)
        rng = _fetch_value(uri, rdflib.RDFS.range, URI)
        if rng is not None:
            if rng not in nodes_lut and rng not in literals_lut:
                raise errors.ConsistencyError(f'predicate {uri} has undefined range {rng}')
            rng = nodes_lut.get(rng, literals_lut.get(rng))
        # get unique
        unique = _fetch_value(uri, rdflib.URIRef(ns.bsfs.unique), bool)
        # build predicate
        return parent.child(URI(uri), domain=dom, range=rng, unique=unique, **annotations)

    # NOTE: predicates are passed on as a generator, i.e., they are only built
    # (and their errors raised) once the generator is consumed.
    predicates = _fetch_hierarchically(_build_predicate, types.ROOT_PREDICATE)

    return schema.Schema(predicates, nodes, literals)


def to_string(schema_inst: schema.Schema, fmt: str = 'turtle') -> str:
    """Serialize a `bsfs.schema.Schema` to a string.

    Every node, literal, and predicate of the schema is translated into rdf
    triples (parent relation, annotations, and type-specific properties).
    Type-specific properties are only emitted if they differ from the parent's.

    Args:
        schema_inst: The schema to serialize.
        fmt: Output format. See `rdflib.Graph.serialize` for viable
            formats (default: turtle).

    Returns:
        The serialized schema.

    Raises:
        TypeError: If the schema yields an item that is not a `types._Type`.

    """

    # type of emitted triples.
    T_TRIPLE = typing.Iterator[typing.Tuple[rdflib.URIRef, rdflib.URIRef, rdflib.term.Identifier]]

    def _type(tpe: types._Type) -> T_TRIPLE:
        """Emit _Type properties (parent, annotations)."""
        # emit parent
        if tpe.parent is not None:
            yield (
                rdflib.URIRef(tpe.uri),
                rdflib.URIRef(ns.rdfs.subClassOf),
                rdflib.URIRef(tpe.parent.uri),
                )
        # emit annotations
        for prop, value in tpe.annotations.items():
            obj: rdflib.term.Identifier
            if isinstance(value, URI):
                # `from_string` converts URIRef annotation values to URI.
                # Emit them as resources again so that they survive a round
                # trip instead of degrading to plain string literals.
                obj = rdflib.URIRef(value)
            else:
                obj = rdflib.Literal(value) # FIXME: datatype?!
            yield (
                rdflib.URIRef(tpe.uri),
                rdflib.URIRef(prop),
                obj,
                )

    def _predicate(pred: types.Predicate) -> T_TRIPLE:
        """Emit Predicate properties (domain, range, unique)."""
        # no need to emit anything for the root predicate
        if pred == types.ROOT_PREDICATE:
            return
        # emit domain
        if pred.domain != getattr(pred.parent, 'domain', None):
            yield (
                rdflib.URIRef(pred.uri),
                rdflib.URIRef(ns.rdfs.domain),
                rdflib.URIRef(pred.domain.uri),
                )
        # emit range
        if pred.range != getattr(pred.parent, 'range', None):
            yield (
                rdflib.URIRef(pred.uri),
                rdflib.URIRef(ns.rdfs.range),
                rdflib.URIRef(pred.range.uri),
                )
        # emit cardinality
        if pred.unique != getattr(pred.parent, 'unique', None):
            yield (
                rdflib.URIRef(pred.uri),
                rdflib.URIRef(ns.bsfs.unique),
                rdflib.Literal(pred.unique, datatype=rdflib.XSD.boolean),
                )

    def _feature(feat: types.Feature) -> T_TRIPLE:
        """Emit Feature properties (dimension, dtype, distance)."""
        # emit size
        if feat.dimension != getattr(feat.parent, 'dimension', None):
            yield (
                rdflib.URIRef(feat.uri),
                rdflib.URIRef(ns.bsfs.dimension),
                rdflib.Literal(feat.dimension, datatype=rdflib.XSD.integer),
                )
        # emit dtype
        if feat.dtype != getattr(feat.parent, 'dtype', None):
            yield (
                rdflib.URIRef(feat.uri),
                rdflib.URIRef(ns.bsfs.dtype),
                rdflib.URIRef(feat.dtype),
                )
        # emit distance
        if feat.distance != getattr(feat.parent, 'distance', None):
            yield (
                rdflib.URIRef(feat.uri),
                rdflib.URIRef(ns.bsfs.distance),
                rdflib.URIRef(feat.distance),
                )

    def _parse(node: types._Type) -> T_TRIPLE:
        """Emit all properties of a type."""
        # check arg
        if not isinstance(node, types._Type): # pylint: disable=protected-access
            raise TypeError(node)
        # emit _Type essentials
        yield from _type(node)
        # emit properties of derived types
        if isinstance(node, types.Predicate):
            yield from _predicate(node)
        if isinstance(node, types.Feature):
            yield from _feature(node)

    # create graph
    graph = rdflib.Graph()
    # add triples to graph
    nodes = itertools.chain(
        schema_inst.nodes(),
        schema_inst.literals(),
        schema_inst.predicates())
    for node in nodes:
        for triple in _parse(node):
            graph.add(triple)
    # add known namespaces for readability
    # FIXME: more generically?
    graph.bind('bse', rdflib.URIRef(ns.bse['']))
    graph.bind('bsfs', rdflib.URIRef(ns.bsfs['']))
    graph.bind('bsm', rdflib.URIRef(ns.bsm['']))
    graph.bind('rdf', rdflib.URIRef(ns.rdf['']))
    graph.bind('rdfs', rdflib.URIRef(ns.rdfs['']))
    graph.bind('schema', rdflib.URIRef(ns.schema['']))
    graph.bind('xsd', rdflib.URIRef(ns.xsd['']))
    # serialize to the requested format
    return graph.serialize(format=fmt)

## EOF ##