""" Part of the BlackStar filesystem (bsfs) module. A copy of the license is provided with the project. Author: Matthias Baumgartner, 2022 """ # imports from collections import abc, namedtuple import typing import rdflib # bsfs imports from bsfs.namespace import ns from bsfs.utils import errors, URI, typename # inner-module imports from . import types # exports __all__: typing.Sequence[str] = ( 'Schema', ) ## code ## class Schema(): """Graph schema. Use `Schema.Empty()` to create a new, empty Schema rather than construct it directly. The schema is defined by three sets: Predicates, Nodes, and Literals. The Schema class guarantees two properties: completeness and consistency. Completeness means that the schema covers all class that are referred to by any other class in the schema. Consistency means that each class is identified by a unique URI and all classes that use that URI consequently use the same definition. """ # node classes. _nodes: typing.Dict[URI, types.Node] # literal classes. _literals: typing.Dict[URI, types.Literal] # predicate classes. _predicates: typing.Dict[URI, types.Predicate] def __init__( self, predicates: typing.Iterable[types.Predicate], nodes: typing.Optional[typing.Iterable[types.Node]] = None, literals: typing.Optional[typing.Iterable[types.Literal]] = None, ): # materialize arguments if nodes is None: nodes = set() if literals is None: literals = set() nodes = set(nodes) literals = set(literals) predicates = set(predicates) # include parents in predicates set # TODO: review type annotations and ignores for python >= 3.11 (parents is _Type but should be typing.Self) predicates |= {par for pred in predicates for par in pred.parents()} # type: ignore [misc] # include predicate domain in nodes set nodes |= {pred.domain for pred in predicates} # include predicate range in nodes and literals sets prange = {pred.range for pred in predicates if pred.range is not None} nodes |= {vert for vert in prange if isinstance(vert, types.Node)} literals |= {vert for vert in prange if isinstance(vert, types.Literal)} # include parents in nodes and literals sets # NOTE: Must be done after predicate domain/range was handled # so that their parents are included as well. nodes |= {par for node in nodes for par in node.parents()} # type: ignore [misc] literals |= {par for lit in literals for par in lit.parents()} # type: ignore [misc] # assign members self._nodes = {node.uri: node for node in nodes} self._literals = {lit.uri: lit for lit in literals} self._predicates = {pred.uri: pred for pred in predicates} # verify unique uris if len(nodes) != len(self._nodes): raise errors.ConsistencyError('inconsistent nodes') if len(literals) != len(self._literals): raise errors.ConsistencyError('inconsistent literals') if len(predicates) != len(self._predicates): raise errors.ConsistencyError('inconsistent predicates') # verify globally unique uris n_uris = len(set(self._nodes) | set(self._literals) | set(self._predicates)) if n_uris != len(self._nodes) + len(self._literals) + len(self._predicates): raise errors.ConsistencyError('URI dual use') ## essentials ## def __str__(self) -> str: return f'{typename(self)}()' def __repr__(self) -> str: return f'{typename(self)}({sorted(self._nodes)}, {sorted(self._literals)}, {sorted(self._predicates)})' def __hash__(self) -> int: return hash(( type(self), tuple(sorted(self._nodes.values())), tuple(sorted(self._literals.values())), tuple(sorted(self._predicates.values())), )) def __eq__(self, other: typing.Any) -> bool: return isinstance(other, type(self)) \ and self._nodes == other._nodes \ and self._literals == other._literals \ and self._predicates == other._predicates ## operators ## SchemaDiff = namedtuple('SchemaDiff', ['nodes', 'literals', 'predicates']) def _issubset(self, other: 'Schema') -> bool: # inconsistent schema can't be ordered. if not self.consistent_with(other): return False # since schemas are consistent, it's sufficient to compare their URIs. # self's sets are fully contained in other's sets # pylint: disable=protected-access return set(self._predicates) <= set(other._predicates) \ and set(self._nodes) <= set(other._nodes) \ and set(self._literals) <= set(other._literals) def __lt__(self, other: typing.Any) -> bool: """Return True if *other* is a true subset of *self*.""" if not isinstance(other, Schema): # other is not a Schema return NotImplemented return self != other and self._issubset(other) def __le__(self, other: typing.Any) -> bool: """Return True if *other* is a subset of *self*.""" if not isinstance(other, Schema): # other is not a Schema return NotImplemented return self == other or self._issubset(other) def __gt__(self, other: typing.Any) -> bool: """Return True if *other* is a true superset of *self*.""" if not isinstance(other, Schema): # other is not a Schema return NotImplemented return self != other and other._issubset(self) def __ge__(self, other: typing.Any) -> bool: """Return True if *other* is a superset of *self*.""" if not isinstance(other, Schema): # other is not a Schema return NotImplemented return self == other or other._issubset(self) def diff(self, other: 'Schema') -> SchemaDiff: """Return node, literals, and predicates that are in *self* but not in *other*.""" return self.SchemaDiff( nodes=set(self.nodes()) - set(other.nodes()), literals=set(self.literals()) - set(other.literals()), predicates=set(self.predicates()) - set(other.predicates()), ) def __sub__(self, other: typing.Any) -> SchemaDiff: """Alias for `Schema.diff`.""" if not isinstance(other, Schema): return NotImplemented return self.diff(other) def consistent_with(self, other: 'Schema') -> bool: """Checks if two schemas have different predicate, node, or literal definitions for the same uri.""" # check arg if not isinstance(other, Schema): raise TypeError(other) # node consistency nodes = set(self.nodes()) | set(other.nodes()) nuris = {node.uri for node in nodes} if len(nodes) != len(nuris): return False # literal consistency literals = set(self.literals()) | set(other.literals()) luris = {lit.uri for lit in literals} if len(literals) != len(luris): return False # predicate consistency predicates = set(self.predicates()) | set(other.predicates()) puris = {pred.uri for pred in predicates} if len(predicates) != len(puris): return False # global consistency if len(puris | luris | nuris) != len(nodes) + len(literals) + len(predicates): return False # all checks passed return True @classmethod def Union( # pylint: disable=invalid-name # capitalized classmethod cls, *args: typing.Union['Schema', typing.Iterable['Schema']] ) -> 'Schema': """Combine multiple Schema instances into a single one. As argument, you can either pass multiple Schema instances, or a single iterable over Schema instances. Any abc.Iterable will be accepted. Example: >>> a, b, c = Schema.Empty(), Schema.Empty(), Schema.Empty() >>> # multiple Schema instances >>> Schema.Union(a, b, c) >>> # A single iterable over Schema instances >>> Schema.Union([a, b, c]) """ if len(args) == 0: raise TypeError('Schema.Union requires at least one argument (Schema or Iterable)') if isinstance(args[0], cls): # args is sequence of Schema instances pass elif len(args) == 1 and isinstance(args[0], abc.Iterable): # args is a single iterable args = args[0] # type: ignore [assignment] # we checked and thus know that args[0] is an iterable else: raise TypeError(f'expected multiple Schema instances or a single Iterable, found {args}') nodes, literals, predicates = set(), set(), set() for schema in args: # check argument if not isinstance(schema, cls): raise TypeError(schema) # merge with previous schemas nodes |= set(schema.nodes()) literals |= set(schema.literals()) predicates |= set(schema.predicates()) # return new Schema instance return cls(predicates, nodes, literals) def union(self, other: 'Schema') -> 'Schema': """Merge *other* and *self* into a new Schema. *self* takes precedence.""" # check type if not isinstance(other, type(self)): raise TypeError(other) # return combined schemas return self.Union(self, other) def __add__(self, other: typing.Any) -> 'Schema': """Alias for Schema.union.""" try: # return merged schemas return self.union(other) except TypeError: return NotImplemented def __or__(self, other: typing.Any) -> 'Schema': """Alias for Schema.union.""" return self.__add__(other) ## getters ## # FIXME: nodes, predicates, literals could be properties # FIXME: interchangeability of URI and _Type?! def has_node(self, node: URI) -> bool: """Return True if a Node with URI *node* is part of the schema.""" return node in self._nodes def has_literal(self, lit: URI) -> bool: """Return True if a Literal with URI *lit* is part of the schema.""" return lit in self._literals def has_predicate(self, pred: URI) -> bool: """Return True if a Predicate with URI *pred* is part of the schema.""" return pred in self._predicates def nodes(self) -> typing.Iterable[types.Node]: """Return an iterator over Node classes.""" return self._nodes.values() def literals(self) -> typing.Iterable[types.Literal]: """Return an iterator over Literal classes.""" return self._literals.values() def predicates(self) -> typing.Iterable[types.Predicate]: """Return an iterator over Predicate classes.""" return self._predicates.values() def node(self, uri: URI) -> types.Node: """Return the Node matching the *uri*.""" return self._nodes[uri] def predicate(self, uri: URI) -> types.Predicate: """Return the Predicate matching the *uri*.""" return self._predicates[uri] def literal(self, uri: URI) -> types.Literal: """Return the Literal matching the *uri*.""" return self._literals[uri] ## constructors ## @classmethod def Empty(cls) -> 'Schema': # pylint: disable=invalid-name # capitalized classmethod """Return a minimal Schema.""" node = types.Node(ns.bsfs.Node, None) literal = types.Literal(ns.bsfs.Literal, None) predicate = types.Predicate( uri=ns.bsfs.Predicate, parent=None, domain=node, range=None, unique=False, ) return cls((predicate, ), (node, ), (literal, )) @classmethod def from_string(cls, schema: str) -> 'Schema': # pylint: disable=invalid-name # capitalized classmethod """Load and return a Schema from a string.""" # parse string into rdf graph graph = rdflib.Graph() graph.parse(data=schema, format='turtle') def _fetch_hierarchically(factory, curr): # emit current node yield curr # walk through childs for child in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(curr.uri)): # convert to URI child = URI(child) # check circular dependency if child == curr.uri or child in {node.uri for node in curr.parents()}: raise errors.ConsistencyError('circular dependency') # recurse and emit (sub*)childs yield from _fetch_hierarchically(factory, factory(child, curr)) # fetch nodes nodes = set(_fetch_hierarchically(types.Node, types.Node(ns.bsfs.Node, None))) nodes_lut = {node.uri: node for node in nodes} if len(nodes_lut) != len(nodes): raise errors.ConsistencyError('inconsistent nodes') # fetch literals literals = set(_fetch_hierarchically(types.Literal, types.Literal(ns.bsfs.Literal, None))) literals_lut = {lit.uri: lit for lit in literals} if len(literals_lut) != len(literals): raise errors.ConsistencyError('inconsistent literals') # fetch predicates def build_predicate(uri, parent): uri = rdflib.URIRef(uri) # get domain domains = set(graph.objects(uri, rdflib.RDFS.domain)) if len(domains) != 1: raise errors.ConsistencyError(f'inconsistent domain: {domains}') dom = nodes_lut.get(next(iter(domains))) if dom is None: raise errors.ConsistencyError('missing domain') # get range ranges = set(graph.objects(uri, rdflib.RDFS.range)) if len(ranges) != 1: raise errors.ConsistencyError(f'inconsistent range: {ranges}') rng = next(iter(ranges)) rng = nodes_lut.get(rng, literals_lut.get(rng)) if rng is None: raise errors.ConsistencyError('missing range') # get unique flag uniques = set(graph.objects(uri, rdflib.URIRef(ns.bsfs.unique))) if len(uniques) != 1: raise errors.ConsistencyError(f'inconsistent unique flags: {uniques}') unique = bool(next(iter(uniques))) # build Predicate return types.Predicate(URI(uri), parent, dom, rng, unique) root_predicate = types.Predicate( uri=ns.bsfs.Predicate, parent=None, domain=nodes_lut[ns.bsfs.Node], range=None, # FIXME: Unclear how to handle this! Can be either a Literal or a Node unique=False, ) predicates = _fetch_hierarchically(build_predicate, root_predicate) # return Schema return cls(predicates, nodes, literals) ## EOF ##