""" Part of the BlackStar filesystem (bsfs) module. A copy of the license is provided with the project. Author: Matthias Baumgartner, 2022 """ # imports from collections import abc, namedtuple import typing # bsfs imports from bsfs.utils import errors, URI, typename # inner-module imports from . import types # exports __all__: typing.Sequence[str] = ( 'Schema', ) ## code ## class Schema(): """Graph schema. Use `Schema.Empty()` to create a new, empty Schema rather than construct it directly. The schema is defined by three sets: Predicates, Nodes, and Literals. The Schema class guarantees two properties: completeness and consistency. Completeness means that the schema covers all class that are referred to by any other class in the schema. Consistency means that each class is identified by a unique URI and all classes that use that URI consequently use the same definition. """ # node classes. _nodes: typing.Dict[URI, types.Node] # literal classes. _literals: typing.Dict[URI, types.Literal] # predicate classes. _predicates: typing.Dict[URI, types.Predicate] def __init__( self, predicates: typing.Optional[typing.Iterable[types.Predicate]] = None, nodes: typing.Optional[typing.Iterable[types.Node]] = None, literals: typing.Optional[typing.Iterable[types.Literal]] = None, ): # materialize arguments if predicates is None: predicates = set() if nodes is None: nodes = set() if literals is None: literals = set() nodes = set(nodes) literals = set(literals) predicates = set(predicates) # add root types to the schema nodes.add(types.ROOT_NODE) literals.add(types.ROOT_LITERAL) predicates.add(types.ROOT_PREDICATE) # add minimally necessary types to the schema literals.add(types.ROOT_BLOB) literals.add(types.ROOT_NUMBER) literals.add(types.ROOT_TIME) literals.add(types.ROOT_ARRAY) literals.add(types.ROOT_FEATURE) # FIXME: ensure that types derive from the right root? # include parents in predicates set # TODO: review type annotations and ignores for python >= 3.11 (parents is _Type but should be typing.Self) predicates |= {par for pred in predicates for par in pred.parents()} # type: ignore [misc] # include predicate domain in nodes set nodes |= {pred.domain for pred in predicates} # include predicate range in nodes and literals sets prange = {pred.range for pred in predicates} nodes |= {vert for vert in prange if isinstance(vert, types.Node)} literals |= {vert for vert in prange if isinstance(vert, types.Literal)} # NOTE: ROOT_PREDICATE has a Vertex as range which is neither in nodes nor literals # FIXME: with the ROOT_VERTEX missing, the schema is not complete anymore! # include parents in nodes and literals sets # NOTE: Must come after predicate domain/range was handled to have their parents as well. nodes |= {par for node in nodes for par in node.parents()} # type: ignore [misc] literals |= {par for lit in literals for par in lit.parents()} # type: ignore [misc] # assign members self._nodes = {node.uri: node for node in nodes} self._literals = {lit.uri: lit for lit in literals} self._predicates = {pred.uri: pred for pred in predicates} # verify unique uris if len(nodes) != len(self._nodes): raise errors.ConsistencyError('inconsistent nodes') if len(literals) != len(self._literals): raise errors.ConsistencyError('inconsistent literals') if len(predicates) != len(self._predicates): raise errors.ConsistencyError('inconsistent predicates') # verify globally unique uris n_uris = len(set(self._nodes) | set(self._literals) | set(self._predicates)) if n_uris != len(self._nodes) + len(self._literals) + len(self._predicates): raise errors.ConsistencyError('URI dual use') ## essentials ## def __str__(self) -> str: return f'{typename(self)}()' def __repr__(self) -> str: return f'{typename(self)}({sorted(self._nodes)}, {sorted(self._literals)}, {sorted(self._predicates)})' def __hash__(self) -> int: return hash(( type(self), tuple(sorted(self._nodes.values())), tuple(sorted(self._literals.values())), tuple(sorted(self._predicates.values())), )) def __eq__(self, other: typing.Any) -> bool: return isinstance(other, type(self)) \ and self._nodes == other._nodes \ and self._literals == other._literals \ and self._predicates == other._predicates ## operators ## SchemaDiff = namedtuple('SchemaDiff', ['nodes', 'literals', 'predicates']) def _issubset(self, other: 'Schema') -> bool: # inconsistent schema can't be ordered. if not self.consistent_with(other): return False # since schemas are consistent, it's sufficient to compare their URIs. # self's sets are fully contained in other's sets # pylint: disable=protected-access return set(self._predicates) <= set(other._predicates) \ and set(self._nodes) <= set(other._nodes) \ and set(self._literals) <= set(other._literals) def __lt__(self, other: typing.Any) -> bool: """Return True if *other* is a true subset of *self*.""" if not isinstance(other, Schema): # other is not a Schema return NotImplemented return self != other and self._issubset(other) def __le__(self, other: typing.Any) -> bool: """Return True if *other* is a subset of *self*.""" if not isinstance(other, Schema): # other is not a Schema return NotImplemented return self == other or self._issubset(other) def __gt__(self, other: typing.Any) -> bool: """Return True if *other* is a true superset of *self*.""" if not isinstance(other, Schema): # other is not a Schema return NotImplemented return self != other and other._issubset(self) def __ge__(self, other: typing.Any) -> bool: """Return True if *other* is a superset of *self*.""" if not isinstance(other, Schema): # other is not a Schema return NotImplemented return self == other or other._issubset(self) def diff(self, other: 'Schema') -> SchemaDiff: """Return node, literals, and predicates that are in *self* but not in *other*.""" return self.SchemaDiff( nodes=set(self.nodes()) - set(other.nodes()), literals=set(self.literals()) - set(other.literals()), predicates=set(self.predicates()) - set(other.predicates()), ) def __sub__(self, other: typing.Any) -> SchemaDiff: """Alias for `Schema.diff`.""" if not isinstance(other, Schema): return NotImplemented return self.diff(other) def consistent_with(self, other: 'Schema') -> bool: """Checks if two schemas have different predicate, node, or literal definitions for the same uri.""" # check arg if not isinstance(other, Schema): raise TypeError(other) # node consistency nodes = set(self.nodes()) | set(other.nodes()) nuris = {node.uri for node in nodes} if len(nodes) != len(nuris): return False # literal consistency literals = set(self.literals()) | set(other.literals()) luris = {lit.uri for lit in literals} if len(literals) != len(luris): return False # predicate consistency predicates = set(self.predicates()) | set(other.predicates()) puris = {pred.uri for pred in predicates} if len(predicates) != len(puris): return False # global consistency if len(puris | luris | nuris) != len(nodes) + len(literals) + len(predicates): return False # all checks passed return True @classmethod def Union( # pylint: disable=invalid-name # capitalized classmethod cls, *args: typing.Union['Schema', typing.Iterable['Schema']] ) -> 'Schema': """Combine multiple Schema instances into a single one. As argument, you can either pass multiple Schema instances, or a single iterable over Schema instances. Any abc.Iterable will be accepted. Example: >>> a, b, c = Schema.Empty(), Schema.Empty(), Schema.Empty() >>> # multiple Schema instances >>> Schema.Union(a, b, c) >>> # A single iterable over Schema instances >>> Schema.Union([a, b, c]) """ # FIXME: copy type annotations? if len(args) == 0: raise TypeError('Schema.Union requires at least one argument (Schema or Iterable)') if isinstance(args[0], cls): # args is sequence of Schema instances pass elif len(args) == 1 and isinstance(args[0], abc.Iterable): # args is a single iterable args = args[0] # type: ignore [assignment] # we checked and thus know that args[0] is an iterable else: raise TypeError(f'expected multiple Schema instances or a single Iterable, found {args}') nodes, literals, predicates = set(), set(), set() for schema in args: # check argument if not isinstance(schema, cls): raise TypeError(schema) # merge with previous schemas nodes |= set(schema.nodes()) literals |= set(schema.literals()) predicates |= set(schema.predicates()) # return new Schema instance return cls(predicates, nodes, literals) def union(self, other: 'Schema') -> 'Schema': """Merge *other* and *self* into a new Schema. *self* takes precedence.""" # check type if not isinstance(other, type(self)): raise TypeError(other) # return combined schemas return self.Union(self, other) def __add__(self, other: typing.Any) -> 'Schema': """Alias for Schema.union.""" try: # return merged schemas return self.union(other) except TypeError: return NotImplemented def __or__(self, other: typing.Any) -> 'Schema': """Alias for Schema.union.""" return self.__add__(other) ## getters ## # FIXME: nodes, predicates, literals could be properties # FIXME: interchangeability of URI and _Type?! def has_node(self, node: URI) -> bool: """Return True if a Node with URI *node* is part of the schema.""" return node in self._nodes def has_literal(self, lit: URI) -> bool: """Return True if a Literal with URI *lit* is part of the schema.""" return lit in self._literals def has_predicate(self, pred: URI) -> bool: """Return True if a Predicate with URI *pred* is part of the schema.""" return pred in self._predicates def nodes(self) -> typing.Iterable[types.Node]: """Return an iterator over Node classes.""" return self._nodes.values() def literals(self) -> typing.Iterable[types.Literal]: """Return an iterator over Literal classes.""" return self._literals.values() def predicates(self) -> typing.Iterable[types.Predicate]: """Return an iterator over Predicate classes.""" return self._predicates.values() def node(self, uri: URI) -> types.Node: """Return the Node matching the *uri*.""" return self._nodes[uri] def predicate(self, uri: URI) -> types.Predicate: """Return the Predicate matching the *uri*.""" return self._predicates[uri] def literal(self, uri: URI) -> types.Literal: """Return the Literal matching the *uri*.""" return self._literals[uri] def predicates_at(self, node: types.Node) -> typing.Iterator[types.Predicate]: """Return predicates that have domain *node* (or superclass thereof).""" return iter(pred for pred in self._predicates.values() if node <= pred.domain) ## EOF ##