diff options
Diffstat (limited to 'bsfs/schema/schema.py')
-rw-r--r-- | bsfs/schema/schema.py | 386 |
1 files changed, 386 insertions, 0 deletions
diff --git a/bsfs/schema/schema.py b/bsfs/schema/schema.py new file mode 100644 index 0000000..c5d4571 --- /dev/null +++ b/bsfs/schema/schema.py @@ -0,0 +1,386 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +from collections import abc, namedtuple +import typing +import rdflib + +# bsfs imports +from bsfs.namespace import ns +from bsfs.utils import errors, URI, typename + +# inner-module imports +from . import types + +# exports +__all__: typing.Sequence[str] = ( + 'Schema', + ) + + +## code ## + +class Schema(): + """Graph schema. + + Use `Schema.Empty()` to create a new, empty Schema rather than construct + it directly. + + The schema is defined by three sets: Predicates, Nodes, and Literals. + + The Schema class guarantees two properties: completeness and consistency. + Completeness means that the schema covers all class that are referred to + by any other class in the schema. Consistency means that each class is + identified by a unique URI and all classes that use that URI consequently + use the same definition. + + """ + + # node classes. + _nodes: typing.Dict[URI, types.Node] + + # literal classes. + _literals: typing.Dict[URI, types.Literal] + + # predicate classes. + _predicates: typing.Dict[URI, types.Predicate] + + def __init__( + self, + predicates: typing.Iterable[types.Predicate], + nodes: typing.Optional[typing.Iterable[types.Node]] = None, + literals: typing.Optional[typing.Iterable[types.Literal]] = None, + ): + # materialize arguments + if nodes is None: + nodes = set() + if literals is None: + literals = set() + nodes = set(nodes) + literals = set(literals) + predicates = set(predicates) + # include parents in predicates set + # TODO: review type annotations and ignores for python >= 3.11 (parents is _Type but should be typing.Self) + predicates |= {par for pred in predicates for par in pred.parents()} # type: ignore [misc] + # include predicate domain in nodes set + nodes |= {pred.domain for pred in predicates} + # include predicate range in nodes and literals sets + prange = {pred.range for pred in predicates if pred.range is not None} + nodes |= {vert for vert in prange if isinstance(vert, types.Node)} + literals |= {vert for vert in prange if isinstance(vert, types.Literal)} + # include parents in nodes and literals sets + # NOTE: Must be done after predicate domain/range was handled + # so that their parents are included as well. + nodes |= {par for node in nodes for par in node.parents()} # type: ignore [misc] + literals |= {par for lit in literals for par in lit.parents()} # type: ignore [misc] + # assign members + self._nodes = {node.uri: node for node in nodes} + self._literals = {lit.uri: lit for lit in literals} + self._predicates = {pred.uri: pred for pred in predicates} + # verify unique uris + if len(nodes) != len(self._nodes): + raise errors.ConsistencyError('inconsistent nodes') + if len(literals) != len(self._literals): + raise errors.ConsistencyError('inconsistent literals') + if len(predicates) != len(self._predicates): + raise errors.ConsistencyError('inconsistent predicates') + # verify globally unique uris + n_uris = len(set(self._nodes) | set(self._literals) | set(self._predicates)) + if n_uris != len(self._nodes) + len(self._literals) + len(self._predicates): + raise errors.ConsistencyError('URI dual use') + + + ## essentials ## + + def __str__(self) -> str: + return f'{typename(self)}()' + + def __repr__(self) -> str: + return f'{typename(self)}({sorted(self._nodes)}, {sorted(self._literals)}, {sorted(self._predicates)})' + + def __hash__(self) -> int: + return hash(( + type(self), + tuple(sorted(self._nodes.values())), + tuple(sorted(self._literals.values())), + tuple(sorted(self._predicates.values())), + )) + + def __eq__(self, other: typing.Any) -> bool: + return isinstance(other, type(self)) \ + and self._nodes == other._nodes \ + and self._literals == other._literals \ + and self._predicates == other._predicates + + + ## operators ## + + SchemaDiff = namedtuple('SchemaDiff', ['nodes', 'literals', 'predicates']) + + def _issubset(self, other: 'Schema') -> bool: + # inconsistent schema can't be ordered. + if not self.consistent_with(other): + return False + # since schemas are consistent, it's sufficient to compare their URIs. + # self's sets are fully contained in other's sets + # pylint: disable=protected-access + return set(self._predicates) <= set(other._predicates) \ + and set(self._nodes) <= set(other._nodes) \ + and set(self._literals) <= set(other._literals) + + def __lt__(self, other: typing.Any) -> bool: + """Return True if *other* is a true subset of *self*.""" + if not isinstance(other, Schema): # other is not a Schema + return NotImplemented + return self != other and self._issubset(other) + + def __le__(self, other: typing.Any) -> bool: + """Return True if *other* is a subset of *self*.""" + if not isinstance(other, Schema): # other is not a Schema + return NotImplemented + return self == other or self._issubset(other) + + def __gt__(self, other: typing.Any) -> bool: + """Return True if *other* is a true superset of *self*.""" + if not isinstance(other, Schema): # other is not a Schema + return NotImplemented + return self != other and other._issubset(self) + + def __ge__(self, other: typing.Any) -> bool: + """Return True if *other* is a superset of *self*.""" + if not isinstance(other, Schema): # other is not a Schema + return NotImplemented + return self == other or other._issubset(self) + + def diff(self, other: 'Schema') -> SchemaDiff: + """Return node, literals, and predicates that are in *self* but not in *other*.""" + return self.SchemaDiff( + nodes=set(self.nodes()) - set(other.nodes()), + literals=set(self.literals()) - set(other.literals()), + predicates=set(self.predicates()) - set(other.predicates()), + ) + + def __sub__(self, other: typing.Any) -> SchemaDiff: + """Alias for `Schema.diff`.""" + if not isinstance(other, Schema): + return NotImplemented + return self.diff(other) + + def consistent_with(self, other: 'Schema') -> bool: + """Checks if two schemas have different predicate, node, or literal definitions for the same uri.""" + # check arg + if not isinstance(other, Schema): + raise TypeError(other) + # node consistency + nodes = set(self.nodes()) | set(other.nodes()) + nuris = {node.uri for node in nodes} + if len(nodes) != len(nuris): + return False + # literal consistency + literals = set(self.literals()) | set(other.literals()) + luris = {lit.uri for lit in literals} + if len(literals) != len(luris): + return False + # predicate consistency + predicates = set(self.predicates()) | set(other.predicates()) + puris = {pred.uri for pred in predicates} + if len(predicates) != len(puris): + return False + # global consistency + if len(puris | luris | nuris) != len(nodes) + len(literals) + len(predicates): + return False + # all checks passed + return True + + @classmethod + def Union( # pylint: disable=invalid-name # capitalized classmethod + cls, + *args: typing.Union['Schema', typing.Iterable['Schema']] + ) -> 'Schema': + """Combine multiple Schema instances into a single one. + As argument, you can either pass multiple Schema instances, or a single + iterable over Schema instances. Any abc.Iterable will be accepted. + + Example: + + >>> a, b, c = Schema.Empty(), Schema.Empty(), Schema.Empty() + >>> # multiple Schema instances + >>> Schema.Union(a, b, c) + >>> # A single iterable over Schema instances + >>> Schema.Union([a, b, c]) + + """ + if len(args) == 0: + raise TypeError('Schema.Union requires at least one argument (Schema or Iterable)') + if isinstance(args[0], cls): # args is sequence of Schema instances + pass + elif len(args) == 1 and isinstance(args[0], abc.Iterable): # args is a single iterable + args = args[0] # type: ignore [assignment] # we checked and thus know that args[0] is an iterable + else: + raise TypeError(f'expected multiple Schema instances or a single Iterable, found {args}') + + nodes, literals, predicates = set(), set(), set() + for schema in args: + # check argument + if not isinstance(schema, cls): + raise TypeError(schema) + # merge with previous schemas + nodes |= set(schema.nodes()) + literals |= set(schema.literals()) + predicates |= set(schema.predicates()) + # return new Schema instance + return cls(predicates, nodes, literals) + + def union(self, other: 'Schema') -> 'Schema': + """Merge *other* and *self* into a new Schema. *self* takes precedence.""" + # check type + if not isinstance(other, type(self)): + raise TypeError(other) + # return combined schemas + return self.Union(self, other) + + def __add__(self, other: typing.Any) -> 'Schema': + """Alias for Schema.union.""" + try: # return merged schemas + return self.union(other) + except TypeError: + return NotImplemented + + def __or__(self, other: typing.Any) -> 'Schema': + """Alias for Schema.union.""" + return self.__add__(other) + + + ## getters ## + # FIXME: nodes, predicates, literals could be properties + # FIXME: interchangeability of URI and _Type?! + + def has_node(self, node: URI) -> bool: + """Return True if a Node with URI *node* is part of the schema.""" + return node in self._nodes + + def has_literal(self, lit: URI) -> bool: + """Return True if a Literal with URI *lit* is part of the schema.""" + return lit in self._literals + + def has_predicate(self, pred: URI) -> bool: + """Return True if a Predicate with URI *pred* is part of the schema.""" + return pred in self._predicates + + def nodes(self) -> typing.Iterable[types.Node]: + """Return an iterator over Node classes.""" + return self._nodes.values() + + def literals(self) -> typing.Iterable[types.Literal]: + """Return an iterator over Literal classes.""" + return self._literals.values() + + def predicates(self) -> typing.Iterable[types.Predicate]: + """Return an iterator over Predicate classes.""" + return self._predicates.values() + + def node(self, uri: URI) -> types.Node: + """Return the Node matching the *uri*.""" + return self._nodes[uri] + + def predicate(self, uri: URI) -> types.Predicate: + """Return the Predicate matching the *uri*.""" + return self._predicates[uri] + + def literal(self, uri: URI) -> types.Literal: + """Return the Literal matching the *uri*.""" + return self._literals[uri] + + + ## constructors ## + + + @classmethod + def Empty(cls) -> 'Schema': # pylint: disable=invalid-name # capitalized classmethod + """Return a minimal Schema.""" + node = types.Node(ns.bsfs.Node, None) + literal = types.Literal(ns.bsfs.Literal, None) + predicate = types.Predicate( + uri=ns.bsfs.Predicate, + parent=None, + domain=node, + range=None, + unique=False, + ) + return cls((predicate, ), (node, ), (literal, )) + + + @classmethod + def from_string(cls, schema: str) -> 'Schema': # pylint: disable=invalid-name # capitalized classmethod + """Load and return a Schema from a string.""" + # parse string into rdf graph + graph = rdflib.Graph() + graph.parse(data=schema, format='turtle') + + def _fetch_hierarchically(factory, curr): + # emit current node + yield curr + # walk through childs + for child in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(curr.uri)): + # convert to URI + child = URI(child) + # check circular dependency + if child == curr.uri or child in {node.uri for node in curr.parents()}: + raise errors.ConsistencyError('circular dependency') + # recurse and emit (sub*)childs + yield from _fetch_hierarchically(factory, factory(child, curr)) + + # fetch nodes + nodes = set(_fetch_hierarchically(types.Node, types.Node(ns.bsfs.Node, None))) + nodes_lut = {node.uri: node for node in nodes} + if len(nodes_lut) != len(nodes): + raise errors.ConsistencyError('inconsistent nodes') + + # fetch literals + literals = set(_fetch_hierarchically(types.Literal, types.Literal(ns.bsfs.Literal, None))) + literals_lut = {lit.uri: lit for lit in literals} + if len(literals_lut) != len(literals): + raise errors.ConsistencyError('inconsistent literals') + + # fetch predicates + def build_predicate(uri, parent): + uri = rdflib.URIRef(uri) + # get domain + domains = set(graph.objects(uri, rdflib.RDFS.domain)) + if len(domains) != 1: + raise errors.ConsistencyError(f'inconsistent domain: {domains}') + dom = nodes_lut.get(next(iter(domains))) + if dom is None: + raise errors.ConsistencyError('missing domain') + # get range + ranges = set(graph.objects(uri, rdflib.RDFS.range)) + if len(ranges) != 1: + raise errors.ConsistencyError(f'inconsistent range: {ranges}') + rng = next(iter(ranges)) + rng = nodes_lut.get(rng, literals_lut.get(rng)) + if rng is None: + raise errors.ConsistencyError('missing range') + # get unique flag + uniques = set(graph.objects(uri, rdflib.URIRef(ns.bsfs.unique))) + if len(uniques) != 1: + raise errors.ConsistencyError(f'inconsistent unique flags: {uniques}') + unique = bool(next(iter(uniques))) + # build Predicate + return types.Predicate(URI(uri), parent, dom, rng, unique) + + root_predicate = types.Predicate( + uri=ns.bsfs.Predicate, + parent=None, + domain=nodes_lut[ns.bsfs.Node], + range=None, # FIXME: Unclear how to handle this! Can be either a Literal or a Node + unique=False, + ) + predicates = _fetch_hierarchically(build_predicate, root_predicate) + # return Schema + return cls(predicates, nodes, literals) + +## EOF ## |