Diffstat (limited to 'bsfs/triple_store/sparql')
-rw-r--r--  bsfs/triple_store/sparql/__init__.py      13
-rw-r--r--  bsfs/triple_store/sparql/distance.py      51
-rw-r--r--  bsfs/triple_store/sparql/parse_fetch.py   104
-rw-r--r--  bsfs/triple_store/sparql/parse_filter.py  316
-rw-r--r--  bsfs/triple_store/sparql/sparql.py        354
-rw-r--r--  bsfs/triple_store/sparql/utils.py         137
6 files changed, 975 insertions, 0 deletions
diff --git a/bsfs/triple_store/sparql/__init__.py b/bsfs/triple_store/sparql/__init__.py
new file mode 100644
index 0000000..cfa2732
--- /dev/null
+++ b/bsfs/triple_store/sparql/__init__.py
@@ -0,0 +1,13 @@
+
+# imports
+import typing
+
+# inner-module imports
+from .sparql import SparqlStore
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'SparqlStore',
+ )
+
+## EOF ##
diff --git a/bsfs/triple_store/sparql/distance.py b/bsfs/triple_store/sparql/distance.py
new file mode 100644
index 0000000..2c2f355
--- /dev/null
+++ b/bsfs/triple_store/sparql/distance.py
@@ -0,0 +1,51 @@
+
+# standard imports
+import typing
+
+# external imports
+import numpy as np
+
+# bsfs imports
+from bsfs.namespace import ns
+
+# constants
+EPS = 1e-9
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'DISTANCE_FU',
+ )
+
+
+## code ##
+
+def euclid(fst, snd) -> float:
+ """Euclidean distance (l2 norm)."""
+ fst = np.array(fst)
+ snd = np.array(snd)
+ return float(np.linalg.norm(fst - snd))
+
+def cosine(fst, snd) -> float:
+ """Cosine distance."""
+ fst = np.array(fst)
+ snd = np.array(snd)
+ if (fst == snd).all():
+ return 0.0
+ nrm0 = np.linalg.norm(fst)
+ nrm1 = np.linalg.norm(snd)
+ return float(1.0 - np.dot(fst, snd) / (nrm0 * nrm1 + EPS))
+
+def manhatten(fst, snd) -> float:
+ """Manhatten (cityblock) distance (l1 norm)."""
+ fst = np.array(fst)
+ snd = np.array(snd)
+ return float(np.abs(fst - snd).sum())
+
+# Known distance functions.
+DISTANCE_FU = {
+ ns.bsd.euclidean: euclid,
+ ns.bsd.cosine: cosine,
+ ns.bsd.manhatten: manhatten,
+}
+
+## EOF ##
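
The helpers above are keyed by distance URI, which is how the store later resolves
a feature's metric. A minimal sketch of their behaviour (vectors are plain Python
sequences; the printed values follow from the definitions above):

    from bsfs.namespace import ns
    from bsfs.triple_store.sparql.distance import DISTANCE_FU

    a, b = [1.0, 2.0, 3.0], [1.0, 2.0, 5.0]

    # look up a metric by its schema URI
    print(DISTANCE_FU[ns.bsd.euclidean](a, b))  # 2.0 (l2 norm of a - b)
    print(DISTANCE_FU[ns.bsd.manhatten](a, b))  # 2.0 (sum of absolute differences)

    # identical vectors short-circuit to exactly 0.0 in the cosine metric
    print(DISTANCE_FU[ns.bsd.cosine](a, a))     # 0.0
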
diff --git a/bsfs/triple_store/sparql/parse_fetch.py b/bsfs/triple_store/sparql/parse_fetch.py
new file mode 100644
index 0000000..fab8173
--- /dev/null
+++ b/bsfs/triple_store/sparql/parse_fetch.py
@@ -0,0 +1,104 @@
+
+# standard imports
+import typing
+
+# bsfs imports
+from bsfs import schema as bsc
+from bsfs.query import ast
+from bsfs.utils import errors
+
+# inner-module imports
+from .utils import GenHopName, Query
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Fetch',
+ )
+
+
+## code ##
+
+class Fetch():
+ """Translate `bsfs.query.ast.fetch` structures into Sparql queries."""
+
+ def __init__(self, schema):
+ self.schema = schema
+ self.ngen = GenHopName(prefix='?fch')
+
+ def __call__(
+ self,
+ root_type: bsc.Node,
+ root: ast.fetch.FetchExpression,
+ ) -> Query:
+ """
+ """
+ # check root_type
+ if not isinstance(root_type, bsc.Node):
+ raise errors.BackendError(f'expected Node, found {root_type}')
+ if root_type not in self.schema.nodes():
+ raise errors.ConsistencyError(f'node {root_type} is not in the schema')
+ # parse root
+ terms, expr = self._parse_fetch_expression(root_type, root, '?ent')
+ # assemble query
+ return Query(
+ root_type=root_type.uri,
+ root_head='?ent',
+ select=terms,
+ where=expr,
+ )
+
+ def _parse_fetch_expression(
+ self,
+ node_type: bsc.Vertex,
+ node: ast.fetch.FetchExpression,
+ head: str,
+ ):
+ """Route *node* to the handler of the respective FetchExpression subclass."""
+ if isinstance(node, ast.fetch.All):
+ return self._all(node_type, node, head)
+ if isinstance(node, ast.fetch.Fetch):
+ return self._fetch(node_type, node, head)
+ if isinstance(node, ast.fetch.Node):
+ return self._node(node_type, node, head)
+ if isinstance(node, ast.fetch.Value):
+ return self._value(node_type, node, head)
+ if isinstance(node, ast.fetch.This):
+ return self._this(node_type, node, head)
+ # invalid node
+ raise errors.BackendError(f'expected fetch expression, found {node}')
+
+ def _all(self, node_type: bsc.Vertex, node: ast.fetch.All, head: str):
+ # child expressions
+ terms, exprs = zip(*[self._parse_fetch_expression(node_type, expr, head) for expr in node])
+ terms = {term for sub in terms for term in sub}
+ exprs = ' .\n'.join({expr for expr in exprs if len(expr.strip()) > 0})
+ return terms, exprs
+
+ def _fetch(self, node_type: bsc.Vertex, node: ast.fetch.Fetch, head: str): # pylint: disable=unused-argument # (node_type)
+ # child expressions
+ rng = self.schema.predicate(node.predicate).range
+ nexthead = next(self.ngen)
+ terms, expr = self._parse_fetch_expression(rng, node.expr, nexthead)
+ return terms, f'OPTIONAL{{ {head} <{node.predicate}> {nexthead} .\n {expr} }}'
+
+ def _node(self, node_type: bsc.Vertex, node: ast.fetch.Node, head: str): # pylint: disable=unused-argument # (node_type)
+ if f'?{node.name}'.startswith(self.ngen.prefix):
+            raise errors.BackendError(f'Node name must not start with {self.ngen.prefix}')
+ # compose and return statement
+ term = next(self.ngen)
+ return {(term, node.name)}, f'OPTIONAL{{ {head} <{node.predicate}> {term} }}'
+
+ def _value(self, node_type: bsc.Vertex, node: ast.fetch.Value, head: str): # pylint: disable=unused-argument # (node_type)
+ if f'?{node.name}'.startswith(self.ngen.prefix):
+            raise errors.BackendError(f'Value name must not start with {self.ngen.prefix}')
+ # compose and return statement
+ term = next(self.ngen)
+ return {(term, node.name)}, f'OPTIONAL{{ {head} <{node.predicate}> {term} }}'
+
+ def _this(self, node_type: bsc.Vertex, node: ast.fetch.This, head: str): # pylint: disable=unused-argument # (node_type)
+ if f'?{node.name}'.startswith(self.ngen.prefix):
+            raise errors.BackendError(f'This name must not start with {self.ngen.prefix}')
+ # compose and return statement
+ return {(head, node.name)}, ''
+
+## EOF ##
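
A minimal usage sketch for `Fetch` (the exact `ast.fetch` constructor signatures
are not part of this diff, so the `All`/`Value`/`Node` calls and the example URIs
below are assumptions for illustration):

    from bsfs.query import ast
    from bsfs.triple_store.sparql.parse_fetch import Fetch

    fetch = Fetch(schema)  # schema: a bsc.Schema holding the node and predicates
    query = fetch(
        schema.node('http://example.com/Entity'),  # root node type
        ast.fetch.All(
            ast.fetch.Value('http://example.com/filesize', 'size'),
            ast.fetch.Node('http://example.com/author', 'author'),
        ),
    )
    print(query.query)  # an executable SELECT projecting ?size and ?author
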
diff --git a/bsfs/triple_store/sparql/parse_filter.py b/bsfs/triple_store/sparql/parse_filter.py
new file mode 100644
index 0000000..2f5a25b
--- /dev/null
+++ b/bsfs/triple_store/sparql/parse_filter.py
@@ -0,0 +1,316 @@
+
+# imports
+import operator
+import typing
+
+# external imports
+import rdflib
+
+# bsfs imports
+from bsfs import schema as bsc
+from bsfs.namespace import ns
+from bsfs.query import ast
+from bsfs.utils import URI, errors
+
+# inner-module imports
+from .distance import DISTANCE_FU
+from .utils import GenHopName, Query
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Filter',
+ )
+
+
+## code ##
+
+class Filter():
+ """Translate `bsfs.query.ast.filter` structures into Sparql queries."""
+
+ # Current schema to validate against.
+ schema: bsc.Schema
+
+ # Generator that produces unique symbol names.
+ ngen: GenHopName
+
+ def __init__(self, graph, schema):
+ self.graph = graph
+ self.schema = schema
+ self.ngen = GenHopName(prefix='?flt')
+
+ def __call__(
+ self,
+ root_type: bsc.Node,
+ root: typing.Optional[ast.filter.FilterExpression] = None,
+ ) -> Query:
+ """
+ """
+ # check root_type
+ if not isinstance(root_type, bsc.Node):
+ raise errors.BackendError(f'expected Node, found {root_type}')
+ if root_type not in self.schema.nodes():
+ raise errors.ConsistencyError(f'node {root_type} is not in the schema')
+ # parse root
+ if root is None:
+ cond = ''
+ else:
+ cond = self._parse_filter_expression(root_type, root, '?ent')
+ # assemble query
+ return Query(
+ root_type=root_type.uri,
+ root_head='?ent',
+ where=cond,
+ )
+
+ def _parse_filter_expression(
+ self,
+ type_: bsc.Vertex,
+ node: ast.filter.FilterExpression,
+ head: str,
+ ) -> str:
+ """Route *node* to the handler of the respective FilterExpression subclass."""
+ if isinstance(node, ast.filter.Is):
+ return self._is(type_, node, head)
+ if isinstance(node, ast.filter.Not):
+ return self._not(type_, node, head)
+ if isinstance(node, ast.filter.Has):
+ return self._has(type_, node, head)
+ if isinstance(node, ast.filter.Distance):
+ return self._distance(type_, node, head)
+ if isinstance(node, ast.filter.Any):
+ return self._any(type_, node, head)
+ if isinstance(node, ast.filter.All):
+ return self._all(type_, node, head)
+ if isinstance(node, ast.filter.And):
+ return self._and(type_, node, head)
+ if isinstance(node, ast.filter.Or):
+ return self._or(type_, node, head)
+ if isinstance(node, ast.filter.Equals):
+ return self._equals(type_, node, head)
+ if isinstance(node, ast.filter.Substring):
+ return self._substring(type_, node, head)
+ if isinstance(node, ast.filter.StartsWith):
+ return self._starts_with(type_, node, head)
+ if isinstance(node, ast.filter.EndsWith):
+ return self._ends_with(type_, node, head)
+ if isinstance(node, ast.filter.LessThan):
+ return self._less_than(type_, node, head)
+ if isinstance(node, ast.filter.GreaterThan):
+ return self._greater_than(type_, node, head)
+ # invalid node
+ raise errors.BackendError(f'expected filter expression, found {node}')
+
+ def _parse_predicate_expression(
+ self,
+ type_: bsc.Vertex,
+ node: ast.filter.PredicateExpression
+ ) -> typing.Tuple[str, bsc.Vertex]:
+ """Route *node* to the handler of the respective PredicateExpression subclass."""
+ if isinstance(node, ast.filter.Predicate):
+ return self._predicate(type_, node)
+ if isinstance(node, ast.filter.OneOf):
+ return self._one_of(type_, node)
+ # invalid node
+ raise errors.BackendError(f'expected predicate expression, found {node}')
+
+ def _one_of(self, node_type: bsc.Vertex, node: ast.filter.OneOf) -> typing.Tuple[str, bsc.Vertex]:
+ """
+ """
+ if not isinstance(node_type, bsc.Node):
+ raise errors.BackendError(f'expected Node, found {node_type}')
+ # walk through predicates
+ suburi, rng = set(), None
+ for pred in node: # OneOf guarantees at least one expression
+ puri, subrng = self._parse_predicate_expression(node_type, pred)
+ # track predicate uris
+ suburi.add(puri)
+ # check for more generic range
+ if rng is None or subrng > rng:
+ rng = subrng
+ # check range consistency
+ if not subrng <= rng and not subrng >= rng:
+ raise errors.ConsistencyError(f'ranges {subrng} and {rng} are not related')
+ # return joint predicate expression and next range
+ # OneOf guarantees at least one expression, rng is always a bsc.Vertex.
+ # mypy does not realize this, hence we ignore the warning.
+ return '|'.join(suburi), rng # type: ignore [return-value]
+
+ def _predicate(self, node_type: bsc.Vertex, node: ast.filter.Predicate) -> typing.Tuple[str, bsc.Vertex]:
+ """
+ """
+ # check node_type
+ if not isinstance(node_type, bsc.Node):
+ raise errors.BackendError(f'expected Node, found {node_type}')
+ # fetch predicate and its uri
+ puri = node.predicate
+ # get and check predicate, domain, and range
+ if not self.schema.has_predicate(puri):
+ raise errors.ConsistencyError(f'predicate {puri} is not in the schema')
+ pred = self.schema.predicate(puri)
+ if not isinstance(pred.range, (bsc.Node, bsc.Literal)):
+ raise errors.BackendError(f'the range of predicate {pred} is undefined')
+ dom, rng = pred.domain, pred.range
+ # encapsulate predicate uri
+ uri_str = f'<{puri}>'
+ # apply reverse flag
+ if node.reverse:
+ uri_str = '^' + uri_str
+ dom, rng = rng, dom # type: ignore [assignment] # variable re-use confuses mypy
+ # check path consistency
+ if not node_type <= dom:
+ raise errors.ConsistencyError(f'expected type {dom} or subtype thereof, found {node_type}')
+ # return predicate URI and next node type
+ return uri_str, rng
+
+ def _any(self, node_type: bsc.Vertex, node: ast.filter.Any, head: str) -> str:
+ """
+ """
+ if not isinstance(node_type, bsc.Node):
+ raise errors.BackendError(f'expected Node, found {node_type}')
+ # parse predicate
+ pred, next_type = self._parse_predicate_expression(node_type, node.predicate)
+ # parse expression
+ nexthead = next(self.ngen)
+ expr = self._parse_filter_expression(next_type, node.expr, nexthead)
+ # combine results
+ return f'{head} {pred} {nexthead} . {expr}'
+
+ def _all(self, node_type: bsc.Vertex, node: ast.filter.All, head: str) -> str:
+ """
+ """
+ # NOTE: All(P, E) := Not(Any(P, Not(E))) and EXISTS(P, ?)
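+        #       The EXISTS clause is required because the MINUS-based rewrite
+        #       alone would also accept nodes with no value at P at all;
+        #       "all" is thus read as "has P, and every value satisfies E".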
+ if not isinstance(node_type, bsc.Node):
+ raise errors.BackendError(f'expected Node, found {node_type}')
+ # parse rewritten ast
+ expr = self._parse_filter_expression(node_type,
+ ast.filter.Not(
+ ast.filter.Any(node.predicate,
+ ast.filter.Not(node.expr))), head)
+ # parse predicate for existence constraint
+ pred, _ = self._parse_predicate_expression(node_type, node.predicate)
+ temphead = next(self.ngen)
+ # return existence and rewritten expression
+ return f'FILTER EXISTS {{ {head} {pred} {temphead} }} . ' + expr
+
+ def _and(self, node_type: bsc.Vertex, node: ast.filter.And, head: str) -> str:
+ """
+ """
+ sub = [self._parse_filter_expression(node_type, expr, head) for expr in node]
+ return ' . '.join(sub)
+
+ def _or(self, node_type: bsc.Vertex, node: ast.filter.Or, head: str) -> str:
+ """
+ """
+ # potential special case optimization:
+ # * ast: Or(Equals('foo'), Equals('bar'), ...)
+ # * query: VALUES ?head { "value1"^^<...> "value2"^^<...> "value3"^<...> ... }
+ sub = [self._parse_filter_expression(node_type, expr, head) for expr in node]
+ sub = ['{' + expr + '}' for expr in sub]
+ return ' UNION '.join(sub)
+
+ def _not(self, node_type: bsc.Vertex, node: ast.filter.Not, head: str) -> str:
+ """
+ """
+ expr = self._parse_filter_expression(node_type, node.expr, head)
+ if isinstance(node_type, bsc.Literal):
+ return f'MINUS {{ {expr} }}'
+ # NOTE: for bsc.Node types, we must include at least one expression in the body of MINUS,
+ # otherwise the connection between the context and body of MINUS is lost.
+ # The simplest (and non-interfering) choice is a type statement.
+ return f'MINUS {{ {head} <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <{node_type.uri}> . {expr} }}'
+
+ def _has(self, node_type: bsc.Vertex, node: ast.filter.Has, head: str) -> str:
+ """
+ """
+ if not isinstance(node_type, bsc.Node):
+ raise errors.BackendError(f'expected Node, found {node_type}')
+ # parse predicate
+ pred, _ = self._parse_predicate_expression(node_type, node.predicate)
+ # get new heads
+ inner = next(self.ngen)
+ outer = next(self.ngen)
+ # predicate count expression (fetch number of predicates at *head*)
+ num_preds = f'{{ SELECT (COUNT(distinct {inner}) as {outer}) WHERE {{ {head} {pred} {inner} }} }}'
+ # count expression
+ count_bounds = self._parse_filter_expression(self.schema.literal(ns.xsd.integer), node.count, outer)
+ # combine
+ return num_preds + ' . ' + count_bounds
+
+ def _distance(self, node_type: bsc.Vertex, node: ast.filter.Distance, head: str) -> str:
+ """
+ """
+ if not isinstance(node_type, bsc.Feature):
+ raise errors.BackendError(f'expected Feature, found {node_type}')
+ if len(node.reference) != node_type.dimension:
+ raise errors.ConsistencyError(
+ f'reference has dimension {len(node.reference)}, expected {node_type.dimension}')
+ # get distance metric
+ dist = DISTANCE_FU[node_type.distance]
+ # get operator
+ cmp = operator.lt if node.strict else operator.le
+ # get candidate values
+ candidates = {
+ f'"{cand}"^^<{node_type.uri}>'
+ for cand
+ in self.graph.objects()
+ if isinstance(cand, rdflib.Literal)
+ and cand.datatype == rdflib.URIRef(node_type.uri)
+ and cmp(dist(cand.value, node.reference), node.threshold)
+ }
+ # combine candidate values
+ values = ' '.join(candidates) if len(candidates) else f'"impossible value"^^<{ns.xsd.string}>'
+ # return sparql fragment
+ return f'VALUES {head} {{ {values} }}'
+
+ def _is(self, node_type: bsc.Vertex, node: ast.filter.Is, head: str) -> str:
+ """
+ """
+ if not isinstance(node_type, bsc.Node):
+ raise errors.BackendError(f'expected Node, found {node_type}')
+ return f'VALUES {head} {{ <{URI(node.value)}> }}'
+
+ def _equals(self, node_type: bsc.Vertex, node: ast.filter.Equals, head: str) -> str:
+ """
+ """
+ if not isinstance(node_type, bsc.Literal):
+ raise errors.BackendError(f'expected Literal, found {node}')
+ return f'VALUES {head} {{ "{node.value}"^^<{node_type.uri}> }}'
+
+ def _substring(self, node_type: bsc.Vertex, node: ast.filter.Substring, head: str) -> str:
+ """
+ """
+ if not isinstance(node_type, bsc.Literal):
+ raise errors.BackendError(f'expected Literal, found {node_type}')
+ return f'FILTER contains(str({head}), "{node.value}")'
+
+ def _starts_with(self, node_type: bsc.Vertex, node: ast.filter.StartsWith, head: str) -> str:
+ """
+ """
+ if not isinstance(node_type, bsc.Literal):
+ raise errors.BackendError(f'expected Literal, found {node_type}')
+ return f'FILTER strstarts(str({head}), "{node.value}")'
+
+ def _ends_with(self, node_type: bsc.Vertex, node: ast.filter.EndsWith, head: str) -> str:
+ """
+ """
+ if not isinstance(node_type, bsc.Literal):
+ raise errors.BackendError(f'expected Literal, found {node_type}')
+ return f'FILTER strends(str({head}), "{node.value}")'
+
+ def _less_than(self, node_type: bsc.Vertex, node: ast.filter.LessThan, head: str) -> str:
+ """
+ """
+ if not isinstance(node_type, bsc.Literal):
+ raise errors.BackendError(f'expected Literal, found {node_type}')
+ equality = '=' if not node.strict else ''
+ return f'FILTER ({head} <{equality} {float(node.threshold)})'
+
+ def _greater_than(self, node_type: bsc.Vertex, node: ast.filter.GreaterThan, head: str) -> str:
+ """
+ """
+ if not isinstance(node_type, bsc.Literal):
+ raise errors.BackendError(f'expected Literal, found {node_type}')
+ equality = '=' if not node.strict else ''
+ return f'FILTER ({head} >{equality} {float(node.threshold)})'
+
+## EOF ##
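
A minimal usage sketch for `Filter` (the `ast.filter` constructor signatures are
not shown in this diff, so the `Any`/`Predicate`/`Equals` calls and URIs below
are assumptions for illustration):

    import rdflib
    from bsfs.query import ast
    from bsfs.triple_store.sparql.parse_filter import Filter

    graph = rdflib.Graph()
    flt = Filter(graph, schema)  # schema: a bsc.Schema defining Entity and filename

    query = flt(
        schema.node('http://example.com/Entity'),
        ast.filter.Any(
            ast.filter.Predicate('http://example.com/filename'),
            ast.filter.Equals('hello.txt'),
        ),
    )
    for (guid,) in query(graph):  # Query.__call__ simply runs graph.query(...)
        print(guid)
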
diff --git a/bsfs/triple_store/sparql/sparql.py b/bsfs/triple_store/sparql/sparql.py
new file mode 100644
index 0000000..99e67d6
--- /dev/null
+++ b/bsfs/triple_store/sparql/sparql.py
@@ -0,0 +1,354 @@
+
+# imports
+import base64
+import itertools
+import typing
+
+# external imports
+import rdflib
+
+# bsfs imports
+from bsfs import schema as bsc
+from bsfs.namespace import ns
+from bsfs.query import ast
+from bsfs.utils import errors, URI
+
+# inner-module imports
+from . import parse_fetch
+from . import parse_filter
+from .. import base
+from .distance import DISTANCE_FU
+
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'SparqlStore',
+ )
+
+
+## code ##
+
+rdflib.term.bind(ns.bsl.BinaryBlob, bytes, constructor=base64.b64decode)
+
+class _Transaction():
+ """Lightweight rdflib transactions for in-memory databases."""
+
+ # graph instance.
+ _graph: rdflib.Graph
+
+ # current log of added triples.
+ _added: typing.List[typing.Any]
+
+ # current log of removed triples.
+ _removed: typing.List[typing.Any]
+
+ def __init__(self, graph: rdflib.Graph):
+ self._graph = graph
+ # initialize internal structures
+ self.commit()
+
+ def commit(self):
+ """Commit temporary changes."""
+ self._added = []
+ self._removed = []
+
+ def rollback(self):
+ """Undo changes since the last commit."""
+ for triple in self._added:
+ self._graph.remove(triple)
+ for triple in self._removed:
+ self._graph.add(triple)
+
+ def add(self, triple: typing.Any):
+ """Add a triple to the graph."""
+ if triple not in self._graph:
+ self._added.append(triple)
+ self._graph.add(triple)
+
+ def remove(self, triple: typing.Any):
+ """Remove a triple from the graph."""
+ if triple in self._graph:
+ self._removed.append(triple)
+ self._graph.remove(triple)
+
+
+class SparqlStore(base.TripleStoreBase):
+ """Sparql-based triple store.
+
+ The sparql triple store uses a third-party backend
+ (currently rdflib) to store triples and manages them via
+ the Sparql query language.
+
+ """
+
+ # The rdflib graph.
+ _graph: rdflib.Graph
+
+ # Current transaction.
+ _transaction: _Transaction
+
+ # The local schema.
+ _schema: bsc.Schema
+
+ # Filter parser
+ _filter_parser: parse_filter.Filter
+
+ # Fetch parser
+ _fetch_parser: parse_fetch.Fetch
+
+ def __init__(self):
+ super().__init__(None)
+ self._graph = rdflib.Graph()
+ self._transaction = _Transaction(self._graph)
+ self._schema = bsc.Schema(literals={bsc.ROOT_NUMBER.child(ns.xsd.integer)})
+ self._filter_parser = parse_filter.Filter(self._graph, self._schema)
+ self._fetch_parser = parse_fetch.Fetch(self._schema)
+
+ # NOTE: mypy and pylint complain about the **kwargs not being listed (contrasting super)
+ # However, not having it here is clearer since it's explicit that there are no arguments.
+ @classmethod
+ def Open(cls) -> 'SparqlStore': # type: ignore [override] # pylint: disable=arguments-differ
+ return cls()
+
+ def commit(self):
+ self._transaction.commit()
+
+ def rollback(self):
+ self._transaction.rollback()
+
+ @property
+ def schema(self) -> bsc.Schema:
+ return self._schema
+
+ @schema.setter
+ def schema(self, schema: bsc.Schema):
+        # check args: Schema instance
+ if not isinstance(schema, bsc.Schema):
+ raise TypeError(schema)
+ # check compatibility: No contradicting definitions
+ if not self.schema.consistent_with(schema):
+ raise errors.ConsistencyError(f'{schema} is inconsistent with {self.schema}')
+ # check distance functions of features
+ invalid = {
+ (cand.uri, cand.distance)
+ for cand
+ in schema.literals()
+ if isinstance(cand, bsc.Feature) and cand.distance not in DISTANCE_FU}
+ if len(invalid) > 0:
+ cand, dist = zip(*invalid)
+ raise errors.UnsupportedError(
+ f'unknown distance function {",".join(dist)} in feature {", ".join(cand)}')
+
+ # commit the current transaction
+ self.commit()
+
+ # adjust instances:
+ # nothing to do for added classes
+ # delete instances of removed classes
+
+ # get deleted classes
+ sub = self.schema - schema
+
+ for pred in sub.predicates:
+ # remove predicate instances
+ for src, trg in self._graph.subject_objects(rdflib.URIRef(pred.uri)):
+ self._transaction.remove((src, rdflib.URIRef(pred.uri), trg))
+ # remove predicate definition
+ if pred.parent is not None: # NOTE: there shouldn't be any predicate w/o parent
+ self._transaction.remove((
+ rdflib.URIRef(pred.uri),
+ rdflib.RDFS.subClassOf,
+ rdflib.URIRef(pred.parent.uri),
+ ))
+
+ # remove node instances
+ for node in sub.nodes:
+ # iterate through node instances
+ for inst in self._graph.subjects(rdflib.RDF.type, rdflib.URIRef(node.uri)):
+ # remove triples where the instance is in the object position
+ for src, pred in self._graph.subject_predicates(inst):
+ self._transaction.remove((src, pred, inst))
+ # remove triples where the instance is in the subject position
+ for pred, trg in self._graph.predicate_objects(inst):
+ self._transaction.remove((inst, pred, trg))
+ # remove instance
+ self._transaction.remove((inst, rdflib.RDF.type, rdflib.URIRef(node.uri)))
+ # remove node definition
+ if node.parent is not None: # NOTE: there shouldn't be any node w/o parent
+ self._transaction.remove((
+ rdflib.URIRef(node.uri),
+ rdflib.RDFS.subClassOf,
+ rdflib.URIRef(node.parent.uri),
+ ))
+
+ for lit in sub.literals:
+ # remove literal definition
+ if lit.parent is not None: # NOTE: there shouldn't be any literal w/o parent
+ self._transaction.remove((
+ rdflib.URIRef(lit.uri),
+ rdflib.RDFS.subClassOf,
+ rdflib.URIRef(lit.parent.uri),
+ ))
+
+ # add predicate, node, and literal hierarchies to the graph
+ for itm in itertools.chain(schema.predicates(), schema.nodes(), schema.literals()):
+ if itm.parent is not None:
+ self._transaction.add((rdflib.URIRef(itm.uri), rdflib.RDFS.subClassOf, rdflib.URIRef(itm.parent.uri)))
+
+ # commit instance changes
+ self.commit()
+
+ # migrate schema
+ self._schema = schema
+ self._filter_parser.schema = schema
+ self._fetch_parser.schema = schema
+
+ def fetch(
+ self,
+ node_type: bsc.Node,
+ filter: ast.filter.FilterExpression, # pylint: disable=redefined-builtin
+ fetch: ast.fetch.FetchExpression,
+ ) -> typing.Iterator[typing.Tuple[URI, str, typing.Any]]:
+ if node_type not in self.schema.nodes():
+ raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+ if not isinstance(filter, ast.filter.FilterExpression):
+ raise TypeError(filter)
+ if not isinstance(fetch, ast.fetch.FetchExpression):
+ raise TypeError(fetch)
+ # compose a query from fetch and filter ast
+ query = self._filter_parser(node_type, filter)
+ query += self._fetch_parser(node_type, fetch)
+ # run query
+ emitted = set()
+ for result in query(self._graph):
+ guid = URI(result[0])
+ for name, raw in zip(query.names, result[1:]):
+ if raw is None: # undefined value
+ continue
+ if isinstance(raw, rdflib.Literal):
+ value = raw.value
+ else:
+ value = URI(raw)
+ # emit triple
+ triple = (guid, name, value)
+ if triple not in emitted: # FIXME: needs a better solution!
+ emitted.add(triple)
+ yield guid, name, value
+
+ def get(
+ self,
+ node_type: bsc.Node,
+ filter: typing.Optional[ast.filter.FilterExpression] = None, # pylint: disable=redefined-builtin
+ ) -> typing.Iterator[URI]:
+ if node_type not in self.schema.nodes():
+ raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+ if filter is not None and not isinstance(filter, ast.filter.FilterExpression):
+ raise TypeError(filter)
+ # compose query
+ query = self._filter_parser(node_type, filter)
+ # run query
+ for guid, in query(self._graph):
+ yield URI(guid)
+
+ def _has_type(self, subject: URI, node_type: bsc.Node) -> bool:
+ """Return True if *subject* is a node of class *node_type* or a subclass thereof."""
+ if node_type not in self.schema.nodes():
+ raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+
+ subject_types = list(self._graph.objects(rdflib.URIRef(subject), rdflib.RDF.type))
+ if len(subject_types) == 0:
+ return False
+ if len(subject_types) == 1:
+ node = self.schema.node(URI(subject_types[0])) # type: ignore [arg-type] # URI is a subtype of str
+ if node == node_type:
+ return True
+ if node_type in node.parents():
+ return True
+ return False
+ raise errors.UnreachableError()
+
+ def exists(
+ self,
+ node_type: bsc.Node,
+ guids: typing.Iterable[URI],
+ ) -> typing.Iterable[URI]:
+ return (subj for subj in guids if self._has_type(subj, node_type))
+
+ def create(
+ self,
+ node_type: bsc.Node,
+ guids: typing.Iterable[URI],
+ ):
+ # check node_type
+ if node_type not in self.schema.nodes():
+ raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+ # check and create guids
+ for guid in guids:
+ subject = rdflib.URIRef(URI(guid))
+ # check node existence
+ if (subject, rdflib.RDF.type, None) in self._graph:
+ # FIXME: node exists and may have a different type! ignore? raise? report?
+ continue
+ # add node
+ self._transaction.add((subject, rdflib.RDF.type, rdflib.URIRef(node_type.uri)))
+
+ def set(
+ self,
+ node_type: bsc.Node,
+ guids: typing.Iterable[URI],
+ predicate: bsc.Predicate,
+ values: typing.Iterable[typing.Any],
+ ):
+ # check node_type
+ if node_type not in self.schema.nodes():
+ raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+ # check predicate
+ if predicate not in self.schema.predicates():
+ raise errors.ConsistencyError(f'{predicate} is not defined in the schema')
+ if not node_type <= predicate.domain:
+ raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}')
+ # NOTE: predicate.range is in the schema since predicate is in the schema.
+ # materialize values
+ values = set(values)
+ # check values
+ if len(values) == 0:
+ return
+ if predicate.unique and len(values) != 1:
+ raise ValueError(values)
+ if isinstance(predicate.range, bsc.Node):
+ inconsistent = {val for val in values if not self._has_type(val, predicate.range)}
+ # catches nodes that don't exist and nodes that have an inconsistent type
+ if len(inconsistent) > 0:
+ raise errors.InstanceError(inconsistent)
+ # check guids
+ # FIXME: Fail or skip inexistent nodes?
+ guids = {URI(guid) for guid in guids}
+ inconsistent = {guid for guid in guids if not self._has_type(guid, node_type)}
+ if len(inconsistent) > 0:
+ raise errors.InstanceError(inconsistent)
+
+ # add triples
+ pred = rdflib.URIRef(predicate.uri)
+ for guid, value in itertools.product(guids, values):
+ guid = rdflib.URIRef(guid)
+ # convert value
+ if isinstance(predicate.range, bsc.Literal):
+ dtype = rdflib.URIRef(predicate.range.uri)
+ if predicate.range <= self.schema.literal(ns.bsl.BinaryBlob):
+ dtype = rdflib.URIRef(ns.bsl.BinaryBlob)
+ value = base64.b64encode(value)
+ value = rdflib.Literal(value, datatype=dtype)
+ elif isinstance(predicate.range, bsc.Node):
+ value = rdflib.URIRef(value)
+ else:
+ raise errors.UnreachableError()
+ # clear triples for unique predicates
+ if predicate.unique:
+ for obj in self._graph.objects(guid, pred):
+ if obj != value:
+ self._transaction.remove((guid, pred, obj))
+ # add triple
+ self._transaction.add((guid, pred, value))
+
+## EOF ##
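
A sketch of the store life-cycle built from the methods above (the schema and
all example URIs are hypothetical; in practice a schema is migrated in via the
`schema` setter before any instances are created):

    from bsfs.utils import URI
    from bsfs.triple_store.sparql import SparqlStore

    store = SparqlStore.Open()      # fresh in-memory rdflib graph
    store.schema = my_schema        # must be consistent with the current schema

    entity = my_schema.node('http://example.com/Entity')
    guid = URI('http://example.com/entity#1234')

    store.create(entity, [guid])
    store.set(entity, [guid],
              my_schema.predicate('http://example.com/filename'), ['hello.txt'])
    store.commit()                  # or store.rollback() to undo uncommitted changes

    print(list(store.get(entity))) # guids of all Entity instances
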
diff --git a/bsfs/triple_store/sparql/utils.py b/bsfs/triple_store/sparql/utils.py
new file mode 100644
index 0000000..38062c2
--- /dev/null
+++ b/bsfs/triple_store/sparql/utils.py
@@ -0,0 +1,137 @@
+
+# standard imports
+import typing
+
+# external imports
+import rdflib
+
+# bsfs imports
+from bsfs.namespace import ns
+from bsfs.utils import typename
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'GenHopName',
+ 'Query',
+ )
+
+
+## code ##
+
+class GenHopName():
+ """Generator that produces a new unique symbol name with each iteration."""
+
+ # Symbol name prefix.
+ prefix: str
+
+ # Current counter.
+ curr: int
+
+ def __init__(self, prefix: str = '?hop', start: int = 0):
+ self.prefix = prefix
+ self.curr = start - 1
+
+ def __next__(self):
+ """Generate and return the next unique name."""
+ self.curr += 1
+ return self.prefix + str(self.curr)
+
+
+class Query():
+ """Hold, manage, and complete partial Sparql queries."""
+
+ # root node type URI.
+ root_type: str
+
+ # root node variable name.
+ root_head: str
+
+ # (head, name) tuples (w/o root)
+ select: typing.Tuple[typing.Tuple[str, str], ...]
+
+ # where statements.
+ where: str
+
+ def __init__(
+ self,
+ root_type: str,
+ root_head: str = '?ent',
+ select: typing.Optional[typing.Iterable[typing.Tuple[str, str]]] = None,
+ where: typing.Optional[str] = None,
+ ):
+ # check arguments
+ if select is None:
+ select = []
+ if where is None:
+ where = ''
+ # set members
+ self.root_type = root_type
+ self.root_head = root_head
+        self.select = tuple(select) # tuple ensures persistent order
+ self.where = where.strip()
+
+ def __str__(self) -> str:
+ return self.query
+
+ def __repr__(self) -> str:
+ return f'{typename(self)}({self.root_type}, {self.root_head}, {self.select}, {self.where})'
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return isinstance(other, type(self)) \
+ and self.root_type == other.root_type \
+ and self.root_head == other.root_head \
+ and self.select == other.select \
+ and self.where == other.where
+
+ def __hash__(self) -> int:
+ return hash((type(self), self.root_type, self.root_head, self.select, self.where))
+
+ def __add__(self, other: typing.Any) -> 'Query':
+ # check other's type
+ if not isinstance(other, type(self)):
+ return NotImplemented
+ # check query compatibility
+ if not self.root_type == other.root_type:
+ raise ValueError(other)
+ if not self.root_head == other.root_head:
+ raise ValueError(other)
+ # combine selections
+ select = self.select + other.select
+ # combine conditions
+ conds = []
+ if self.where != '':
+ conds.append(self.where)
+ if other.where != '':
+ conds.append(other.where)
+ where = ' . '.join(conds)
+ # return new query
+ return Query(
+ root_type=self.root_type,
+ root_head=self.root_head,
+ select=select,
+ where=where,
+ )
+
+ @property
+ def names(self) -> typing.Tuple[str, ...]:
+ """Return a tuple of selected variable names, excluding the root."""
+ return tuple(name for _, name in self.select)
+
+ @property
+ def query(self) -> str:
+ """Return an executable sparql query."""
+ select = ' '.join(f'({head} as ?{name})' for head, name in self.select)
+ return f'''
+ SELECT DISTINCT {self.root_head} {select}
+ WHERE {{
+ {self.root_head} <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <{self.root_type}> .
+ {self.where}
+ }}
+ ORDER BY str({self.root_head})
+ '''
+
+ def __call__(self, graph: rdflib.Graph) -> rdflib.query.Result:
+ """Execute the query on a *graph* and return the query result."""
+ return graph.query(self.query)
+
+## EOF ##
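
Since `Query` and `GenHopName` are fully self-contained, their behaviour can be
shown concretely (only the example URIs below are made up):

    from bsfs.triple_store.sparql.utils import GenHopName, Query

    # unique variable names for intermediate hops
    names = GenHopName(prefix='?hop')
    print(next(names), next(names))  # ?hop0 ?hop1

    # partial queries over the same root combine with `+`
    flt = Query('http://example.com/Entity',
                where='?ent <http://example.com/filename> ?hop0')
    fch = Query('http://example.com/Entity', select=[('?hop0', 'name')])
    combined = flt + fch
    print(combined.names)  # ('name',)
    print(combined.query)  # one SELECT DISTINCT over both fragments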