aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/schema/schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'bsfs/schema/schema.py')
-rw-r--r--bsfs/schema/schema.py386
1 files changed, 386 insertions, 0 deletions
diff --git a/bsfs/schema/schema.py b/bsfs/schema/schema.py
new file mode 100644
index 0000000..c5d4571
--- /dev/null
+++ b/bsfs/schema/schema.py
@@ -0,0 +1,386 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+from collections import abc, namedtuple
+import typing
+import rdflib
+
+# bsfs imports
+from bsfs.namespace import ns
+from bsfs.utils import errors, URI, typename
+
+# inner-module imports
+from . import types
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Schema',
+ )
+
+
+## code ##
+
+class Schema():
+ """Graph schema.
+
+ Use `Schema.Empty()` to create a new, empty Schema rather than construct
+ it directly.
+
+ The schema is defined by three sets: Predicates, Nodes, and Literals.
+
+ The Schema class guarantees two properties: completeness and consistency.
+ Completeness means that the schema covers all class that are referred to
+ by any other class in the schema. Consistency means that each class is
+ identified by a unique URI and all classes that use that URI consequently
+ use the same definition.
+
+ """
+
+ # node classes.
+ _nodes: typing.Dict[URI, types.Node]
+
+ # literal classes.
+ _literals: typing.Dict[URI, types.Literal]
+
+ # predicate classes.
+ _predicates: typing.Dict[URI, types.Predicate]
+
+ def __init__(
+ self,
+ predicates: typing.Iterable[types.Predicate],
+ nodes: typing.Optional[typing.Iterable[types.Node]] = None,
+ literals: typing.Optional[typing.Iterable[types.Literal]] = None,
+ ):
+ # materialize arguments
+ if nodes is None:
+ nodes = set()
+ if literals is None:
+ literals = set()
+ nodes = set(nodes)
+ literals = set(literals)
+ predicates = set(predicates)
+ # include parents in predicates set
+ # TODO: review type annotations and ignores for python >= 3.11 (parents is _Type but should be typing.Self)
+ predicates |= {par for pred in predicates for par in pred.parents()} # type: ignore [misc]
+ # include predicate domain in nodes set
+ nodes |= {pred.domain for pred in predicates}
+ # include predicate range in nodes and literals sets
+ prange = {pred.range for pred in predicates if pred.range is not None}
+ nodes |= {vert for vert in prange if isinstance(vert, types.Node)}
+ literals |= {vert for vert in prange if isinstance(vert, types.Literal)}
+ # include parents in nodes and literals sets
+ # NOTE: Must be done after predicate domain/range was handled
+ # so that their parents are included as well.
+ nodes |= {par for node in nodes for par in node.parents()} # type: ignore [misc]
+ literals |= {par for lit in literals for par in lit.parents()} # type: ignore [misc]
+ # assign members
+ self._nodes = {node.uri: node for node in nodes}
+ self._literals = {lit.uri: lit for lit in literals}
+ self._predicates = {pred.uri: pred for pred in predicates}
+ # verify unique uris
+ if len(nodes) != len(self._nodes):
+ raise errors.ConsistencyError('inconsistent nodes')
+ if len(literals) != len(self._literals):
+ raise errors.ConsistencyError('inconsistent literals')
+ if len(predicates) != len(self._predicates):
+ raise errors.ConsistencyError('inconsistent predicates')
+ # verify globally unique uris
+ n_uris = len(set(self._nodes) | set(self._literals) | set(self._predicates))
+ if n_uris != len(self._nodes) + len(self._literals) + len(self._predicates):
+ raise errors.ConsistencyError('URI dual use')
+
+
+ ## essentials ##
+
+ def __str__(self) -> str:
+ return f'{typename(self)}()'
+
+ def __repr__(self) -> str:
+ return f'{typename(self)}({sorted(self._nodes)}, {sorted(self._literals)}, {sorted(self._predicates)})'
+
+ def __hash__(self) -> int:
+ return hash((
+ type(self),
+ tuple(sorted(self._nodes.values())),
+ tuple(sorted(self._literals.values())),
+ tuple(sorted(self._predicates.values())),
+ ))
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return isinstance(other, type(self)) \
+ and self._nodes == other._nodes \
+ and self._literals == other._literals \
+ and self._predicates == other._predicates
+
+
+ ## operators ##
+
+ SchemaDiff = namedtuple('SchemaDiff', ['nodes', 'literals', 'predicates'])
+
+ def _issubset(self, other: 'Schema') -> bool:
+ # inconsistent schema can't be ordered.
+ if not self.consistent_with(other):
+ return False
+ # since schemas are consistent, it's sufficient to compare their URIs.
+ # self's sets are fully contained in other's sets
+ # pylint: disable=protected-access
+ return set(self._predicates) <= set(other._predicates) \
+ and set(self._nodes) <= set(other._nodes) \
+ and set(self._literals) <= set(other._literals)
+
+ def __lt__(self, other: typing.Any) -> bool:
+ """Return True if *other* is a true subset of *self*."""
+ if not isinstance(other, Schema): # other is not a Schema
+ return NotImplemented
+ return self != other and self._issubset(other)
+
+ def __le__(self, other: typing.Any) -> bool:
+ """Return True if *other* is a subset of *self*."""
+ if not isinstance(other, Schema): # other is not a Schema
+ return NotImplemented
+ return self == other or self._issubset(other)
+
+ def __gt__(self, other: typing.Any) -> bool:
+ """Return True if *other* is a true superset of *self*."""
+ if not isinstance(other, Schema): # other is not a Schema
+ return NotImplemented
+ return self != other and other._issubset(self)
+
+ def __ge__(self, other: typing.Any) -> bool:
+ """Return True if *other* is a superset of *self*."""
+ if not isinstance(other, Schema): # other is not a Schema
+ return NotImplemented
+ return self == other or other._issubset(self)
+
+ def diff(self, other: 'Schema') -> SchemaDiff:
+ """Return node, literals, and predicates that are in *self* but not in *other*."""
+ return self.SchemaDiff(
+ nodes=set(self.nodes()) - set(other.nodes()),
+ literals=set(self.literals()) - set(other.literals()),
+ predicates=set(self.predicates()) - set(other.predicates()),
+ )
+
+ def __sub__(self, other: typing.Any) -> SchemaDiff:
+ """Alias for `Schema.diff`."""
+ if not isinstance(other, Schema):
+ return NotImplemented
+ return self.diff(other)
+
+ def consistent_with(self, other: 'Schema') -> bool:
+ """Checks if two schemas have different predicate, node, or literal definitions for the same uri."""
+ # check arg
+ if not isinstance(other, Schema):
+ raise TypeError(other)
+ # node consistency
+ nodes = set(self.nodes()) | set(other.nodes())
+ nuris = {node.uri for node in nodes}
+ if len(nodes) != len(nuris):
+ return False
+ # literal consistency
+ literals = set(self.literals()) | set(other.literals())
+ luris = {lit.uri for lit in literals}
+ if len(literals) != len(luris):
+ return False
+ # predicate consistency
+ predicates = set(self.predicates()) | set(other.predicates())
+ puris = {pred.uri for pred in predicates}
+ if len(predicates) != len(puris):
+ return False
+ # global consistency
+ if len(puris | luris | nuris) != len(nodes) + len(literals) + len(predicates):
+ return False
+ # all checks passed
+ return True
+
+ @classmethod
+ def Union( # pylint: disable=invalid-name # capitalized classmethod
+ cls,
+ *args: typing.Union['Schema', typing.Iterable['Schema']]
+ ) -> 'Schema':
+ """Combine multiple Schema instances into a single one.
+ As argument, you can either pass multiple Schema instances, or a single
+ iterable over Schema instances. Any abc.Iterable will be accepted.
+
+ Example:
+
+ >>> a, b, c = Schema.Empty(), Schema.Empty(), Schema.Empty()
+ >>> # multiple Schema instances
+ >>> Schema.Union(a, b, c)
+ >>> # A single iterable over Schema instances
+ >>> Schema.Union([a, b, c])
+
+ """
+ if len(args) == 0:
+ raise TypeError('Schema.Union requires at least one argument (Schema or Iterable)')
+ if isinstance(args[0], cls): # args is sequence of Schema instances
+ pass
+ elif len(args) == 1 and isinstance(args[0], abc.Iterable): # args is a single iterable
+ args = args[0] # type: ignore [assignment] # we checked and thus know that args[0] is an iterable
+ else:
+ raise TypeError(f'expected multiple Schema instances or a single Iterable, found {args}')
+
+ nodes, literals, predicates = set(), set(), set()
+ for schema in args:
+ # check argument
+ if not isinstance(schema, cls):
+ raise TypeError(schema)
+ # merge with previous schemas
+ nodes |= set(schema.nodes())
+ literals |= set(schema.literals())
+ predicates |= set(schema.predicates())
+ # return new Schema instance
+ return cls(predicates, nodes, literals)
+
+ def union(self, other: 'Schema') -> 'Schema':
+ """Merge *other* and *self* into a new Schema. *self* takes precedence."""
+ # check type
+ if not isinstance(other, type(self)):
+ raise TypeError(other)
+ # return combined schemas
+ return self.Union(self, other)
+
+ def __add__(self, other: typing.Any) -> 'Schema':
+ """Alias for Schema.union."""
+ try: # return merged schemas
+ return self.union(other)
+ except TypeError:
+ return NotImplemented
+
+ def __or__(self, other: typing.Any) -> 'Schema':
+ """Alias for Schema.union."""
+ return self.__add__(other)
+
+
+ ## getters ##
+ # FIXME: nodes, predicates, literals could be properties
+ # FIXME: interchangeability of URI and _Type?!
+
+ def has_node(self, node: URI) -> bool:
+ """Return True if a Node with URI *node* is part of the schema."""
+ return node in self._nodes
+
+ def has_literal(self, lit: URI) -> bool:
+ """Return True if a Literal with URI *lit* is part of the schema."""
+ return lit in self._literals
+
+ def has_predicate(self, pred: URI) -> bool:
+ """Return True if a Predicate with URI *pred* is part of the schema."""
+ return pred in self._predicates
+
+ def nodes(self) -> typing.Iterable[types.Node]:
+ """Return an iterator over Node classes."""
+ return self._nodes.values()
+
+ def literals(self) -> typing.Iterable[types.Literal]:
+ """Return an iterator over Literal classes."""
+ return self._literals.values()
+
+ def predicates(self) -> typing.Iterable[types.Predicate]:
+ """Return an iterator over Predicate classes."""
+ return self._predicates.values()
+
+ def node(self, uri: URI) -> types.Node:
+ """Return the Node matching the *uri*."""
+ return self._nodes[uri]
+
+ def predicate(self, uri: URI) -> types.Predicate:
+ """Return the Predicate matching the *uri*."""
+ return self._predicates[uri]
+
+ def literal(self, uri: URI) -> types.Literal:
+ """Return the Literal matching the *uri*."""
+ return self._literals[uri]
+
+
+ ## constructors ##
+
+
+ @classmethod
+ def Empty(cls) -> 'Schema': # pylint: disable=invalid-name # capitalized classmethod
+ """Return a minimal Schema."""
+ node = types.Node(ns.bsfs.Node, None)
+ literal = types.Literal(ns.bsfs.Literal, None)
+ predicate = types.Predicate(
+ uri=ns.bsfs.Predicate,
+ parent=None,
+ domain=node,
+ range=None,
+ unique=False,
+ )
+ return cls((predicate, ), (node, ), (literal, ))
+
+
+ @classmethod
+ def from_string(cls, schema: str) -> 'Schema': # pylint: disable=invalid-name # capitalized classmethod
+ """Load and return a Schema from a string."""
+ # parse string into rdf graph
+ graph = rdflib.Graph()
+ graph.parse(data=schema, format='turtle')
+
+ def _fetch_hierarchically(factory, curr):
+ # emit current node
+ yield curr
+ # walk through childs
+ for child in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(curr.uri)):
+ # convert to URI
+ child = URI(child)
+ # check circular dependency
+ if child == curr.uri or child in {node.uri for node in curr.parents()}:
+ raise errors.ConsistencyError('circular dependency')
+ # recurse and emit (sub*)childs
+ yield from _fetch_hierarchically(factory, factory(child, curr))
+
+ # fetch nodes
+ nodes = set(_fetch_hierarchically(types.Node, types.Node(ns.bsfs.Node, None)))
+ nodes_lut = {node.uri: node for node in nodes}
+ if len(nodes_lut) != len(nodes):
+ raise errors.ConsistencyError('inconsistent nodes')
+
+ # fetch literals
+ literals = set(_fetch_hierarchically(types.Literal, types.Literal(ns.bsfs.Literal, None)))
+ literals_lut = {lit.uri: lit for lit in literals}
+ if len(literals_lut) != len(literals):
+ raise errors.ConsistencyError('inconsistent literals')
+
+ # fetch predicates
+ def build_predicate(uri, parent):
+ uri = rdflib.URIRef(uri)
+ # get domain
+ domains = set(graph.objects(uri, rdflib.RDFS.domain))
+ if len(domains) != 1:
+ raise errors.ConsistencyError(f'inconsistent domain: {domains}')
+ dom = nodes_lut.get(next(iter(domains)))
+ if dom is None:
+ raise errors.ConsistencyError('missing domain')
+ # get range
+ ranges = set(graph.objects(uri, rdflib.RDFS.range))
+ if len(ranges) != 1:
+ raise errors.ConsistencyError(f'inconsistent range: {ranges}')
+ rng = next(iter(ranges))
+ rng = nodes_lut.get(rng, literals_lut.get(rng))
+ if rng is None:
+ raise errors.ConsistencyError('missing range')
+ # get unique flag
+ uniques = set(graph.objects(uri, rdflib.URIRef(ns.bsfs.unique)))
+ if len(uniques) != 1:
+ raise errors.ConsistencyError(f'inconsistent unique flags: {uniques}')
+ unique = bool(next(iter(uniques)))
+ # build Predicate
+ return types.Predicate(URI(uri), parent, dom, rng, unique)
+
+ root_predicate = types.Predicate(
+ uri=ns.bsfs.Predicate,
+ parent=None,
+ domain=nodes_lut[ns.bsfs.Node],
+ range=None, # FIXME: Unclear how to handle this! Can be either a Literal or a Node
+ unique=False,
+ )
+ predicates = _fetch_hierarchically(build_predicate, root_predicate)
+ # return Schema
+ return cls(predicates, nodes, literals)
+
+## EOF ##