diff options
Diffstat (limited to 'bsfs/triple_store')
-rw-r--r-- | bsfs/triple_store/__init__.py | 20 | ||||
-rw-r--r-- | bsfs/triple_store/base.py | 148 | ||||
-rw-r--r-- | bsfs/triple_store/sparql.py | 253 |
3 files changed, 421 insertions, 0 deletions
diff --git a/bsfs/triple_store/__init__.py b/bsfs/triple_store/__init__.py new file mode 100644 index 0000000..fb5a8a9 --- /dev/null +++ b/bsfs/triple_store/__init__.py @@ -0,0 +1,20 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import typing + +# inner-module imports +from .base import TripleStoreBase +from .sparql import SparqlStore + +# exports +__all__: typing.Sequence[str] = ( + 'SparqlStore', + 'TripleStoreBase', + ) + +## EOF ## diff --git a/bsfs/triple_store/base.py b/bsfs/triple_store/base.py new file mode 100644 index 0000000..6561262 --- /dev/null +++ b/bsfs/triple_store/base.py @@ -0,0 +1,148 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import abc +import typing + +# inner-module imports +from bsfs.utils import URI, typename +import bsfs.schema as _schema + +# exports +__all__: typing.Sequence[str] = ( + 'TripleStoreBase', + ) + + +## code ## + +class TripleStoreBase(abc.ABC): + """TripleStore base class. + + Use the `Open` method to create a new instance and to initialize + the required structures. + + Triple stores express a graph via its (subject, predicate, object) triples. + They provides methods to add and remove triples, and to query the storage + for given graph structures. The subject is always a node in the graph, + whereas nodes are identifiable by a unique URI. Note that blank nodes + (without an explicit URI) are not supported. The object can be another + Node or a Literal value. The relation between a subject and an object + is expressed via a Predicate. The graph structures are governed by a + schema that defines which Node, Literal, and Predicate classes exist + and how they can interact (see `bsfs.schema.Schema`). + + """ + + # storage's URI. None implies a temporary location. + uri: typing.Optional[URI] = None + + def __init__(self, uri: typing.Optional[URI] = None): + self.uri = uri + + def __hash__(self) -> int: + uri = self.uri if self.uri is not None else id(self) + return hash((type(self), uri)) + + def __eq__(self, other) -> bool: + return isinstance(other, type(self)) \ + and (( self.uri is not None \ + and other.uri is not None \ + and self.uri == other.uri ) \ + or id(self) == id(other)) + + def __repr__(self) -> str: + return f'{typename(self)}(uri={self.uri})' + + def __str__(self) -> str: + return f'{typename(self)}(uri={self.uri})' + + def is_persistent(self) -> bool: + """Return True if data is stored persistently.""" + return self.uri is not None + + + @classmethod + @abc.abstractmethod + def Open(cls, **kwargs: typing.Any) -> 'TripleStoreBase': # pylint: disable=invalid-name # capitalized classmethod + """Return a TripleStoreBase instance connected to *uri*.""" + + @abc.abstractmethod + def commit(self): + """Commit the current transaction.""" + + @abc.abstractmethod + def rollback(self): + """Undo changes since the last commit.""" + + @property + @abc.abstractmethod + def schema(self) -> _schema.Schema: + """Return the store's local schema.""" + + @schema.setter + @abc.abstractmethod + def schema(self, schema: _schema.Schema): + """Migrate to new schema by adding or removing class definitions. + + Commits before and after the migration. + + Instances of removed classes will be deleted irreversably. + Note that modifying an existing class is not directly supported. + Also, it is generally discouraged, since changing definitions may + lead to inconsistencies across multiple clients in a distributed + setting. Instead, consider introducing a new class under its own + uri. Such a migration would look as follows: + + 1. Add new class definitions. + 2. Create instances of the new classes and copy relevant data. + 3. Remove the old definitions. + + To modify a class, i.e., re-use a previous uri with a new + class definition, you would have to migrate via temporary + class definitions, and thus repeat the above procedure two times. + + """ + + @abc.abstractmethod + def exists( + self, + node_type: _schema.Node, + guids: typing.Iterable[URI], + ) -> typing.Iterable[URI]: + """Return those *guids* that exist and have type *node_type* or a subclass thereof.""" + + @abc.abstractmethod + def create( + self, + node_type: _schema.Node, + guids: typing.Iterable[URI], + ): + """Create *guid* nodes with type *subject*.""" + + @abc.abstractmethod + def set( + self, + node_type: _schema.Node, # FIXME: is the node_type even needed? Couldn't I infer from the predicate? + guids: typing.Iterable[URI], + predicate: _schema.Predicate, + values: typing.Iterable[typing.Any], + ): + """Add triples to the graph. + + It is assumed that all of *guids* exist and have *node_type*. + This method adds a triple (guid, predicate, value) for every guid in + *guids* and each value in *values* (cartesian product). Note that + *values* must have length one for unique predicates, and that + currently existing values will be overwritten in this case. + It also verifies that all symbols are part of the schema and that + the *predicate* matches the *node_type*. + Raises `bsfs.errors.ConsistencyError` if these assumptions are violated. + + """ + +## EOF ## diff --git a/bsfs/triple_store/sparql.py b/bsfs/triple_store/sparql.py new file mode 100644 index 0000000..7516dff --- /dev/null +++ b/bsfs/triple_store/sparql.py @@ -0,0 +1,253 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import itertools +import typing +import rdflib + +# bsfs imports +from bsfs import schema as bsc +from bsfs.utils import errors, URI + +# inner-module imports +from . import base + + +# exports +__all__: typing.Sequence[str] = ( + 'SparqlStore', + ) + + +## code ## + +class _Transaction(): + """Lightweight rdflib transactions for in-memory databases.""" + + # graph instance. + _graph: rdflib.Graph + + # current log of added triples. + _added: typing.List[typing.Any] + + # current log of removed triples. + _removed: typing.List[typing.Any] + + def __init__(self, graph: rdflib.Graph): + self._graph = graph + # initialize internal structures + self.commit() + + def commit(self): + """Commit temporary changes.""" + self._added = [] + self._removed = [] + + def rollback(self): + """Undo changes since the last commit.""" + for triple in self._added: + self._graph.remove(triple) + for triple in self._removed: + self._graph.add(triple) + + def add(self, triple: typing.Any): + """Add a triple to the graph.""" + if triple not in self._graph: + self._added.append(triple) + self._graph.add(triple) + + def remove(self, triple: typing.Any): + """Remove a triple from the graph.""" + if triple in self._graph: + self._removed.append(triple) + self._graph.remove(triple) + + +class SparqlStore(base.TripleStoreBase): + """Sparql-based triple store. + + The sparql triple store uses a third-party backend + (currently rdflib) to store triples and manages them via + the Sparql query language. + + """ + + # The rdflib graph. + _graph: rdflib.Graph + + # Current transaction. + _transaction: _Transaction + + # The local schema. + _schema: bsc.Schema + + def __init__(self): + super().__init__(None) + self._graph = rdflib.Graph() + self._transaction = _Transaction(self._graph) + self._schema = bsc.Schema.Empty() + + # NOTE: mypy and pylint complain about the **kwargs not being listed (contrasting super) + # However, not having it here is clearer since it's explicit that there are no arguments. + @classmethod + def Open(cls) -> 'SparqlStore': # type: ignore [override] # pylint: disable=arguments-differ + return cls() + + def commit(self): + self._transaction.commit() + + def rollback(self): + self._transaction.rollback() + + @property + def schema(self) -> bsc.Schema: + return self._schema + + @schema.setter + def schema(self, schema: bsc.Schema): + # check args: Schema instanace + if not isinstance(schema, bsc.Schema): + raise TypeError(schema) + # check compatibility: No contradicting definitions + if not self.schema.consistent_with(schema): + raise errors.ConsistencyError(f'{schema} is inconsistent with {self.schema}') + + # commit the current transaction + self.commit() + + # adjust instances: + # nothing to do for added classes + # delete instances of removed classes + + # get deleted classes + sub = self.schema - schema + + # remove predicate instances + for pred in sub.predicates: + for src, trg in self._graph.subject_objects(rdflib.URIRef(pred.uri)): + self._transaction.remove((src, rdflib.URIRef(pred.uri), trg)) + + # remove node instances + for node in sub.nodes: + # iterate through node instances + for inst in self._graph.subjects(rdflib.RDF.type, rdflib.URIRef(node.uri)): + # remove triples where the instance is in the object position + for src, pred in self._graph.subject_predicates(inst): + self._transaction.remove((src, pred, inst)) + # remove triples where the instance is in the subject position + for pred, trg in self._graph.predicate_objects(inst): + self._transaction.remove((inst, pred, trg)) + # remove instance + self._transaction.remove((inst, rdflib.RDF.type, rdflib.URIRef(node.uri))) + + # NOTE: Nothing to do for literals + + # commit instance changes + self.commit() + + # migrate schema + self._schema = schema + + + def _has_type(self, subject: URI, node_type: bsc.Node) -> bool: + """Return True if *subject* is a node of class *node_type* or a subclass thereof.""" + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + + subject_types = list(self._graph.objects(rdflib.URIRef(subject), rdflib.RDF.type)) + if len(subject_types) == 0: + return False + if len(subject_types) == 1: + node = self.schema.node(URI(subject_types[0])) # type: ignore [arg-type] # URI is a subtype of str + if node == node_type: + return True + if node_type in node.parents(): + return True + return False + raise errors.UnreachableError() + + def exists( + self, + node_type: bsc.Node, + guids: typing.Iterable[URI], + ) -> typing.Iterable[URI]: + return (subj for subj in guids if self._has_type(subj, node_type)) + + def create( + self, + node_type: bsc.Node, + guids: typing.Iterable[URI], + ): + # check node_type + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + # check and create guids + for guid in guids: + subject = rdflib.URIRef(guid) + # check node existence + if (subject, rdflib.RDF.type, None) in self._graph: + # FIXME: node exists and may have a different type! ignore? raise? report? + continue + # add node + self._transaction.add((subject, rdflib.RDF.type, rdflib.URIRef(node_type.uri))) + + def set( + self, + node_type: bsc.Node, + guids: typing.Iterable[URI], + predicate: bsc.Predicate, + values: typing.Iterable[typing.Any], + ): + # check node_type + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + # check predicate + if predicate not in self.schema.predicates(): + raise errors.ConsistencyError(f'{predicate} is not defined in the schema') + if not node_type <= predicate.domain: + raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}') + # NOTE: predicate.range is in the schema since predicate is in the schema. + # materialize values + values = set(values) + # check values + if len(values) == 0: + return + if predicate.unique and len(values) != 1: + raise ValueError(values) + if isinstance(predicate.range, bsc.Node): + values = set(values) # materialize to safeguard against iterators passed as argument + inconsistent = {val for val in values if not self._has_type(val, predicate.range)} + # catches nodes that don't exist and nodes that have an inconsistent type + if len(inconsistent) > 0: + raise errors.InstanceError(inconsistent) + # check guids + # FIXME: Fail or skip inexistent nodes? + guids = set(guids) + inconsistent = {guid for guid in guids if not self._has_type(guid, node_type)} + if len(inconsistent) > 0: + raise errors.InstanceError(inconsistent) + + # add triples + pred = rdflib.URIRef(predicate.uri) + for guid, value in itertools.product(guids, values): + guid = rdflib.URIRef(guid) + # convert value + if isinstance(predicate.range, bsc.Literal): + value = rdflib.Literal(value, datatype=rdflib.URIRef(predicate.range.uri)) + elif isinstance(predicate.range, bsc.Node): + value = rdflib.URIRef(value) + else: + raise errors.UnreachableError() + # clear triples for unique predicates + if predicate.unique: + for obj in self._graph.objects(guid, pred): + if obj != value: + self._transaction.remove((guid, pred, obj)) + # add triple + self._transaction.add((guid, pred, value)) + +## EOF ## |