From e8492489098ef5f8566214e083cd2c2d1d449f5a Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Thu, 8 Dec 2022 16:36:19 +0100 Subject: sparql triple store and graph (nodes, mostly) --- bsfs/triple_store/sparql.py | 253 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 253 insertions(+) create mode 100644 bsfs/triple_store/sparql.py (limited to 'bsfs/triple_store/sparql.py') diff --git a/bsfs/triple_store/sparql.py b/bsfs/triple_store/sparql.py new file mode 100644 index 0000000..3eab869 --- /dev/null +++ b/bsfs/triple_store/sparql.py @@ -0,0 +1,253 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import itertools +import typing +import rdflib + +# bsfs imports +from bsfs.utils import URI +from bsfs.utils import errors +import bsfs.schema as _schema + +# inner-module imports +from . import base + + +# exports +__all__: typing.Sequence[str] = ( + 'SparqlStore', + ) + + +## code ## + +class Transaction(): + """Lightweight rdflib transactions for in-memory databases.""" + + def __init__(self, graph): + self._graph = graph + self.commit() # initialize + + def commit(self): + self._added = [] + self._removed = [] + + def rollback(self): + for triple in self._added: + self._graph.remove(triple) + for triple in self._removed: + self._graph.add(triple) + + def add(self, triple): + if triple not in self._graph: + self._added.append(triple) + self._graph.add(triple) + + def remove(self, triple): + if triple in self._graph: + self._removed.append(triple) + self._graph.remove(triple) + + +class SparqlStore(base.TripleStoreBase): + """ + """ + + def __init__(self, uri: typing.Optional[URI] = None): + super().__init__(uri) + self.graph = rdflib.Graph() + self.transaction = Transaction(self.graph) + self.__schema = _schema.Schema.Empty() + + @classmethod + def Open( + cls, + uri: str, + **kwargs: typing.Any, + ) -> 'SparqlStore': + return cls(None) + + def commit(self): + self.transaction.commit() + + def rollback(self): + self.transaction.rollback() + + @property + def schema(self) -> _schema.Schema: + """Return the current schema.""" + return self.__schema + + @schema.setter + def schema(self, schema: _schema.Schema): + """Migrate to new schema by adding or removing class definitions. + + Commits before and after the migration. + + Instances of removed classes will be deleted irreversably. + Note that modifying an existing class is not directly supported. + Also, it is generally discouraged, since changing definitions may + lead to inconsistencies across multiple clients in a distributed + setting. Instead, consider introducing a new class under its own + uri. Such a migration would look as follows: + + 1. Add new class definitions. + 2. Create instances of the new classes and copy relevant data. + 3. Remove the old definitions. + + To modify a class, i.e., re-use a previous uri with a new + class definition, you would have to migrate via temporary + class definitions, and thus repeat the above procedure two times. + + """ + # check args: Schema instanace + if not isinstance(schema, _schema.Schema): + raise TypeError(schema) + # check compatibility: No contradicting definitions + if not self.schema.consistent_with(schema): + raise errors.ConsistencyError(f'{schema} is inconsistent with {self.schema}') + + # commit the current transaction + self.commit() + + # adjust instances: + # nothing to do for added classes + # delete instances of removed classes + + # get deleted classes + sub = self.schema - schema + + # remove predicate instances + for pred in sub.predicates: + for src, trg in self.graph.subject_objects(rdflib.URIRef(pred.uri)): + self.transaction.remove((src, rdflib.URIRef(pred.uri), trg)) + + # remove node instances + for node in sub.nodes: + # iterate through node instances + for inst in self.graph.subjects(rdflib.RDF.type, rdflib.URIRef(node.uri)): + # remove triples where the instance is in the object position + for src, pred in self.graph.subject_predicates(inst): + self.transaction.remove((src, pred, inst)) + # remove triples where the instance is in the subject position + for pred, trg in self.graph.predicate_objects(inst): + self.transaction.remove((inst, pred, trg)) + # remove instance + self.transaction.remove((inst, rdflib.RDF.type, rdflib.URIRef(node.uri))) + + # NOTE: Nothing to do for literals + + # commit instance changes + self.commit() + + # migrate schema + self.__schema = schema + + + def _has_type(self, subject: URI, node_type: _schema.Node) -> bool: + """Return True if *subject* is a node of class *node_type* or a subclass thereof.""" + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + + subject_types = list(self.graph.objects(rdflib.URIRef(subject), rdflib.RDF.type)) + if len(subject_types) == 0: + return False + elif len(subject_types) == 1: + node = self.schema.node(URI(subject_types[0])) + if node == node_type: + return True + elif node_type in node.parents(): + return True + else: + return False + else: + raise errors.UnreachableError() + + def exists( + self, + node_type: _schema.Node, + guids: typing.Iterable[URI], + ): + """ + """ + return {subj for subj in guids if self._has_type(subj, node_type)} + + def create( + self, + node_type: _schema.Node, + guids: typing.Iterable[URI], + ): + """ + """ + # check node_type + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + # check and create guids + for guid in guids: + guid = rdflib.URIRef(guid) + # check node existence + if (guid, rdflib.RDF.type, None) in self.graph: + # FIXME: node exists and may have a different type! ignore? raise? report? + continue + # add node + self.transaction.add((guid, rdflib.RDF.type, rdflib.URIRef(node_type.uri))) + + def set( + self, + node_type: _schema.Node, # FIXME: is the node_type even needed? Couldn't I infer from the predicate? + guids: typing.Iterable[URI], + predicate: _schema.Predicate, + values: typing.Iterable[typing.Any], + ): + # check node_type + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + # check predicate + if predicate not in self.schema.predicates(): + raise errors.ConsistencyError(f'{predicate} is not defined in the schema') + if not node_type <= predicate.domain: + raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}') + # NOTE: predicate.range is in the schema since predicate is in the schema. + # check values + if len(values) == 0: + return + if predicate.unique and len(values) != 1: + raise ValueError(values) + if isinstance(predicate.range, _schema.Node): + values = set(values) # materialize to safeguard against iterators passed as argument + inconsistent = {val for val in values if not self._has_type(val, predicate.range)} + # catches nodes that don't exist and nodes that have an inconsistent type + if len(inconsistent) > 0: + raise errors.InstanceError(inconsistent) + # check guids + # FIXME: Fail or skip inexistent nodes? + guids = set(guids) + inconsistent = {guid for guid in guids if not self._has_type(guid, node_type)} + if len(inconsistent) > 0: + raise errors.InstanceError(inconsistent) + + # add triples + pred = rdflib.URIRef(predicate.uri) + for guid, value in itertools.product(guids, values): + guid = rdflib.URIRef(guid) + # convert value + if isinstance(predicate.range, _schema.Literal): + value = rdflib.Literal(value, datatype=rdflib.URIRef(predicate.range.uri)) + elif isinstance(predicate.range, _schema.Node): + value = rdflib.URIRef(value) + else: + raise errors.UnreachableError() + # clear triples for unique predicates + if predicate.unique: + for obj in self.graph.objects(guid, pred): + if obj != value: + self.transaction.remove((guid, pred, obj)) + # add triple + self.transaction.add((guid, pred, value)) + +## EOF ## -- cgit v1.2.3 From edd5390b6db1550f6a80a46f0eaf5f3916997532 Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Sun, 18 Dec 2022 14:06:58 +0100 Subject: information hiding --- bsfs/triple_store/sparql.py | 81 ++++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 37 deletions(-) (limited to 'bsfs/triple_store/sparql.py') diff --git a/bsfs/triple_store/sparql.py b/bsfs/triple_store/sparql.py index 3eab869..d9ed55a 100644 --- a/bsfs/triple_store/sparql.py +++ b/bsfs/triple_store/sparql.py @@ -10,9 +10,8 @@ import typing import rdflib # bsfs imports -from bsfs.utils import URI -from bsfs.utils import errors -import bsfs.schema as _schema +from bsfs import schema as bsc +from bsfs.utils import errors, URI # inner-module imports from . import base @@ -26,7 +25,7 @@ __all__: typing.Sequence[str] = ( ## code ## -class Transaction(): +class _Transaction(): """Lightweight rdflib transactions for in-memory databases.""" def __init__(self, graph): @@ -58,11 +57,20 @@ class SparqlStore(base.TripleStoreBase): """ """ - def __init__(self, uri: typing.Optional[URI] = None): - super().__init__(uri) - self.graph = rdflib.Graph() - self.transaction = Transaction(self.graph) - self.__schema = _schema.Schema.Empty() + # The rdflib graph. + _graph: rdflib.Graph + + # Current transaction. + _transaction: _Transaction + + # The local schema. + _schema: bsc.Schema + + def __init__(self): + super().__init__(None) + self._graph = rdflib.Graph() + self._transaction = _Transaction(self._graph) + self._schema = bsc.Schema.Empty() @classmethod def Open( @@ -73,15 +81,14 @@ class SparqlStore(base.TripleStoreBase): return cls(None) def commit(self): - self.transaction.commit() + self._transaction.commit() def rollback(self): - self.transaction.rollback() + self._transaction.rollback() @property - def schema(self) -> _schema.Schema: - """Return the current schema.""" - return self.__schema + def schema(self) -> bsc.Schema: + return self._schema @schema.setter def schema(self, schema: _schema.Schema): @@ -106,7 +113,7 @@ class SparqlStore(base.TripleStoreBase): """ # check args: Schema instanace - if not isinstance(schema, _schema.Schema): + if not isinstance(schema, bsc.Schema): raise TypeError(schema) # check compatibility: No contradicting definitions if not self.schema.consistent_with(schema): @@ -124,21 +131,21 @@ class SparqlStore(base.TripleStoreBase): # remove predicate instances for pred in sub.predicates: - for src, trg in self.graph.subject_objects(rdflib.URIRef(pred.uri)): - self.transaction.remove((src, rdflib.URIRef(pred.uri), trg)) + for src, trg in self._graph.subject_objects(rdflib.URIRef(pred.uri)): + self._transaction.remove((src, rdflib.URIRef(pred.uri), trg)) # remove node instances for node in sub.nodes: # iterate through node instances - for inst in self.graph.subjects(rdflib.RDF.type, rdflib.URIRef(node.uri)): + for inst in self._graph.subjects(rdflib.RDF.type, rdflib.URIRef(node.uri)): # remove triples where the instance is in the object position - for src, pred in self.graph.subject_predicates(inst): - self.transaction.remove((src, pred, inst)) + for src, pred in self._graph.subject_predicates(inst): + self._transaction.remove((src, pred, inst)) # remove triples where the instance is in the subject position - for pred, trg in self.graph.predicate_objects(inst): - self.transaction.remove((inst, pred, trg)) + for pred, trg in self._graph.predicate_objects(inst): + self._transaction.remove((inst, pred, trg)) # remove instance - self.transaction.remove((inst, rdflib.RDF.type, rdflib.URIRef(node.uri))) + self._transaction.remove((inst, rdflib.RDF.type, rdflib.URIRef(node.uri))) # NOTE: Nothing to do for literals @@ -146,15 +153,15 @@ class SparqlStore(base.TripleStoreBase): self.commit() # migrate schema - self.__schema = schema + self._schema = schema - def _has_type(self, subject: URI, node_type: _schema.Node) -> bool: + def _has_type(self, subject: URI, node_type: bsc.Node) -> bool: """Return True if *subject* is a node of class *node_type* or a subclass thereof.""" if node_type not in self.schema.nodes(): raise errors.ConsistencyError(f'{node_type} is not defined in the schema') - subject_types = list(self.graph.objects(rdflib.URIRef(subject), rdflib.RDF.type)) + subject_types = list(self._graph.objects(rdflib.URIRef(subject), rdflib.RDF.type)) if len(subject_types) == 0: return False elif len(subject_types) == 1: @@ -170,7 +177,7 @@ class SparqlStore(base.TripleStoreBase): def exists( self, - node_type: _schema.Node, + node_type: bsc.Node, guids: typing.Iterable[URI], ): """ @@ -179,7 +186,7 @@ class SparqlStore(base.TripleStoreBase): def create( self, - node_type: _schema.Node, + node_type: bsc.Node, guids: typing.Iterable[URI], ): """ @@ -195,13 +202,13 @@ class SparqlStore(base.TripleStoreBase): # FIXME: node exists and may have a different type! ignore? raise? report? continue # add node - self.transaction.add((guid, rdflib.RDF.type, rdflib.URIRef(node_type.uri))) + self._transaction.add((guid, rdflib.RDF.type, rdflib.URIRef(node_type.uri))) def set( self, - node_type: _schema.Node, # FIXME: is the node_type even needed? Couldn't I infer from the predicate? + node_type: bsc.Node, guids: typing.Iterable[URI], - predicate: _schema.Predicate, + predicate: bsc.Predicate, values: typing.Iterable[typing.Any], ): # check node_type @@ -218,7 +225,7 @@ class SparqlStore(base.TripleStoreBase): return if predicate.unique and len(values) != 1: raise ValueError(values) - if isinstance(predicate.range, _schema.Node): + if isinstance(predicate.range, bsc.Node): values = set(values) # materialize to safeguard against iterators passed as argument inconsistent = {val for val in values if not self._has_type(val, predicate.range)} # catches nodes that don't exist and nodes that have an inconsistent type @@ -236,18 +243,18 @@ class SparqlStore(base.TripleStoreBase): for guid, value in itertools.product(guids, values): guid = rdflib.URIRef(guid) # convert value - if isinstance(predicate.range, _schema.Literal): + if isinstance(predicate.range, bsc.Literal): value = rdflib.Literal(value, datatype=rdflib.URIRef(predicate.range.uri)) - elif isinstance(predicate.range, _schema.Node): + elif isinstance(predicate.range, bsc.Node): value = rdflib.URIRef(value) else: raise errors.UnreachableError() # clear triples for unique predicates if predicate.unique: - for obj in self.graph.objects(guid, pred): + for obj in self._graph.objects(guid, pred): if obj != value: - self.transaction.remove((guid, pred, obj)) + self._transaction.remove((guid, pred, obj)) # add triple - self.transaction.add((guid, pred, value)) + self._transaction.add((guid, pred, value)) ## EOF ## -- cgit v1.2.3 From 58496960926a56149c10d64e01b6df7d048eed0e Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Sun, 18 Dec 2022 14:11:27 +0100 Subject: triple store Open interface --- bsfs/triple_store/sparql.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'bsfs/triple_store/sparql.py') diff --git a/bsfs/triple_store/sparql.py b/bsfs/triple_store/sparql.py index d9ed55a..fc161b3 100644 --- a/bsfs/triple_store/sparql.py +++ b/bsfs/triple_store/sparql.py @@ -72,13 +72,11 @@ class SparqlStore(base.TripleStoreBase): self._transaction = _Transaction(self._graph) self._schema = bsc.Schema.Empty() + # NOTE: mypy and pylint complain about the **kwargs not being listed (contrasting super) + # However, not having it here is clearer since it's explicit that there are no arguments. @classmethod - def Open( - cls, - uri: str, - **kwargs: typing.Any, - ) -> 'SparqlStore': - return cls(None) + def Open(cls) -> 'SparqlStore': # type: ignore [override] # pylint: disable=arguments-differ + return cls() def commit(self): self._transaction.commit() -- cgit v1.2.3 From e19c8f9d0818a147832df0945188ea14de9c7690 Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Sun, 18 Dec 2022 14:15:18 +0100 Subject: documentation, types, and style fixes --- bsfs/triple_store/sparql.py | 73 ++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 38 deletions(-) (limited to 'bsfs/triple_store/sparql.py') diff --git a/bsfs/triple_store/sparql.py b/bsfs/triple_store/sparql.py index fc161b3..23059f7 100644 --- a/bsfs/triple_store/sparql.py +++ b/bsfs/triple_store/sparql.py @@ -28,33 +28,52 @@ __all__: typing.Sequence[str] = ( class _Transaction(): """Lightweight rdflib transactions for in-memory databases.""" - def __init__(self, graph): + # graph instance. + _graph: rdflib.Graph + + # current log of added triples. + _added: typing.List[typing.Any] + + # current log of removed triples. + _removed: typing.List[typing.Any] + + def __init__(self, graph: rdflib.Graph): self._graph = graph - self.commit() # initialize + # initialize internal structures + self.commit() def commit(self): + """Commit temporary changes.""" self._added = [] self._removed = [] def rollback(self): + """Undo changes since the last commit.""" for triple in self._added: self._graph.remove(triple) for triple in self._removed: self._graph.add(triple) - def add(self, triple): + def add(self, triple: typing.Any): + """Add a triple to the graph.""" if triple not in self._graph: self._added.append(triple) self._graph.add(triple) - def remove(self, triple): + def remove(self, triple: typing.Any): + """Remove a triple from the graph.""" if triple in self._graph: self._removed.append(triple) self._graph.remove(triple) class SparqlStore(base.TripleStoreBase): - """ + """Sparql-based triple store. + + The sparql triple store uses a third-party backend + (currently rdflib) to store triples and manages them via + the Sparql query language. + """ # The rdflib graph. @@ -89,27 +108,7 @@ class SparqlStore(base.TripleStoreBase): return self._schema @schema.setter - def schema(self, schema: _schema.Schema): - """Migrate to new schema by adding or removing class definitions. - - Commits before and after the migration. - - Instances of removed classes will be deleted irreversably. - Note that modifying an existing class is not directly supported. - Also, it is generally discouraged, since changing definitions may - lead to inconsistencies across multiple clients in a distributed - setting. Instead, consider introducing a new class under its own - uri. Such a migration would look as follows: - - 1. Add new class definitions. - 2. Create instances of the new classes and copy relevant data. - 3. Remove the old definitions. - - To modify a class, i.e., re-use a previous uri with a new - class definition, you would have to migrate via temporary - class definitions, and thus repeat the above procedure two times. - - """ + def schema(self, schema: bsc.Schema): # check args: Schema instanace if not isinstance(schema, bsc.Schema): raise TypeError(schema) @@ -162,16 +161,14 @@ class SparqlStore(base.TripleStoreBase): subject_types = list(self._graph.objects(rdflib.URIRef(subject), rdflib.RDF.type)) if len(subject_types) == 0: return False - elif len(subject_types) == 1: - node = self.schema.node(URI(subject_types[0])) + if len(subject_types) == 1: + node = self.schema.node(URI(subject_types[0])) # type: ignore [arg-type] # URI is a subtype of str if node == node_type: return True - elif node_type in node.parents(): + if node_type in node.parents(): return True - else: - return False - else: - raise errors.UnreachableError() + return False + raise errors.UnreachableError() def exists( self, @@ -187,20 +184,18 @@ class SparqlStore(base.TripleStoreBase): node_type: bsc.Node, guids: typing.Iterable[URI], ): - """ - """ # check node_type if node_type not in self.schema.nodes(): raise errors.ConsistencyError(f'{node_type} is not defined in the schema') # check and create guids for guid in guids: - guid = rdflib.URIRef(guid) + subject = rdflib.URIRef(guid) # check node existence - if (guid, rdflib.RDF.type, None) in self.graph: + if (subject, rdflib.RDF.type, None) in self._graph: # FIXME: node exists and may have a different type! ignore? raise? report? continue # add node - self._transaction.add((guid, rdflib.RDF.type, rdflib.URIRef(node_type.uri))) + self._transaction.add((subject, rdflib.RDF.type, rdflib.URIRef(node_type.uri))) def set( self, @@ -218,6 +213,8 @@ class SparqlStore(base.TripleStoreBase): if not node_type <= predicate.domain: raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}') # NOTE: predicate.range is in the schema since predicate is in the schema. + # materialize values + values = set(values) # check values if len(values) == 0: return -- cgit v1.2.3 From a5ce14c8bbd55f4a078ceea9384cda56bf42a18b Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Sun, 18 Dec 2022 14:16:06 +0100 Subject: SparqlStore.exists bugfix --- bsfs/triple_store/sparql.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'bsfs/triple_store/sparql.py') diff --git a/bsfs/triple_store/sparql.py b/bsfs/triple_store/sparql.py index 23059f7..7516dff 100644 --- a/bsfs/triple_store/sparql.py +++ b/bsfs/triple_store/sparql.py @@ -174,10 +174,8 @@ class SparqlStore(base.TripleStoreBase): self, node_type: bsc.Node, guids: typing.Iterable[URI], - ): - """ - """ - return {subj for subj in guids if self._has_type(subj, node_type)} + ) -> typing.Iterable[URI]: + return (subj for subj in guids if self._has_type(subj, node_type)) def create( self, -- cgit v1.2.3