From 791918039979d0743fd2ea4b9a5e74593ff96fd0 Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Mon, 19 Dec 2022 13:32:34 +0100 Subject: query ast file structures and essential interfaces --- bsfs/triple_store/base.py | 8 ++ bsfs/triple_store/sparql.py | 253 ---------------------------------- bsfs/triple_store/sparql/__init__.py | 18 +++ bsfs/triple_store/sparql/sparql.py | 256 +++++++++++++++++++++++++++++++++++ 4 files changed, 282 insertions(+), 253 deletions(-) delete mode 100644 bsfs/triple_store/sparql.py create mode 100644 bsfs/triple_store/sparql/__init__.py create mode 100644 bsfs/triple_store/sparql/sparql.py (limited to 'bsfs/triple_store') diff --git a/bsfs/triple_store/base.py b/bsfs/triple_store/base.py index 6561262..28ebb86 100644 --- a/bsfs/triple_store/base.py +++ b/bsfs/triple_store/base.py @@ -108,6 +108,14 @@ class TripleStoreBase(abc.ABC): """ + @abc.abstractmethod + def get( + self, + node_type: bsc.Node, + query: ast.filter.FilterExpression, + ) -> typing.Iterator[URI]: + """Return guids of nodes of type *node_type* that match the *query*.""" + @abc.abstractmethod def exists( self, diff --git a/bsfs/triple_store/sparql.py b/bsfs/triple_store/sparql.py deleted file mode 100644 index 7516dff..0000000 --- a/bsfs/triple_store/sparql.py +++ /dev/null @@ -1,253 +0,0 @@ -""" - -Part of the BlackStar filesystem (bsfs) module. -A copy of the license is provided with the project. -Author: Matthias Baumgartner, 2022 -""" -# imports -import itertools -import typing -import rdflib - -# bsfs imports -from bsfs import schema as bsc -from bsfs.utils import errors, URI - -# inner-module imports -from . import base - - -# exports -__all__: typing.Sequence[str] = ( - 'SparqlStore', - ) - - -## code ## - -class _Transaction(): - """Lightweight rdflib transactions for in-memory databases.""" - - # graph instance. - _graph: rdflib.Graph - - # current log of added triples. - _added: typing.List[typing.Any] - - # current log of removed triples. 
- _removed: typing.List[typing.Any] - - def __init__(self, graph: rdflib.Graph): - self._graph = graph - # initialize internal structures - self.commit() - - def commit(self): - """Commit temporary changes.""" - self._added = [] - self._removed = [] - - def rollback(self): - """Undo changes since the last commit.""" - for triple in self._added: - self._graph.remove(triple) - for triple in self._removed: - self._graph.add(triple) - - def add(self, triple: typing.Any): - """Add a triple to the graph.""" - if triple not in self._graph: - self._added.append(triple) - self._graph.add(triple) - - def remove(self, triple: typing.Any): - """Remove a triple from the graph.""" - if triple in self._graph: - self._removed.append(triple) - self._graph.remove(triple) - - -class SparqlStore(base.TripleStoreBase): - """Sparql-based triple store. - - The sparql triple store uses a third-party backend - (currently rdflib) to store triples and manages them via - the Sparql query language. - - """ - - # The rdflib graph. - _graph: rdflib.Graph - - # Current transaction. - _transaction: _Transaction - - # The local schema. - _schema: bsc.Schema - - def __init__(self): - super().__init__(None) - self._graph = rdflib.Graph() - self._transaction = _Transaction(self._graph) - self._schema = bsc.Schema.Empty() - - # NOTE: mypy and pylint complain about the **kwargs not being listed (contrasting super) - # However, not having it here is clearer since it's explicit that there are no arguments. 
- @classmethod - def Open(cls) -> 'SparqlStore': # type: ignore [override] # pylint: disable=arguments-differ - return cls() - - def commit(self): - self._transaction.commit() - - def rollback(self): - self._transaction.rollback() - - @property - def schema(self) -> bsc.Schema: - return self._schema - - @schema.setter - def schema(self, schema: bsc.Schema): - # check args: Schema instanace - if not isinstance(schema, bsc.Schema): - raise TypeError(schema) - # check compatibility: No contradicting definitions - if not self.schema.consistent_with(schema): - raise errors.ConsistencyError(f'{schema} is inconsistent with {self.schema}') - - # commit the current transaction - self.commit() - - # adjust instances: - # nothing to do for added classes - # delete instances of removed classes - - # get deleted classes - sub = self.schema - schema - - # remove predicate instances - for pred in sub.predicates: - for src, trg in self._graph.subject_objects(rdflib.URIRef(pred.uri)): - self._transaction.remove((src, rdflib.URIRef(pred.uri), trg)) - - # remove node instances - for node in sub.nodes: - # iterate through node instances - for inst in self._graph.subjects(rdflib.RDF.type, rdflib.URIRef(node.uri)): - # remove triples where the instance is in the object position - for src, pred in self._graph.subject_predicates(inst): - self._transaction.remove((src, pred, inst)) - # remove triples where the instance is in the subject position - for pred, trg in self._graph.predicate_objects(inst): - self._transaction.remove((inst, pred, trg)) - # remove instance - self._transaction.remove((inst, rdflib.RDF.type, rdflib.URIRef(node.uri))) - - # NOTE: Nothing to do for literals - - # commit instance changes - self.commit() - - # migrate schema - self._schema = schema - - - def _has_type(self, subject: URI, node_type: bsc.Node) -> bool: - """Return True if *subject* is a node of class *node_type* or a subclass thereof.""" - if node_type not in self.schema.nodes(): - raise 
errors.ConsistencyError(f'{node_type} is not defined in the schema') - - subject_types = list(self._graph.objects(rdflib.URIRef(subject), rdflib.RDF.type)) - if len(subject_types) == 0: - return False - if len(subject_types) == 1: - node = self.schema.node(URI(subject_types[0])) # type: ignore [arg-type] # URI is a subtype of str - if node == node_type: - return True - if node_type in node.parents(): - return True - return False - raise errors.UnreachableError() - - def exists( - self, - node_type: bsc.Node, - guids: typing.Iterable[URI], - ) -> typing.Iterable[URI]: - return (subj for subj in guids if self._has_type(subj, node_type)) - - def create( - self, - node_type: bsc.Node, - guids: typing.Iterable[URI], - ): - # check node_type - if node_type not in self.schema.nodes(): - raise errors.ConsistencyError(f'{node_type} is not defined in the schema') - # check and create guids - for guid in guids: - subject = rdflib.URIRef(guid) - # check node existence - if (subject, rdflib.RDF.type, None) in self._graph: - # FIXME: node exists and may have a different type! ignore? raise? report? - continue - # add node - self._transaction.add((subject, rdflib.RDF.type, rdflib.URIRef(node_type.uri))) - - def set( - self, - node_type: bsc.Node, - guids: typing.Iterable[URI], - predicate: bsc.Predicate, - values: typing.Iterable[typing.Any], - ): - # check node_type - if node_type not in self.schema.nodes(): - raise errors.ConsistencyError(f'{node_type} is not defined in the schema') - # check predicate - if predicate not in self.schema.predicates(): - raise errors.ConsistencyError(f'{predicate} is not defined in the schema') - if not node_type <= predicate.domain: - raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}') - # NOTE: predicate.range is in the schema since predicate is in the schema. 
- # materialize values - values = set(values) - # check values - if len(values) == 0: - return - if predicate.unique and len(values) != 1: - raise ValueError(values) - if isinstance(predicate.range, bsc.Node): - values = set(values) # materialize to safeguard against iterators passed as argument - inconsistent = {val for val in values if not self._has_type(val, predicate.range)} - # catches nodes that don't exist and nodes that have an inconsistent type - if len(inconsistent) > 0: - raise errors.InstanceError(inconsistent) - # check guids - # FIXME: Fail or skip inexistent nodes? - guids = set(guids) - inconsistent = {guid for guid in guids if not self._has_type(guid, node_type)} - if len(inconsistent) > 0: - raise errors.InstanceError(inconsistent) - - # add triples - pred = rdflib.URIRef(predicate.uri) - for guid, value in itertools.product(guids, values): - guid = rdflib.URIRef(guid) - # convert value - if isinstance(predicate.range, bsc.Literal): - value = rdflib.Literal(value, datatype=rdflib.URIRef(predicate.range.uri)) - elif isinstance(predicate.range, bsc.Node): - value = rdflib.URIRef(value) - else: - raise errors.UnreachableError() - # clear triples for unique predicates - if predicate.unique: - for obj in self._graph.objects(guid, pred): - if obj != value: - self._transaction.remove((guid, pred, obj)) - # add triple - self._transaction.add((guid, pred, value)) - -## EOF ## diff --git a/bsfs/triple_store/sparql/__init__.py b/bsfs/triple_store/sparql/__init__.py new file mode 100644 index 0000000..285334a --- /dev/null +++ b/bsfs/triple_store/sparql/__init__.py @@ -0,0 +1,18 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. 
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import typing
+
+# inner-module imports
+from .sparql import SparqlStore
+
+# exports
+__all__: typing.Sequence[str] = (
+    'SparqlStore',
+    )
+
+## EOF ##
diff --git a/bsfs/triple_store/sparql/sparql.py b/bsfs/triple_store/sparql/sparql.py
new file mode 100644
index 0000000..fff540a
--- /dev/null
+++ b/bsfs/triple_store/sparql/sparql.py
@@ -0,0 +1,256 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import itertools
+import typing
+import rdflib
+
+# bsfs imports
+from bsfs import schema as bsc
+from bsfs.query import ast
+from bsfs.utils import errors, URI
+
+# inner-module imports
+from .. import base
+
+
+# exports
+__all__: typing.Sequence[str] = (
+    'SparqlStore',
+    )
+
+
+## code ##
+
+class _Transaction():
+    """Lightweight rdflib transactions for in-memory databases."""
+
+    # graph instance.
+    _graph: rdflib.Graph
+
+    # current log of added triples.
+    _added: typing.List[typing.Any]
+
+    # current log of removed triples.
+    _removed: typing.List[typing.Any]
+
+    def __init__(self, graph: rdflib.Graph):
+        self._graph = graph
+        # commit() doubles as the initializer of the (empty) change logs
+        self.commit()
+
+    def commit(self):
+        """Commit temporary changes."""
+        self._added = []
+        self._removed = []
+
+    def rollback(self):
+        """Undo changes since the last commit."""
+        for triple in self._added:
+            self._graph.remove(triple)
+        for triple in self._removed:
+            self._graph.add(triple)
+
+    def add(self, triple: typing.Any):
+        """Add a triple to the graph."""
+        if triple not in self._graph:
+            self._added.append(triple)
+            self._graph.add(triple)
+
+    def remove(self, triple: typing.Any):
+        """Remove a triple from the graph."""
+        if triple in self._graph:
+            self._removed.append(triple)
+            self._graph.remove(triple)
+
+
+class SparqlStore(base.TripleStoreBase):
+    """Sparql-based triple store.
+
+    The sparql triple store uses a third-party backend
+    (currently rdflib) to store triples and manages them via
+    the Sparql query language.
+
+    """
+
+    # The rdflib graph.
+    _graph: rdflib.Graph
+
+    # Current transaction.
+    _transaction: _Transaction
+
+    # The local schema.
+    _schema: bsc.Schema
+
+    def __init__(self):
+        super().__init__(None)
+        self._graph = rdflib.Graph()
+        self._transaction = _Transaction(self._graph)
+        self._schema = bsc.Schema.Empty()
+
+    # NOTE: mypy and pylint complain about the **kwargs not being listed (contrasting super)
+    # However, not having it here is clearer since it's explicit that there are no arguments.
+    @classmethod
+    def Open(cls) -> 'SparqlStore': # type: ignore [override] # pylint: disable=arguments-differ
+        return cls()
+
+    def commit(self):
+        self._transaction.commit()
+
+    def rollback(self):
+        self._transaction.rollback()
+
+    @property
+    def schema(self) -> bsc.Schema:
+        """Return the schema currently held by the store."""
+        return self._schema
+
+    @schema.setter
+    def schema(self, schema: bsc.Schema):
+        # check args: Schema instance
+        if not isinstance(schema, bsc.Schema):
+            raise TypeError(schema)
+        # check compatibility: No contradicting definitions
+        if not self.schema.consistent_with(schema):
+            raise errors.ConsistencyError(f'{schema} is inconsistent with {self.schema}')
+
+        # commit the current transaction
+        self.commit()
+
+        # adjust instances:
+        # nothing to do for added classes
+        # delete instances of removed classes
+
+        # get deleted classes
+        sub = self.schema - schema
+
+        # remove predicate instances
+        for pred in sub.predicates:
+            for src, trg in self._graph.subject_objects(rdflib.URIRef(pred.uri)):
+                self._transaction.remove((src, rdflib.URIRef(pred.uri), trg))
+
+        # remove node instances
+        for node in sub.nodes:
+            # iterate through node instances
+            for inst in self._graph.subjects(rdflib.RDF.type, rdflib.URIRef(node.uri)):
+                # remove triples where the instance is in the object position
+                for src, pred in self._graph.subject_predicates(inst):
+                    self._transaction.remove((src, pred, inst))
+                # remove triples where the instance is in the subject position
+                for pred, trg in self._graph.predicate_objects(inst):
+                    self._transaction.remove((inst, pred, trg))
+                # remove instance
+                self._transaction.remove((inst, rdflib.RDF.type, rdflib.URIRef(node.uri)))
+
+        # NOTE: Nothing to do for literals
+
+        # commit instance changes
+        self.commit()
+
+        # migrate schema
+        self._schema = schema
+
+    def get(self, node_type: bsc.Node, query: ast.filter.FilterExpression) -> typing.Iterator[URI]:
+        """Return guids of *node_type* nodes that match *query*. Not implemented yet."""
+        raise NotImplementedError()
+
+    def _has_type(self, subject: URI, node_type: bsc.Node) -> bool:
+        """Return True if *subject* is a node of class *node_type* or a subclass thereof."""
+        if node_type not in self.schema.nodes():
+            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+
+        subject_types = list(self._graph.objects(rdflib.URIRef(subject), rdflib.RDF.type))
+        if len(subject_types) == 0:
+            return False
+        if len(subject_types) == 1:
+            node = self.schema.node(URI(subject_types[0])) # type: ignore [arg-type] # URI is a subtype of str
+            return node == node_type or node_type in node.parents()
+        raise errors.UnreachableError()
+
+    def exists(
+            self,
+            node_type: bsc.Node,
+            guids: typing.Iterable[URI],
+            ) -> typing.Iterable[URI]:
+        """Return a lazy generator over those *guids* for which _has_type(guid, node_type) holds."""
+        return (subj for subj in guids if self._has_type(subj, node_type))
+
+    def create(
+            self,
+            node_type: bsc.Node,
+            guids: typing.Iterable[URI],
+            ):
+        """Add a *node_type* type triple for each of *guids* that has no type triple yet."""
+        # check node_type
+        if node_type not in self.schema.nodes():
+            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+        # check and create guids
+        for guid in guids:
+            subject = rdflib.URIRef(guid)
+            # check node existence
+            if (subject, rdflib.RDF.type, None) in self._graph:
+                # FIXME: node exists and may have a different type! ignore? raise? report?
+                continue
+            # add node
+            self._transaction.add((subject, rdflib.RDF.type, rdflib.URIRef(node_type.uri)))
+
+    def set(
+            self,
+            node_type: bsc.Node,
+            guids: typing.Iterable[URI],
+            predicate: bsc.Predicate,
+            values: typing.Iterable[typing.Any],
+            ):
+        """Link every node in *guids* to every value in *values* via *predicate*."""
+        # check node_type
+        if node_type not in self.schema.nodes():
+            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+        # check predicate
+        if predicate not in self.schema.predicates():
+            raise errors.ConsistencyError(f'{predicate} is not defined in the schema')
+        if not node_type <= predicate.domain:
+            raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}')
+        # NOTE: predicate.range is in the schema since predicate is in the schema.
+        # materialize values once; safeguards against iterators passed as argument
+        values = set(values)
+        # check values
+        if len(values) == 0:
+            return
+        if predicate.unique and len(values) != 1:
+            raise ValueError(values)
+        if isinstance(predicate.range, bsc.Node):
+            inconsistent = {val for val in values if not self._has_type(val, predicate.range)}
+            # catches nodes that don't exist and nodes that have an inconsistent type
+            if len(inconsistent) > 0:
+                raise errors.InstanceError(inconsistent)
+        # check guids
+        # FIXME: Fail or skip inexistent nodes?
+        guids = set(guids)
+        inconsistent = {guid for guid in guids if not self._has_type(guid, node_type)}
+        if len(inconsistent) > 0:
+            raise errors.InstanceError(inconsistent)
+
+        # add triples
+        pred = rdflib.URIRef(predicate.uri)
+        for guid, value in itertools.product(guids, values):
+            guid = rdflib.URIRef(guid)
+            # convert value
+            if isinstance(predicate.range, bsc.Literal):
+                value = rdflib.Literal(value, datatype=rdflib.URIRef(predicate.range.uri))
+            elif isinstance(predicate.range, bsc.Node):
+                value = rdflib.URIRef(value)
+            else:
+                raise errors.UnreachableError()
+            # clear triples for unique predicates
+            if predicate.unique:
+                for obj in self._graph.objects(guid, pred):
+                    if obj != value:
+                        self._transaction.remove((guid, pred, obj))
+            # add triple
+            self._transaction.add((guid, pred, value))
+
+## EOF ##
-- 
cgit v1.2.3