"""
Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
import itertools
import typing
import rdflib

# bsfs imports
from bsfs import schema as bsc
from bsfs.utils import errors, URI

# inner-module imports
from . import base

# exports
__all__: typing.Sequence[str] = (
    'SparqlStore',
    )


## code ##

class _Transaction():
    """Lightweight rdflib transactions for in-memory databases.

    Changes are applied to the graph immediately. In addition, every change
    that actually modified the graph is recorded in a chronological journal
    so that it can be undone by `rollback`. Only changes made through `add`
    and `remove` of this class are tracked.
    """

    # graph instance.
    _graph: rdflib.Graph

    # chronological journal of effective changes since the last commit.
    # Each entry is (was_added, triple): True for an addition, False for a removal.
    _journal: typing.List[typing.Tuple[bool, typing.Any]]

    def __init__(self, graph: rdflib.Graph):
        self._graph = graph
        # initialize internal structures
        self.commit()

    def commit(self):
        """Commit temporary changes."""
        # the graph already holds the changes; forgetting the journal makes them permanent.
        self._journal = []

    def rollback(self):
        """Undo changes since the last commit."""
        # Undo in reverse chronological order. Undoing all additions before all
        # removals (or vice versa) is wrong if the same triple was both added
        # and removed within one transaction.
        for was_added, triple in reversed(self._journal):
            if was_added:
                self._graph.remove(triple)
            else:
                self._graph.add(triple)
        # the journal has been consumed; a repeated rollback must be a no-op.
        self._journal = []

    def add(self, triple: typing.Any):
        """Add a triple to the graph."""
        # only journal effective changes, so that a rollback does not remove
        # triples that were present before the transaction started.
        if triple not in self._graph:
            self._journal.append((True, triple))
            self._graph.add(triple)

    def remove(self, triple: typing.Any):
        """Remove a triple from the graph."""
        # only journal effective changes, so that a rollback does not add
        # triples that were absent before the transaction started.
        if triple in self._graph:
            self._journal.append((False, triple))
            self._graph.remove(triple)


class SparqlStore(base.TripleStoreBase):
    """Sparql-based triple store.

    The sparql triple store uses a third-party backend
    (currently rdflib) to store triples and manages them via
    the Sparql query language.

    """

    # The rdflib graph.
    _graph: rdflib.Graph

    # Current transaction.
    _transaction: _Transaction

    # The local schema.
    _schema: bsc.Schema

    def __init__(self):
        super().__init__(None)
        self._graph = rdflib.Graph()
        self._transaction = _Transaction(self._graph)
        self._schema = bsc.Schema.Empty()

    # NOTE: mypy and pylint complain about the **kwargs not being listed (contrasting super)
    # However, not having it here is clearer since it's explicit that there are no arguments.
    @classmethod
    def Open(cls) -> 'SparqlStore': # type: ignore [override] # pylint: disable=arguments-differ
        """Return a new store instance. Takes no arguments."""
        return cls()

    def commit(self):
        """Make all changes since the last commit permanent."""
        self._transaction.commit()

    def rollback(self):
        """Undo all changes since the last commit."""
        self._transaction.rollback()

    @property
    def schema(self) -> bsc.Schema:
        """Return the store's local schema.

        Assigning a new schema commits the current transaction, deletes all
        instances of nodes and predicates that are no longer part of the
        schema, and commits again.

        Raises a `TypeError` if the assigned value is not a `bsc.Schema` and
        an `errors.ConsistencyError` if it is inconsistent with the current
        schema.
        """
        return self._schema

    @schema.setter
    def schema(self, schema: bsc.Schema):
        # check args: Schema instance
        if not isinstance(schema, bsc.Schema):
            raise TypeError(schema)
        # check compatibility: No contradicting definitions
        if not self.schema.consistent_with(schema):
            raise errors.ConsistencyError(f'{schema} is inconsistent with {self.schema}')

        # commit the current transaction
        self.commit()

        # adjust instances:
        # nothing to do for added classes
        # delete instances of removed classes

        # get deleted classes
        sub = self.schema - schema

        # NOTE: Query results are materialized (list) before triples are removed
        # so that the graph is never modified while one of its iterators is active.

        # remove predicate instances
        for pred in sub.predicates:
            pred_ref = rdflib.URIRef(pred.uri)
            for src, trg in list(self._graph.subject_objects(pred_ref)):
                self._transaction.remove((src, pred_ref, trg))

        # remove node instances
        for node in sub.nodes:
            node_ref = rdflib.URIRef(node.uri)
            # iterate through node instances
            for inst in list(self._graph.subjects(rdflib.RDF.type, node_ref)):
                # remove triples where the instance is in the object position
                for src, pred in list(self._graph.subject_predicates(inst)):
                    self._transaction.remove((src, pred, inst))
                # remove triples where the instance is in the subject position
                for pred, trg in list(self._graph.predicate_objects(inst)):
                    self._transaction.remove((inst, pred, trg))
                # remove instance
                self._transaction.remove((inst, rdflib.RDF.type, node_ref))

        # NOTE: Nothing to do for literals

        # commit instance changes
        self.commit()

        # migrate schema
        self._schema = schema

    def _has_type(self, subject: URI, node_type: bsc.Node) -> bool:
        """Return True if *subject* is a node of class *node_type* or a subclass thereof.

        Return False if *subject* has no type in the graph, i.e. does not exist.
        Raise an `errors.ConsistencyError` if *node_type* is not in the schema.
        """
        if node_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')

        subject_types = list(self._graph.objects(rdflib.URIRef(subject), rdflib.RDF.type))
        if not subject_types:
            # the subject has no type, hence it's not a known node
            return False
        if len(subject_types) == 1:
            node = self.schema.node(URI(subject_types[0])) # type: ignore [arg-type] # URI is a subtype of str
            # match the class itself or any of its ancestors
            return node == node_type or node_type in node.parents()
        # a subject with more than one type is not expected
        raise errors.UnreachableError()

    def exists(
            self,
            node_type: bsc.Node,
            guids: typing.Iterable[URI],
            ) -> typing.Iterable[URI]:
        """Return those *guids* that are nodes of type *node_type* (or a subclass).

        The result is a generator; the type checks (including the check that
        *node_type* is in the schema) run when it is consumed.
        """
        return (subj for subj in guids if self._has_type(subj, node_type))

    def create(
            self,
            node_type: bsc.Node,
            guids: typing.Iterable[URI],
            ):
        """Create a node of type *node_type* for each of *guids*.

        Guids that already have a type in the graph are skipped silently.
        Raise an `errors.ConsistencyError` if *node_type* is not in the schema.
        """
        # check node_type
        if node_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
        type_ref = rdflib.URIRef(node_type.uri)
        # check and create guids
        for guid in guids:
            subject = rdflib.URIRef(guid)
            # check node existence
            if (subject, rdflib.RDF.type, None) in self._graph:
                # FIXME: node exists and may have a different type! ignore? raise? report?
                continue
            # add node
            self._transaction.add((subject, rdflib.RDF.type, type_ref))

    def set(
            self,
            node_type: bsc.Node,
            guids: typing.Iterable[URI],
            predicate: bsc.Predicate,
            values: typing.Iterable[typing.Any],
            ):
        """Link each of *guids* to each of *values* via *predicate*.

        Nothing happens if *values* is empty. For unique predicates, existing
        values of a guid are replaced by the new value.

        Raises:
            errors.ConsistencyError: *node_type* or *predicate* is not in the
                schema, or *node_type* is not a subclass of the predicate's domain.
            ValueError: *predicate* is unique but more than one value was given.
            errors.InstanceError: some guid is not a node of type *node_type*,
                or (for node-valued predicates) some value is not a node of the
                predicate's range type.
        """
        # check node_type
        if node_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
        # check predicate
        if predicate not in self.schema.predicates():
            raise errors.ConsistencyError(f'{predicate} is not defined in the schema')
        if not node_type <= predicate.domain:
            raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}')
        # NOTE: predicate.range is in the schema since predicate is in the schema.

        # materialize values to safeguard against iterators passed as argument
        values = set(values)
        # check values
        if len(values) == 0:
            return
        if predicate.unique and len(values) != 1:
            raise ValueError(values)
        if isinstance(predicate.range, bsc.Node):
            inconsistent = {val for val in values if not self._has_type(val, predicate.range)}
            # catches nodes that don't exist and nodes that have an inconsistent type
            if len(inconsistent) > 0:
                raise errors.InstanceError(inconsistent)

        # check guids
        # FIXME: Fail or skip inexistent nodes?
        guids = set(guids)
        inconsistent = {guid for guid in guids if not self._has_type(guid, node_type)}
        if len(inconsistent) > 0:
            raise errors.InstanceError(inconsistent)

        # add triples
        pred = rdflib.URIRef(predicate.uri)
        for guid, value in itertools.product(guids, values):
            subject = rdflib.URIRef(guid)
            # convert value
            if isinstance(predicate.range, bsc.Literal):
                obj = rdflib.Literal(value, datatype=rdflib.URIRef(predicate.range.uri))
            elif isinstance(predicate.range, bsc.Node):
                obj = rdflib.URIRef(value)
            else:
                raise errors.UnreachableError()
            # clear triples for unique predicates
            if predicate.unique:
                # materialize first: triples are removed from the graph while looping
                for old in list(self._graph.objects(subject, pred)):
                    if old != obj:
                        self._transaction.remove((subject, pred, old))
            # add triple
            self._transaction.add((subject, pred, obj))

## EOF ##