aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/triple_store/sparql.py
diff options
context:
space:
mode:
Diffstat (limited to 'bsfs/triple_store/sparql.py')
-rw-r--r--bsfs/triple_store/sparql.py73
1 files changed, 35 insertions, 38 deletions
diff --git a/bsfs/triple_store/sparql.py b/bsfs/triple_store/sparql.py
index fc161b3..23059f7 100644
--- a/bsfs/triple_store/sparql.py
+++ b/bsfs/triple_store/sparql.py
@@ -28,33 +28,52 @@ __all__: typing.Sequence[str] = (
class _Transaction():
"""Lightweight rdflib transactions for in-memory databases."""
- def __init__(self, graph):
+ # graph instance.
+ _graph: rdflib.Graph
+
+ # current log of added triples.
+ _added: typing.List[typing.Any]
+
+ # current log of removed triples.
+ _removed: typing.List[typing.Any]
+
+ def __init__(self, graph: rdflib.Graph):
self._graph = graph
- self.commit() # initialize
+ # initialize internal structures
+ self.commit()
def commit(self):
+ """Commit temporary changes."""
self._added = []
self._removed = []
def rollback(self):
+ """Undo changes since the last commit."""
for triple in self._added:
self._graph.remove(triple)
for triple in self._removed:
self._graph.add(triple)
- def add(self, triple):
+ def add(self, triple: typing.Any):
+ """Add a triple to the graph."""
if triple not in self._graph:
self._added.append(triple)
self._graph.add(triple)
- def remove(self, triple):
+ def remove(self, triple: typing.Any):
+ """Remove a triple from the graph."""
if triple in self._graph:
self._removed.append(triple)
self._graph.remove(triple)
class SparqlStore(base.TripleStoreBase):
- """
+ """Sparql-based triple store.
+
+ The sparql triple store uses a third-party backend
+ (currently rdflib) to store triples and manages them via
+ the Sparql query language.
+
"""
# The rdflib graph.
@@ -89,27 +108,7 @@ class SparqlStore(base.TripleStoreBase):
return self._schema
@schema.setter
- def schema(self, schema: _schema.Schema):
- """Migrate to new schema by adding or removing class definitions.
-
- Commits before and after the migration.
-
- Instances of removed classes will be deleted irreversably.
- Note that modifying an existing class is not directly supported.
- Also, it is generally discouraged, since changing definitions may
- lead to inconsistencies across multiple clients in a distributed
- setting. Instead, consider introducing a new class under its own
- uri. Such a migration would look as follows:
-
- 1. Add new class definitions.
- 2. Create instances of the new classes and copy relevant data.
- 3. Remove the old definitions.
-
- To modify a class, i.e., re-use a previous uri with a new
- class definition, you would have to migrate via temporary
- class definitions, and thus repeat the above procedure two times.
-
- """
+ def schema(self, schema: bsc.Schema):
# check args: Schema instanace
if not isinstance(schema, bsc.Schema):
raise TypeError(schema)
@@ -162,16 +161,14 @@ class SparqlStore(base.TripleStoreBase):
subject_types = list(self._graph.objects(rdflib.URIRef(subject), rdflib.RDF.type))
if len(subject_types) == 0:
return False
- elif len(subject_types) == 1:
- node = self.schema.node(URI(subject_types[0]))
+ if len(subject_types) == 1:
+ node = self.schema.node(URI(subject_types[0])) # type: ignore [arg-type] # URI is a subtype of str
if node == node_type:
return True
- elif node_type in node.parents():
+ if node_type in node.parents():
return True
- else:
- return False
- else:
- raise errors.UnreachableError()
+ return False
+ raise errors.UnreachableError()
def exists(
self,
@@ -187,20 +184,18 @@ class SparqlStore(base.TripleStoreBase):
node_type: bsc.Node,
guids: typing.Iterable[URI],
):
- """
- """
# check node_type
if node_type not in self.schema.nodes():
raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
# check and create guids
for guid in guids:
- guid = rdflib.URIRef(guid)
+ subject = rdflib.URIRef(guid)
# check node existence
- if (guid, rdflib.RDF.type, None) in self.graph:
+ if (subject, rdflib.RDF.type, None) in self._graph:
# FIXME: node exists and may have a different type! ignore? raise? report?
continue
# add node
- self._transaction.add((guid, rdflib.RDF.type, rdflib.URIRef(node_type.uri)))
+ self._transaction.add((subject, rdflib.RDF.type, rdflib.URIRef(node_type.uri)))
def set(
self,
@@ -218,6 +213,8 @@ class SparqlStore(base.TripleStoreBase):
if not node_type <= predicate.domain:
raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}')
# NOTE: predicate.range is in the schema since predicate is in the schema.
+ # materialize values
+ values = set(values)
# check values
if len(values) == 0:
return