""" Part of the BlackStar filesystem (bsfs) module. A copy of the license is provided with the project. Author: Matthias Baumgartner, 2022 """ # imports import itertools import typing import rdflib # bsfs imports from bsfs import schema as bsc from bsfs.utils import errors, URI # inner-module imports from . import base # exports __all__: typing.Sequence[str] = ( 'SparqlStore', ) ## code ## class _Transaction(): """Lightweight rdflib transactions for in-memory databases.""" def __init__(self, graph): self._graph = graph self.commit() # initialize def commit(self): self._added = [] self._removed = [] def rollback(self): for triple in self._added: self._graph.remove(triple) for triple in self._removed: self._graph.add(triple) def add(self, triple): if triple not in self._graph: self._added.append(triple) self._graph.add(triple) def remove(self, triple): if triple in self._graph: self._removed.append(triple) self._graph.remove(triple) class SparqlStore(base.TripleStoreBase): """ """ # The rdflib graph. _graph: rdflib.Graph # Current transaction. _transaction: _Transaction # The local schema. _schema: bsc.Schema def __init__(self): super().__init__(None) self._graph = rdflib.Graph() self._transaction = _Transaction(self._graph) self._schema = bsc.Schema.Empty() # NOTE: mypy and pylint complain about the **kwargs not being listed (contrasting super) # However, not having it here is clearer since it's explicit that there are no arguments. @classmethod def Open(cls) -> 'SparqlStore': # type: ignore [override] # pylint: disable=arguments-differ return cls() def commit(self): self._transaction.commit() def rollback(self): self._transaction.rollback() @property def schema(self) -> bsc.Schema: return self._schema @schema.setter def schema(self, schema: _schema.Schema): """Migrate to new schema by adding or removing class definitions. Commits before and after the migration. Instances of removed classes will be deleted irreversably. Note that modifying an existing class is not directly supported. Also, it is generally discouraged, since changing definitions may lead to inconsistencies across multiple clients in a distributed setting. Instead, consider introducing a new class under its own uri. Such a migration would look as follows: 1. Add new class definitions. 2. Create instances of the new classes and copy relevant data. 3. Remove the old definitions. To modify a class, i.e., re-use a previous uri with a new class definition, you would have to migrate via temporary class definitions, and thus repeat the above procedure two times. """ # check args: Schema instanace if not isinstance(schema, bsc.Schema): raise TypeError(schema) # check compatibility: No contradicting definitions if not self.schema.consistent_with(schema): raise errors.ConsistencyError(f'{schema} is inconsistent with {self.schema}') # commit the current transaction self.commit() # adjust instances: # nothing to do for added classes # delete instances of removed classes # get deleted classes sub = self.schema - schema # remove predicate instances for pred in sub.predicates: for src, trg in self._graph.subject_objects(rdflib.URIRef(pred.uri)): self._transaction.remove((src, rdflib.URIRef(pred.uri), trg)) # remove node instances for node in sub.nodes: # iterate through node instances for inst in self._graph.subjects(rdflib.RDF.type, rdflib.URIRef(node.uri)): # remove triples where the instance is in the object position for src, pred in self._graph.subject_predicates(inst): self._transaction.remove((src, pred, inst)) # remove triples where the instance is in the subject position for pred, trg in self._graph.predicate_objects(inst): self._transaction.remove((inst, pred, trg)) # remove instance self._transaction.remove((inst, rdflib.RDF.type, rdflib.URIRef(node.uri))) # NOTE: Nothing to do for literals # commit instance changes self.commit() # migrate schema self._schema = schema def _has_type(self, subject: URI, node_type: bsc.Node) -> bool: """Return True if *subject* is a node of class *node_type* or a subclass thereof.""" if node_type not in self.schema.nodes(): raise errors.ConsistencyError(f'{node_type} is not defined in the schema') subject_types = list(self._graph.objects(rdflib.URIRef(subject), rdflib.RDF.type)) if len(subject_types) == 0: return False elif len(subject_types) == 1: node = self.schema.node(URI(subject_types[0])) if node == node_type: return True elif node_type in node.parents(): return True else: return False else: raise errors.UnreachableError() def exists( self, node_type: bsc.Node, guids: typing.Iterable[URI], ): """ """ return {subj for subj in guids if self._has_type(subj, node_type)} def create( self, node_type: bsc.Node, guids: typing.Iterable[URI], ): """ """ # check node_type if node_type not in self.schema.nodes(): raise errors.ConsistencyError(f'{node_type} is not defined in the schema') # check and create guids for guid in guids: guid = rdflib.URIRef(guid) # check node existence if (guid, rdflib.RDF.type, None) in self.graph: # FIXME: node exists and may have a different type! ignore? raise? report? continue # add node self._transaction.add((guid, rdflib.RDF.type, rdflib.URIRef(node_type.uri))) def set( self, node_type: bsc.Node, guids: typing.Iterable[URI], predicate: bsc.Predicate, values: typing.Iterable[typing.Any], ): # check node_type if node_type not in self.schema.nodes(): raise errors.ConsistencyError(f'{node_type} is not defined in the schema') # check predicate if predicate not in self.schema.predicates(): raise errors.ConsistencyError(f'{predicate} is not defined in the schema') if not node_type <= predicate.domain: raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}') # NOTE: predicate.range is in the schema since predicate is in the schema. # check values if len(values) == 0: return if predicate.unique and len(values) != 1: raise ValueError(values) if isinstance(predicate.range, bsc.Node): values = set(values) # materialize to safeguard against iterators passed as argument inconsistent = {val for val in values if not self._has_type(val, predicate.range)} # catches nodes that don't exist and nodes that have an inconsistent type if len(inconsistent) > 0: raise errors.InstanceError(inconsistent) # check guids # FIXME: Fail or skip inexistent nodes? guids = set(guids) inconsistent = {guid for guid in guids if not self._has_type(guid, node_type)} if len(inconsistent) > 0: raise errors.InstanceError(inconsistent) # add triples pred = rdflib.URIRef(predicate.uri) for guid, value in itertools.product(guids, values): guid = rdflib.URIRef(guid) # convert value if isinstance(predicate.range, bsc.Literal): value = rdflib.Literal(value, datatype=rdflib.URIRef(predicate.range.uri)) elif isinstance(predicate.range, bsc.Node): value = rdflib.URIRef(value) else: raise errors.UnreachableError() # clear triples for unique predicates if predicate.unique: for obj in self._graph.objects(guid, pred): if obj != value: self._transaction.remove((guid, pred, obj)) # add triple self._transaction.add((guid, pred, value)) ## EOF ##