Diffstat (limited to 'bsfs/triple_store/sparql/sparql.py')
-rw-r--r--  bsfs/triple_store/sparql/sparql.py  297
1 file changed, 297 insertions, 0 deletions
diff --git a/bsfs/triple_store/sparql/sparql.py b/bsfs/triple_store/sparql/sparql.py
new file mode 100644
index 0000000..c3cbff6
--- /dev/null
+++ b/bsfs/triple_store/sparql/sparql.py
@@ -0,0 +1,297 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import itertools
+import typing
+import rdflib
+
+# bsfs imports
+from bsfs import schema as bsc
+from bsfs.query import ast
+from bsfs.utils import errors, URI
+
+# inner-module imports
+from . import parse_filter
+from .. import base
+
+
+# exports
+__all__: typing.Sequence[str] = (
+    'SparqlStore',
+    )
+
+
+## code ##
+
+class _Transaction():
+    """Lightweight rdflib transactions for in-memory databases."""
+
+    # graph instance.
+    _graph: rdflib.Graph
+
+    # current log of added triples.
+    _added: typing.List[typing.Any]
+
+    # current log of removed triples.
+    _removed: typing.List[typing.Any]
+
+    def __init__(self, graph: rdflib.Graph):
+        self._graph = graph
+        # initialize internal structures
+        self.commit()
+
+    def commit(self):
+        """Commit temporary changes."""
+        self._added = []
+        self._removed = []
+
+    def rollback(self):
+        """Undo changes since the last commit."""
+        for triple in self._added:
+            self._graph.remove(triple)
+        for triple in self._removed:
+            self._graph.add(triple)
+
+    def add(self, triple: typing.Any):
+        """Add a triple to the graph."""
+        if triple not in self._graph:
+            self._added.append(triple)
+            self._graph.add(triple)
+
+    def remove(self, triple: typing.Any):
+        """Remove a triple from the graph."""
+        if triple in self._graph:
+            self._removed.append(triple)
+            self._graph.remove(triple)
+
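+# A minimal usage sketch of _Transaction (illustration only, not part of the
+# module); the graph and URIs below are hypothetical:
+#
+#     g = rdflib.Graph()
+#     tx = _Transaction(g)
+#     tx.add((rdflib.URIRef('http://example.org/a'),
+#             rdflib.RDF.type,
+#             rdflib.URIRef('http://example.org/Node')))
+#     tx.rollback()  # undoes the staged add; the triple is removed from g again
+#     tx.commit()    # resets the add/remove logs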
+
+class SparqlStore(base.TripleStoreBase):
+ """Sparql-based triple store.
+
+ The sparql triple store uses a third-party backend
+ (currently rdflib) to store triples and manages them via
+ the Sparql query language.
+
+ """
+
+ # The rdflib graph.
+ _graph: rdflib.Graph
+
+ # Current transaction.
+ _transaction: _Transaction
+
+ # The local schema.
+ _schema: bsc.Schema
+
+ # Filter parser
+ _filter_parser: parse_filter.Filter
+
+ def __init__(self):
+ super().__init__(None)
+ self._graph = rdflib.Graph()
+ self._transaction = _Transaction(self._graph)
+ self._schema = bsc.Schema.Empty()
+ self._filter_parser = parse_filter.Filter(self._schema)
+
+    # NOTE: mypy and pylint complain about **kwargs not being listed (in contrast to super).
+    # However, not having it here is clearer since it's explicit that there are no arguments.
+    @classmethod
+    def Open(cls) -> 'SparqlStore': # type: ignore [override] # pylint: disable=arguments-differ
+        return cls()
+
+    def commit(self):
+        self._transaction.commit()
+
+    def rollback(self):
+        self._transaction.rollback()
+
+    @property
+    def schema(self) -> bsc.Schema:
+        return self._schema
+
+    @schema.setter
+    def schema(self, schema: bsc.Schema):
+        # check args: Schema instance
+        if not isinstance(schema, bsc.Schema):
+            raise TypeError(schema)
+        # check compatibility: no contradicting definitions
+        if not self.schema.consistent_with(schema):
+            raise errors.ConsistencyError(f'{schema} is inconsistent with {self.schema}')
+
+        # commit the current transaction
+        self.commit()
+
+        # adjust instances:
+        # nothing to do for added classes
+        # delete instances of removed classes
+
+        # get deleted classes
+        sub = self.schema - schema
+
+        for pred in sub.predicates():
+            # remove predicate instances
+            for src, trg in self._graph.subject_objects(rdflib.URIRef(pred.uri)):
+                self._transaction.remove((src, rdflib.URIRef(pred.uri), trg))
+            # remove predicate definition
+            if pred.parent is not None:
+                self._transaction.remove((
+                    rdflib.URIRef(pred.uri),
+                    rdflib.RDFS.subClassOf,
+                    rdflib.URIRef(pred.parent.uri),
+                    ))
+
+        # remove node instances
+        for node in sub.nodes():
+            # iterate through node instances
+            for inst in self._graph.subjects(rdflib.RDF.type, rdflib.URIRef(node.uri)):
+                # remove triples where the instance is in the object position
+                for src, pred in self._graph.subject_predicates(inst):
+                    self._transaction.remove((src, pred, inst))
+                # remove triples where the instance is in the subject position
+                for pred, trg in self._graph.predicate_objects(inst):
+                    self._transaction.remove((inst, pred, trg))
+                # remove instance
+                self._transaction.remove((inst, rdflib.RDF.type, rdflib.URIRef(node.uri)))
+            # remove node definition
+            if node.parent is not None:
+                self._transaction.remove((
+                    rdflib.URIRef(node.uri),
+                    rdflib.RDFS.subClassOf,
+                    rdflib.URIRef(node.parent.uri),
+                    ))
+
+        for lit in sub.literals():
+            # remove literal definition
+            if lit.parent is not None:
+                self._transaction.remove((
+                    rdflib.URIRef(lit.uri),
+                    rdflib.RDFS.subClassOf,
+                    rdflib.URIRef(lit.parent.uri),
+                    ))
+
+        # add predicate, node, and literal hierarchies to the graph
+        for itm in itertools.chain(schema.predicates(), schema.nodes(), schema.literals()):
+            if itm.parent is not None:
+                self._transaction.add((rdflib.URIRef(itm.uri), rdflib.RDFS.subClassOf, rdflib.URIRef(itm.parent.uri)))
+
+        # commit instance changes
+        self.commit()
+
+        # migrate schema
+        self._schema = schema
+        self._filter_parser.schema = schema
+
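+    # A hypothetical sketch of the migration semantics implemented above
+    # (illustration only, not executed); the schema objects, node class, and
+    # guids are assumptions:
+    #
+    #     store.schema = schema_with_tag      # defines a node class ex:Tag
+    #     store.create(tag_node, tag_guids)   # instances of ex:Tag
+    #     store.schema = schema_without_tag   # ex:Tag no longer defined
+    #     # -> the ex:Tag hierarchy triple and all ex:Tag instances are removed
+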
+    def get(
+            self,
+            node_type: bsc.Node,
+            query: typing.Optional[ast.filter.FilterExpression] = None,
+            ) -> typing.Iterator[URI]:
+        if node_type not in self.schema.nodes():
+            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+        if query is not None and not isinstance(query, ast.filter.FilterExpression):
+            raise TypeError(query)
+        for guid, in self._graph.query(self._filter_parser(node_type, query)):
+            yield URI(guid)
+
+    def _has_type(self, subject: URI, node_type: bsc.Node) -> bool:
+        """Return True if *subject* is a node of class *node_type* or a subclass thereof."""
+        if node_type not in self.schema.nodes():
+            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+
+        subject_types = list(self._graph.objects(rdflib.URIRef(subject), rdflib.RDF.type))
+        if len(subject_types) == 0:
+            # unknown subject
+            return False
+        if len(subject_types) == 1:
+            node = self.schema.node(URI(subject_types[0])) # type: ignore [arg-type] # URI is a subtype of str
+            if node == node_type:
+                return True
+            if node_type in node.parents():
+                return True
+            return False
+        # create() assigns exactly one type per node, hence multiple types indicate a corrupted graph
+        raise errors.UnreachableError()
+
+    def exists(
+            self,
+            node_type: bsc.Node,
+            guids: typing.Iterable[URI],
+            ) -> typing.Iterable[URI]:
+        return (subj for subj in guids if self._has_type(subj, node_type))
+
+    def create(
+            self,
+            node_type: bsc.Node,
+            guids: typing.Iterable[URI],
+            ):
+        # check node_type
+        if node_type not in self.schema.nodes():
+            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+        # check and create guids
+        for guid in guids:
+            subject = rdflib.URIRef(guid)
+            # check node existence
+            if (subject, rdflib.RDF.type, None) in self._graph:
+                # FIXME: node exists and may have a different type! ignore? raise? report?
+                continue
+            # add node
+            self._transaction.add((subject, rdflib.RDF.type, rdflib.URIRef(node_type.uri)))
+
+    def set(
+            self,
+            node_type: bsc.Node,
+            guids: typing.Iterable[URI],
+            predicate: bsc.Predicate,
+            values: typing.Iterable[typing.Any],
+            ):
+        # check node_type
+        if node_type not in self.schema.nodes():
+            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+        # check predicate
+        if predicate not in self.schema.predicates():
+            raise errors.ConsistencyError(f'{predicate} is not defined in the schema')
+        if not node_type <= predicate.domain:
+            raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}')
+        # NOTE: predicate.range is in the schema since predicate is in the schema.
+        # materialize values to safeguard against iterators passed as argument
+        values = set(values)
+        # check values
+        if len(values) == 0:
+            return
+        if predicate.unique and len(values) != 1:
+            raise ValueError(values)
+        if isinstance(predicate.range, bsc.Node):
+            inconsistent = {val for val in values if not self._has_type(val, predicate.range)}
+            # catches nodes that don't exist and nodes that have an inconsistent type
+            if len(inconsistent) > 0:
+                raise errors.InstanceError(inconsistent)
+        # check guids
+        # FIXME: Fail or skip inexistent nodes?
+        guids = set(guids)
+        inconsistent = {guid for guid in guids if not self._has_type(guid, node_type)}
+        if len(inconsistent) > 0:
+            raise errors.InstanceError(inconsistent)
+
+        # add triples
+        pred = rdflib.URIRef(predicate.uri)
+        for guid, value in itertools.product(guids, values):
+            guid = rdflib.URIRef(guid)
+            # convert value
+            if isinstance(predicate.range, bsc.Literal):
+                value = rdflib.Literal(value, datatype=rdflib.URIRef(predicate.range.uri))
+            elif isinstance(predicate.range, bsc.Node):
+                value = rdflib.URIRef(value)
+            else:
+                raise errors.UnreachableError()
+            # clear triples for unique predicates
+            if predicate.unique:
+                for obj in self._graph.objects(guid, pred):
+                    if obj != value:
+                        self._transaction.remove((guid, pred, obj))
+            # add triple
+            self._transaction.add((guid, pred, value))
+
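+# A minimal end-to-end sketch of the SparqlStore interface defined above
+# (illustration only, not part of the module); the schema construction, node
+# and predicate classes, URIs, and filter query are hypothetical assumptions:
+#
+#     store = SparqlStore.Open()
+#     store.schema = some_schema                         # a bsc.Schema defining ent and p_tag
+#     store.create(ent, [URI('http://example.org/e1')])
+#     store.set(ent, [URI('http://example.org/e1')], p_tag, ['hello'])
+#     hits = list(store.get(ent, some_filter_query))     # an ast.filter.FilterExpression
+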
+## EOF ##