From e8492489098ef5f8566214e083cd2c2d1d449f5a Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Thu, 8 Dec 2022 16:36:19 +0100 Subject: sparql triple store and graph (nodes, mostly) --- bsfs/triple_store/__init__.py | 20 ++++ bsfs/triple_store/base.py | 128 +++++++++++++++++++++ bsfs/triple_store/sparql.py | 253 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 401 insertions(+) create mode 100644 bsfs/triple_store/__init__.py create mode 100644 bsfs/triple_store/base.py create mode 100644 bsfs/triple_store/sparql.py (limited to 'bsfs/triple_store') diff --git a/bsfs/triple_store/__init__.py b/bsfs/triple_store/__init__.py new file mode 100644 index 0000000..fb5a8a9 --- /dev/null +++ b/bsfs/triple_store/__init__.py @@ -0,0 +1,20 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import typing + +# inner-module imports +from .base import TripleStoreBase +from .sparql import SparqlStore + +# exports +__all__: typing.Sequence[str] = ( + 'SparqlStore', + 'TripleStoreBase', + ) + +## EOF ## diff --git a/bsfs/triple_store/base.py b/bsfs/triple_store/base.py new file mode 100644 index 0000000..a2668c3 --- /dev/null +++ b/bsfs/triple_store/base.py @@ -0,0 +1,128 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import abc +import typing + +# inner-module imports +from bsfs.utils import URI, typename +import bsfs.schema as _schema + +# exports +__all__: typing.Sequence[str] = ( + 'TripleStoreBase', + ) + + +## code ## + +class TripleStoreBase(abc.ABC): + """ + """ + + # storage's URI. None implies a temporary location. + uri: typing.Optional[URI] = None + + def __init__(self, uri: typing.Optional[URI] = None): + self.uri = uri + + def __hash__(self) -> int: + uri = self.uri if self.uri is not None else id(self) + return hash((type(self), uri)) + + def __eq__(self, other) -> bool: + return isinstance(other, type(self)) \ + and (( self.uri is not None \ + and other.uri is not None \ + and self.uri == other.uri ) \ + or id(self) == id(other)) + + def __repr__(self) -> str: + return f'{typename(self)}(uri={self.uri})' + + def __str__(self) -> str: + return f'{typename(self)}(uri={self.uri})' + + def is_persistent(self) -> bool: + """Return True if data is stored persistently.""" + return self.uri is not None + + + @classmethod + @abc.abstractmethod + def Open( + cls, + uri: str, + **kwargs: typing.Any, + ) -> 'TripleStoreBase': + """Return a TripleStoreBase instance connected to *uri*.""" + + @abc.abstractmethod + def commit(self): + """Commit the current transaction.""" + + @abc.abstractmethod + def rollback(self): + """Undo changes since the last commit.""" + + @property + @abc.abstractmethod + def schema(self) -> _schema.Schema: + """Return the store's local schema.""" + + @schema.setter + def schema(self, schema: _schema.Schema): + """Migrate to new schema by adding or removing class definitions. + + Commits before and after the migration. + + Instances of removed classes will be deleted irreversably. + Note that modifying an existing class is not directly supported. + Also, it is generally discouraged, since changing definitions may + lead to inconsistencies across multiple clients in a distributed + setting. Instead, consider introducing a new class under its own + uri. Such a migration would look as follows: + + 1. Add new class definitions. + 2. Create instances of the new classes and copy relevant data. + 3. Remove the old definitions. + + To modify a class, i.e., re-use a previous uri with a new + class definition, you would have to migrate via temporary + class definitions, and thus repeat the above procedure two times. + + """ + + @abc.abstractmethod + def exists( + self, + node_type: _schema.Node, + guids: typing.Iterable[URI], + ): + """ + """ + + @abc.abstractmethod + def create( + self, + node_type: _schema.Node, + guids: typing.Iterable[URI], + ): + """Create *guid* nodes with type *subject*.""" + + @abc.abstractmethod + def set( + self, + node_type: _schema.Node, # FIXME: is the node_type even needed? Couldn't I infer from the predicate? + guids: typing.Iterable[URI], + predicate: _schema.Predicate, + values: typing.Iterable[typing.Any], + ): + """ + """ + +## EOF ## diff --git a/bsfs/triple_store/sparql.py b/bsfs/triple_store/sparql.py new file mode 100644 index 0000000..3eab869 --- /dev/null +++ b/bsfs/triple_store/sparql.py @@ -0,0 +1,253 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import itertools +import typing +import rdflib + +# bsfs imports +from bsfs.utils import URI +from bsfs.utils import errors +import bsfs.schema as _schema + +# inner-module imports +from . import base + + +# exports +__all__: typing.Sequence[str] = ( + 'SparqlStore', + ) + + +## code ## + +class Transaction(): + """Lightweight rdflib transactions for in-memory databases.""" + + def __init__(self, graph): + self._graph = graph + self.commit() # initialize + + def commit(self): + self._added = [] + self._removed = [] + + def rollback(self): + for triple in self._added: + self._graph.remove(triple) + for triple in self._removed: + self._graph.add(triple) + + def add(self, triple): + if triple not in self._graph: + self._added.append(triple) + self._graph.add(triple) + + def remove(self, triple): + if triple in self._graph: + self._removed.append(triple) + self._graph.remove(triple) + + +class SparqlStore(base.TripleStoreBase): + """ + """ + + def __init__(self, uri: typing.Optional[URI] = None): + super().__init__(uri) + self.graph = rdflib.Graph() + self.transaction = Transaction(self.graph) + self.__schema = _schema.Schema.Empty() + + @classmethod + def Open( + cls, + uri: str, + **kwargs: typing.Any, + ) -> 'SparqlStore': + return cls(None) + + def commit(self): + self.transaction.commit() + + def rollback(self): + self.transaction.rollback() + + @property + def schema(self) -> _schema.Schema: + """Return the current schema.""" + return self.__schema + + @schema.setter + def schema(self, schema: _schema.Schema): + """Migrate to new schema by adding or removing class definitions. + + Commits before and after the migration. + + Instances of removed classes will be deleted irreversably. + Note that modifying an existing class is not directly supported. + Also, it is generally discouraged, since changing definitions may + lead to inconsistencies across multiple clients in a distributed + setting. Instead, consider introducing a new class under its own + uri. Such a migration would look as follows: + + 1. Add new class definitions. + 2. Create instances of the new classes and copy relevant data. + 3. Remove the old definitions. + + To modify a class, i.e., re-use a previous uri with a new + class definition, you would have to migrate via temporary + class definitions, and thus repeat the above procedure two times. + + """ + # check args: Schema instanace + if not isinstance(schema, _schema.Schema): + raise TypeError(schema) + # check compatibility: No contradicting definitions + if not self.schema.consistent_with(schema): + raise errors.ConsistencyError(f'{schema} is inconsistent with {self.schema}') + + # commit the current transaction + self.commit() + + # adjust instances: + # nothing to do for added classes + # delete instances of removed classes + + # get deleted classes + sub = self.schema - schema + + # remove predicate instances + for pred in sub.predicates: + for src, trg in self.graph.subject_objects(rdflib.URIRef(pred.uri)): + self.transaction.remove((src, rdflib.URIRef(pred.uri), trg)) + + # remove node instances + for node in sub.nodes: + # iterate through node instances + for inst in self.graph.subjects(rdflib.RDF.type, rdflib.URIRef(node.uri)): + # remove triples where the instance is in the object position + for src, pred in self.graph.subject_predicates(inst): + self.transaction.remove((src, pred, inst)) + # remove triples where the instance is in the subject position + for pred, trg in self.graph.predicate_objects(inst): + self.transaction.remove((inst, pred, trg)) + # remove instance + self.transaction.remove((inst, rdflib.RDF.type, rdflib.URIRef(node.uri))) + + # NOTE: Nothing to do for literals + + # commit instance changes + self.commit() + + # migrate schema + self.__schema = schema + + + def _has_type(self, subject: URI, node_type: _schema.Node) -> bool: + """Return True if *subject* is a node of class *node_type* or a subclass thereof.""" + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + + subject_types = list(self.graph.objects(rdflib.URIRef(subject), rdflib.RDF.type)) + if len(subject_types) == 0: + return False + elif len(subject_types) == 1: + node = self.schema.node(URI(subject_types[0])) + if node == node_type: + return True + elif node_type in node.parents(): + return True + else: + return False + else: + raise errors.UnreachableError() + + def exists( + self, + node_type: _schema.Node, + guids: typing.Iterable[URI], + ): + """ + """ + return {subj for subj in guids if self._has_type(subj, node_type)} + + def create( + self, + node_type: _schema.Node, + guids: typing.Iterable[URI], + ): + """ + """ + # check node_type + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + # check and create guids + for guid in guids: + guid = rdflib.URIRef(guid) + # check node existence + if (guid, rdflib.RDF.type, None) in self.graph: + # FIXME: node exists and may have a different type! ignore? raise? report? + continue + # add node + self.transaction.add((guid, rdflib.RDF.type, rdflib.URIRef(node_type.uri))) + + def set( + self, + node_type: _schema.Node, # FIXME: is the node_type even needed? Couldn't I infer from the predicate? + guids: typing.Iterable[URI], + predicate: _schema.Predicate, + values: typing.Iterable[typing.Any], + ): + # check node_type + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + # check predicate + if predicate not in self.schema.predicates(): + raise errors.ConsistencyError(f'{predicate} is not defined in the schema') + if not node_type <= predicate.domain: + raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}') + # NOTE: predicate.range is in the schema since predicate is in the schema. + # check values + if len(values) == 0: + return + if predicate.unique and len(values) != 1: + raise ValueError(values) + if isinstance(predicate.range, _schema.Node): + values = set(values) # materialize to safeguard against iterators passed as argument + inconsistent = {val for val in values if not self._has_type(val, predicate.range)} + # catches nodes that don't exist and nodes that have an inconsistent type + if len(inconsistent) > 0: + raise errors.InstanceError(inconsistent) + # check guids + # FIXME: Fail or skip inexistent nodes? + guids = set(guids) + inconsistent = {guid for guid in guids if not self._has_type(guid, node_type)} + if len(inconsistent) > 0: + raise errors.InstanceError(inconsistent) + + # add triples + pred = rdflib.URIRef(predicate.uri) + for guid, value in itertools.product(guids, values): + guid = rdflib.URIRef(guid) + # convert value + if isinstance(predicate.range, _schema.Literal): + value = rdflib.Literal(value, datatype=rdflib.URIRef(predicate.range.uri)) + elif isinstance(predicate.range, _schema.Node): + value = rdflib.URIRef(value) + else: + raise errors.UnreachableError() + # clear triples for unique predicates + if predicate.unique: + for obj in self.graph.objects(guid, pred): + if obj != value: + self.transaction.remove((guid, pred, obj)) + # add triple + self.transaction.add((guid, pred, value)) + +## EOF ## -- cgit v1.2.3