""" Part of the BlackStar filesystem (bsfs) module. A copy of the license is provided with the project. Author: Matthias Baumgartner, 2022 """ # standard imports from collections import abc import itertools import typing # external imports import rdflib # bsfs imports from bsfs.namespace import ns from bsfs.utils import errors, URI, typename # inner-module imports from . import types from . import schema # exports __all__: typing.Sequence[str] = ( 'to_string', 'from_string', ) ## code ## def from_string(schema_str: str) -> schema.Schema: """Load and return a Schema from a string.""" # parse string into rdf graph graph = rdflib.Graph() graph.parse(data=schema_str, format='turtle') # helper functions def _convert(value): """Convert the subject type from rdflib to a bsfs native type.""" if isinstance(value, rdflib.Literal): return value.value if isinstance(value, rdflib.URIRef): return URI(value) raise errors.BackendError(f'expected Literal or URIRef, found {typename(value)}') def _fetch_hierarchically(factory, curr): """Walk through a rdfs:subClassOf hierarchy, creating symbols along the way.""" # emit current node yield curr # walk through childs for child in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(curr.uri)): # fetch annotations annotations = { URI(pred): _convert(value) for pred, value # FIXME: preserve datatype of value?! in graph.predicate_objects(child) if URI(pred) != ns.rdfs.subClassOf } # convert child to URI child = URI(child) # check circular dependency if child == curr.uri or child in {node.uri for node in curr.parents()}: raise errors.ConsistencyError('circular dependency') # recurse and emit (sub*)childs yield from _fetch_hierarchically(factory, factory(child, curr, **annotations)) # fetch nodes nodes = set(_fetch_hierarchically(types.Node, types.ROOT_NODE)) nodes_lut = {node.uri: node for node in nodes} if len(nodes_lut) != len(nodes): raise errors.ConsistencyError('inconsistent nodes') # fetch literals literals = set(_fetch_hierarchically(types.Literal, types.ROOT_LITERAL)) literals_lut = {lit.uri: lit for lit in literals} if len(literals_lut) != len(literals): raise errors.ConsistencyError('inconsistent literals') # fetch predicates # FIXME: type annotation def _fetch_value(subject: URI, predicate: rdflib.URIRef, value_factory) -> typing.Optional[typing.Any]: """Fetch the object of a given subject and predicate. Raises a `errors.ConsistencyError` if multiple objects match.""" values = list(graph.objects(rdflib.URIRef(subject), predicate)) if len(values) == 0: return None elif len(values) == 1: return value_factory(values[0]) else: raise errors.ConsistencyError(f'{subject} has multiple values for predicate {str(predicate)}, expected zero or one') def _build_predicate(uri, parent, **annotations): """Predicate factory.""" # break out on root feature type if uri == types.ROOT_FEATURE.uri: return types.ROOT_FEATURE # clean annotations annotations.pop(ns.rdfs.domain, None) annotations.pop(ns.rdfs.range, None) annotations.pop(ns.bsfs.unique, None) # get domain dom = _fetch_value(uri, rdflib.RDFS.domain, URI) if dom is not None and dom not in nodes_lut: raise errors.ConsistencyError(f'predicate {uri} has undefined domain {dom}') elif dom is not None: dom = nodes_lut[dom] # get range rng = _fetch_value(uri, rdflib.RDFS.range, URI) if rng is not None and rng not in nodes_lut and rng not in literals_lut: raise errors.ConsistencyError(f'predicate {uri} has undefined range {rng}') elif rng is not None: rng = nodes_lut.get(rng, literals_lut.get(rng)) # get unique unique = _fetch_value(uri, rdflib.URIRef(ns.bsfs.unique), bool) # handle feature types if isinstance(parent, types.Feature): # clean annotations annotations.pop(ns.bsfs.dimension, None) annotations.pop(ns.bsfs.dtype, None) annotations.pop(ns.bsfs.distance, None) # get dimension dimension = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dimension), int) # get dtype dtype = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dtype), URI) # get distance distance = _fetch_value(uri, rdflib.URIRef(ns.bsfs.distance), URI) # return feature return parent.child(URI(uri), domain=dom, range=rng, unique=unique, dtype=dtype, dimension=dimension, distance=distance, **annotations) # handle non-feature predicate return parent.child(URI(uri), domain=dom, range=rng, unique=unique, **annotations) predicates = _fetch_hierarchically(_build_predicate, types.ROOT_PREDICATE) return schema.Schema(predicates, nodes, literals) def to_string(schema_inst: schema.Schema) -> str: """ """ raise NotImplementedError() ## EOF ##