diff options
Diffstat (limited to 'bsfs/triple_store')
-rw-r--r-- | bsfs/triple_store/__init__.py | 5 | ||||
-rw-r--r-- | bsfs/triple_store/base.py | 41 | ||||
-rw-r--r-- | bsfs/triple_store/sparql/__init__.py | 13 | ||||
-rw-r--r-- | bsfs/triple_store/sparql/distance.py | 51 | ||||
-rw-r--r-- | bsfs/triple_store/sparql/parse_fetch.py | 104 | ||||
-rw-r--r-- | bsfs/triple_store/sparql/parse_filter.py | 316 | ||||
-rw-r--r-- | bsfs/triple_store/sparql/sparql.py (renamed from bsfs/triple_store/sparql.py) | 127 | ||||
-rw-r--r-- | bsfs/triple_store/sparql/utils.py | 137 |
8 files changed, 764 insertions, 30 deletions
diff --git a/bsfs/triple_store/__init__.py b/bsfs/triple_store/__init__.py index fb5a8a9..79a2887 100644 --- a/bsfs/triple_store/__init__.py +++ b/bsfs/triple_store/__init__.py @@ -1,9 +1,4 @@ -""" -Part of the BlackStar filesystem (bsfs) module. -A copy of the license is provided with the project. -Author: Matthias Baumgartner, 2022 -""" # imports import typing diff --git a/bsfs/triple_store/base.py b/bsfs/triple_store/base.py index 6561262..58b5670 100644 --- a/bsfs/triple_store/base.py +++ b/bsfs/triple_store/base.py @@ -1,16 +1,12 @@ -""" -Part of the BlackStar filesystem (bsfs) module. -A copy of the license is provided with the project. -Author: Matthias Baumgartner, 2022 -""" # imports import abc import typing # inner-module imports +from bsfs.query import ast from bsfs.utils import URI, typename -import bsfs.schema as _schema +import bsfs.schema as bsc # exports __all__: typing.Sequence[str] = ( @@ -81,12 +77,12 @@ class TripleStoreBase(abc.ABC): @property @abc.abstractmethod - def schema(self) -> _schema.Schema: + def schema(self) -> bsc.Schema: """Return the store's local schema.""" @schema.setter @abc.abstractmethod - def schema(self, schema: _schema.Schema): + def schema(self, schema: bsc.Schema): """Migrate to new schema by adding or removing class definitions. Commits before and after the migration. @@ -109,9 +105,30 @@ class TripleStoreBase(abc.ABC): """ @abc.abstractmethod + def get( + self, + node_type: bsc.Node, + filter: typing.Optional[ast.filter.FilterExpression] = None, # pylint: disable=redefined-builtin + ) -> typing.Iterator[URI]: + """Return guids of nodes of type *node_type* that match the *filter*. + Return all guids of the respective type if *filter* is None. + """ + + @abc.abstractmethod + def fetch( + self, + node_type: bsc.Node, + filter: ast.filter.FilterExpression, # pylint: disable=redefined-builtin + fetch: ast.fetch.FetchExpression, + ) -> typing.Iterator[typing.Tuple[URI, str, typing.Any]]: + """Return (guid, name, value) triples where the guid is determined by the *filter* + query and the name matches the *fetch* query. + """ + + @abc.abstractmethod def exists( self, - node_type: _schema.Node, + node_type: bsc.Node, guids: typing.Iterable[URI], ) -> typing.Iterable[URI]: """Return those *guids* that exist and have type *node_type* or a subclass thereof.""" @@ -119,7 +136,7 @@ class TripleStoreBase(abc.ABC): @abc.abstractmethod def create( self, - node_type: _schema.Node, + node_type: bsc.Node, guids: typing.Iterable[URI], ): """Create *guid* nodes with type *subject*.""" @@ -127,9 +144,9 @@ class TripleStoreBase(abc.ABC): @abc.abstractmethod def set( self, - node_type: _schema.Node, # FIXME: is the node_type even needed? Couldn't I infer from the predicate? + node_type: bsc.Node, # FIXME: is the node_type even needed? Couldn't I infer from the predicate? guids: typing.Iterable[URI], - predicate: _schema.Predicate, + predicate: bsc.Predicate, values: typing.Iterable[typing.Any], ): """Add triples to the graph. diff --git a/bsfs/triple_store/sparql/__init__.py b/bsfs/triple_store/sparql/__init__.py new file mode 100644 index 0000000..cfa2732 --- /dev/null +++ b/bsfs/triple_store/sparql/__init__.py @@ -0,0 +1,13 @@ + +# imports +import typing + +# inner-module imports +from .sparql import SparqlStore + +# exports +__all__: typing.Sequence[str] = ( + 'SparqlStore', + ) + +## EOF ## diff --git a/bsfs/triple_store/sparql/distance.py b/bsfs/triple_store/sparql/distance.py new file mode 100644 index 0000000..2c2f355 --- /dev/null +++ b/bsfs/triple_store/sparql/distance.py @@ -0,0 +1,51 @@ + +# standard imports +import typing + +# external imports +import numpy as np + +# bsfs imports +from bsfs.namespace import ns + +# constants +EPS = 1e-9 + +# exports +__all__: typing.Sequence[str] = ( + 'DISTANCE_FU', + ) + + +## code ## + +def euclid(fst, snd) -> float: + """Euclidean distance (l2 norm).""" + fst = np.array(fst) + snd = np.array(snd) + return float(np.linalg.norm(fst - snd)) + +def cosine(fst, snd) -> float: + """Cosine distance.""" + fst = np.array(fst) + snd = np.array(snd) + if (fst == snd).all(): + return 0.0 + nrm0 = np.linalg.norm(fst) + nrm1 = np.linalg.norm(snd) + return float(1.0 - np.dot(fst, snd) / (nrm0 * nrm1 + EPS)) + +def manhatten(fst, snd) -> float: + """Manhatten (cityblock) distance (l1 norm).""" + fst = np.array(fst) + snd = np.array(snd) + return float(np.abs(fst - snd).sum()) + +# Known distance functions. +DISTANCE_FU = { + ns.bsd.euclidean: euclid, + ns.bsd.cosine: cosine, + ns.bsd.manhatten: manhatten, +} + +## EOF ## diff --git a/bsfs/triple_store/sparql/parse_fetch.py b/bsfs/triple_store/sparql/parse_fetch.py new file mode 100644 index 0000000..fab8173 --- /dev/null +++ b/bsfs/triple_store/sparql/parse_fetch.py @@ -0,0 +1,104 @@ + +# standard imports +import typing + +# bsfs imports +from bsfs import schema as bsc +from bsfs.query import ast +from bsfs.utils import errors + +# inner-module imports +from .utils import GenHopName, Query + +# exports +__all__: typing.Sequence[str] = ( + 'Fetch', + ) + + +## code ## + +class Fetch(): + """Translate `bsfs.query.ast.fetch` structures into Sparql queries.""" + + def __init__(self, schema): + self.schema = schema + self.ngen = GenHopName(prefix='?fch') + + def __call__( + self, + root_type: bsc.Node, + root: ast.fetch.FetchExpression, + ) -> Query: + """ + """ + # check root_type + if not isinstance(root_type, bsc.Node): + raise errors.BackendError(f'expected Node, found {root_type}') + if root_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'node {root_type} is not in the schema') + # parse root + terms, expr = self._parse_fetch_expression(root_type, root, '?ent') + # assemble query + return Query( + root_type=root_type.uri, + root_head='?ent', + select=terms, + where=expr, + ) + + def _parse_fetch_expression( + self, + node_type: bsc.Vertex, + node: ast.fetch.FetchExpression, + head: str, + ): + """Route *node* to the handler of the respective FetchExpression subclass.""" + if isinstance(node, ast.fetch.All): + return self._all(node_type, node, head) + if isinstance(node, ast.fetch.Fetch): + return self._fetch(node_type, node, head) + if isinstance(node, ast.fetch.Node): + return self._node(node_type, node, head) + if isinstance(node, ast.fetch.Value): + return self._value(node_type, node, head) + if isinstance(node, ast.fetch.This): + return self._this(node_type, node, head) + # invalid node + raise errors.BackendError(f'expected fetch expression, found {node}') + + def _all(self, node_type: bsc.Vertex, node: ast.fetch.All, head: str): + # child expressions + terms, exprs = zip(*[self._parse_fetch_expression(node_type, expr, head) for expr in node]) + terms = {term for sub in terms for term in sub} + exprs = ' .\n'.join({expr for expr in exprs if len(expr.strip()) > 0}) + return terms, exprs + + def _fetch(self, node_type: bsc.Vertex, node: ast.fetch.Fetch, head: str): # pylint: disable=unused-argument # (node_type) + # child expressions + rng = self.schema.predicate(node.predicate).range + nexthead = next(self.ngen) + terms, expr = self._parse_fetch_expression(rng, node.expr, nexthead) + return terms, f'OPTIONAL{{ {head} <{node.predicate}> {nexthead} .\n {expr} }}' + + def _node(self, node_type: bsc.Vertex, node: ast.fetch.Node, head: str): # pylint: disable=unused-argument # (node_type) + if f'?{node.name}'.startswith(self.ngen.prefix): + raise errors.BackendError(f'Node name must start with {self.ngen.prefix}') + # compose and return statement + term = next(self.ngen) + return {(term, node.name)}, f'OPTIONAL{{ {head} <{node.predicate}> {term} }}' + + def _value(self, node_type: bsc.Vertex, node: ast.fetch.Value, head: str): # pylint: disable=unused-argument # (node_type) + if f'?{node.name}'.startswith(self.ngen.prefix): + raise errors.BackendError(f'Value name must start with {self.ngen.prefix}') + # compose and return statement + term = next(self.ngen) + return {(term, node.name)}, f'OPTIONAL{{ {head} <{node.predicate}> {term} }}' + + def _this(self, node_type: bsc.Vertex, node: ast.fetch.This, head: str): # pylint: disable=unused-argument # (node_type) + if f'?{node.name}'.startswith(self.ngen.prefix): + raise errors.BackendError(f'This name must start with {self.ngen.prefix}') + # compose and return statement + return {(head, node.name)}, '' + +## EOF ## diff --git a/bsfs/triple_store/sparql/parse_filter.py b/bsfs/triple_store/sparql/parse_filter.py new file mode 100644 index 0000000..2f5a25b --- /dev/null +++ b/bsfs/triple_store/sparql/parse_filter.py @@ -0,0 +1,316 @@ + +# imports +import operator +import typing + +# external imports +import rdflib + +# bsfs imports +from bsfs import schema as bsc +from bsfs.namespace import ns +from bsfs.query import ast +from bsfs.utils import URI, errors + +# inner-module imports +from .distance import DISTANCE_FU +from .utils import GenHopName, Query + +# exports +__all__: typing.Sequence[str] = ( + 'Filter', + ) + + +## code ## + +class Filter(): + """Translate `bsfs.query.ast.filter` structures into Sparql queries.""" + + # Current schema to validate against. + schema: bsc.Schema + + # Generator that produces unique symbol names. + ngen: GenHopName + + def __init__(self, graph, schema): + self.graph = graph + self.schema = schema + self.ngen = GenHopName(prefix='?flt') + + def __call__( + self, + root_type: bsc.Node, + root: typing.Optional[ast.filter.FilterExpression] = None, + ) -> Query: + """ + """ + # check root_type + if not isinstance(root_type, bsc.Node): + raise errors.BackendError(f'expected Node, found {root_type}') + if root_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'node {root_type} is not in the schema') + # parse root + if root is None: + cond = '' + else: + cond = self._parse_filter_expression(root_type, root, '?ent') + # assemble query + return Query( + root_type=root_type.uri, + root_head='?ent', + where=cond, + ) + + def _parse_filter_expression( + self, + type_: bsc.Vertex, + node: ast.filter.FilterExpression, + head: str, + ) -> str: + """Route *node* to the handler of the respective FilterExpression subclass.""" + if isinstance(node, ast.filter.Is): + return self._is(type_, node, head) + if isinstance(node, ast.filter.Not): + return self._not(type_, node, head) + if isinstance(node, ast.filter.Has): + return self._has(type_, node, head) + if isinstance(node, ast.filter.Distance): + return self._distance(type_, node, head) + if isinstance(node, ast.filter.Any): + return self._any(type_, node, head) + if isinstance(node, ast.filter.All): + return self._all(type_, node, head) + if isinstance(node, ast.filter.And): + return self._and(type_, node, head) + if isinstance(node, ast.filter.Or): + return self._or(type_, node, head) + if isinstance(node, ast.filter.Equals): + return self._equals(type_, node, head) + if isinstance(node, ast.filter.Substring): + return self._substring(type_, node, head) + if isinstance(node, ast.filter.StartsWith): + return self._starts_with(type_, node, head) + if isinstance(node, ast.filter.EndsWith): + return self._ends_with(type_, node, head) + if isinstance(node, ast.filter.LessThan): + return self._less_than(type_, node, head) + if isinstance(node, ast.filter.GreaterThan): + return self._greater_than(type_, node, head) + # invalid node + raise errors.BackendError(f'expected filter expression, found {node}') + + def _parse_predicate_expression( + self, + type_: bsc.Vertex, + node: ast.filter.PredicateExpression + ) -> typing.Tuple[str, bsc.Vertex]: + """Route *node* to the handler of the respective PredicateExpression subclass.""" + if isinstance(node, ast.filter.Predicate): + return self._predicate(type_, node) + if isinstance(node, ast.filter.OneOf): + return self._one_of(type_, node) + # invalid node + raise errors.BackendError(f'expected predicate expression, found {node}') + + def _one_of(self, node_type: bsc.Vertex, node: ast.filter.OneOf) -> typing.Tuple[str, bsc.Vertex]: + """ + """ + if not isinstance(node_type, bsc.Node): + raise errors.BackendError(f'expected Node, found {node_type}') + # walk through predicates + suburi, rng = set(), None + for pred in node: # OneOf guarantees at least one expression + puri, subrng = self._parse_predicate_expression(node_type, pred) + # track predicate uris + suburi.add(puri) + # check for more generic range + if rng is None or subrng > rng: + rng = subrng + # check range consistency + if not subrng <= rng and not subrng >= rng: + raise errors.ConsistencyError(f'ranges {subrng} and {rng} are not related') + # return joint predicate expression and next range + # OneOf guarantees at least one expression, rng is always a bsc.Vertex. + # mypy does not realize this, hence we ignore the warning. + return '|'.join(suburi), rng # type: ignore [return-value] + + def _predicate(self, node_type: bsc.Vertex, node: ast.filter.Predicate) -> typing.Tuple[str, bsc.Vertex]: + """ + """ + # check node_type + if not isinstance(node_type, bsc.Node): + raise errors.BackendError(f'expected Node, found {node_type}') + # fetch predicate and its uri + puri = node.predicate + # get and check predicate, domain, and range + if not self.schema.has_predicate(puri): + raise errors.ConsistencyError(f'predicate {puri} is not in the schema') + pred = self.schema.predicate(puri) + if not isinstance(pred.range, (bsc.Node, bsc.Literal)): + raise errors.BackendError(f'the range of predicate {pred} is undefined') + dom, rng = pred.domain, pred.range + # encapsulate predicate uri + uri_str = f'<{puri}>' + # apply reverse flag + if node.reverse: + uri_str = '^' + uri_str + dom, rng = rng, dom # type: ignore [assignment] # variable re-use confuses mypy + # check path consistency + if not node_type <= dom: + raise errors.ConsistencyError(f'expected type {dom} or subtype thereof, found {node_type}') + # return predicate URI and next node type + return uri_str, rng + + def _any(self, node_type: bsc.Vertex, node: ast.filter.Any, head: str) -> str: + """ + """ + if not isinstance(node_type, bsc.Node): + raise errors.BackendError(f'expected Node, found {node_type}') + # parse predicate + pred, next_type = self._parse_predicate_expression(node_type, node.predicate) + # parse expression + nexthead = next(self.ngen) + expr = self._parse_filter_expression(next_type, node.expr, nexthead) + # combine results + return f'{head} {pred} {nexthead} . {expr}' + + def _all(self, node_type: bsc.Vertex, node: ast.filter.All, head: str) -> str: + """ + """ + # NOTE: All(P, E) := Not(Any(P, Not(E))) and EXISTS(P, ?) + if not isinstance(node_type, bsc.Node): + raise errors.BackendError(f'expected Node, found {node_type}') + # parse rewritten ast + expr = self._parse_filter_expression(node_type, + ast.filter.Not( + ast.filter.Any(node.predicate, + ast.filter.Not(node.expr))), head) + # parse predicate for existence constraint + pred, _ = self._parse_predicate_expression(node_type, node.predicate) + temphead = next(self.ngen) + # return existence and rewritten expression + return f'FILTER EXISTS {{ {head} {pred} {temphead} }} . ' + expr + + def _and(self, node_type: bsc.Vertex, node: ast.filter.And, head: str) -> str: + """ + """ + sub = [self._parse_filter_expression(node_type, expr, head) for expr in node] + return ' . '.join(sub) + + def _or(self, node_type: bsc.Vertex, node: ast.filter.Or, head: str) -> str: + """ + """ + # potential special case optimization: + # * ast: Or(Equals('foo'), Equals('bar'), ...) + # * query: VALUES ?head { "value1"^^<...> "value2"^^<...> "value3"^<...> ... } + sub = [self._parse_filter_expression(node_type, expr, head) for expr in node] + sub = ['{' + expr + '}' for expr in sub] + return ' UNION '.join(sub) + + def _not(self, node_type: bsc.Vertex, node: ast.filter.Not, head: str) -> str: + """ + """ + expr = self._parse_filter_expression(node_type, node.expr, head) + if isinstance(node_type, bsc.Literal): + return f'MINUS {{ {expr} }}' + # NOTE: for bsc.Node types, we must include at least one expression in the body of MINUS, + # otherwise the connection between the context and body of MINUS is lost. + # The simplest (and non-interfering) choice is a type statement. + return f'MINUS {{ {head} <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <{node_type.uri}> . {expr} }}' + + def _has(self, node_type: bsc.Vertex, node: ast.filter.Has, head: str) -> str: + """ + """ + if not isinstance(node_type, bsc.Node): + raise errors.BackendError(f'expected Node, found {node_type}') + # parse predicate + pred, _ = self._parse_predicate_expression(node_type, node.predicate) + # get new heads + inner = next(self.ngen) + outer = next(self.ngen) + # predicate count expression (fetch number of predicates at *head*) + num_preds = f'{{ SELECT (COUNT(distinct {inner}) as {outer}) WHERE {{ {head} {pred} {inner} }} }}' + # count expression + count_bounds = self._parse_filter_expression(self.schema.literal(ns.xsd.integer), node.count, outer) + # combine + return num_preds + ' . ' + count_bounds + + def _distance(self, node_type: bsc.Vertex, node: ast.filter.Distance, head: str) -> str: + """ + """ + if not isinstance(node_type, bsc.Feature): + raise errors.BackendError(f'expected Feature, found {node_type}') + if len(node.reference) != node_type.dimension: + raise errors.ConsistencyError( + f'reference has dimension {len(node.reference)}, expected {node_type.dimension}') + # get distance metric + dist = DISTANCE_FU[node_type.distance] + # get operator + cmp = operator.lt if node.strict else operator.le + # get candidate values + candidates = { + f'"{cand}"^^<{node_type.uri}>' + for cand + in self.graph.objects() + if isinstance(cand, rdflib.Literal) + and cand.datatype == rdflib.URIRef(node_type.uri) + and cmp(dist(cand.value, node.reference), node.threshold) + } + # combine candidate values + values = ' '.join(candidates) if len(candidates) else f'"impossible value"^^<{ns.xsd.string}>' + # return sparql fragment + return f'VALUES {head} {{ {values} }}' + + def _is(self, node_type: bsc.Vertex, node: ast.filter.Is, head: str) -> str: + """ + """ + if not isinstance(node_type, bsc.Node): + raise errors.BackendError(f'expected Node, found {node_type}') + return f'VALUES {head} {{ <{URI(node.value)}> }}' + + def _equals(self, node_type: bsc.Vertex, node: ast.filter.Equals, head: str) -> str: + """ + """ + if not isinstance(node_type, bsc.Literal): + raise errors.BackendError(f'expected Literal, found {node}') + return f'VALUES {head} {{ "{node.value}"^^<{node_type.uri}> }}' + + def _substring(self, node_type: bsc.Vertex, node: ast.filter.Substring, head: str) -> str: + """ + """ + if not isinstance(node_type, bsc.Literal): + raise errors.BackendError(f'expected Literal, found {node_type}') + return f'FILTER contains(str({head}), "{node.value}")' + + def _starts_with(self, node_type: bsc.Vertex, node: ast.filter.StartsWith, head: str) -> str: + """ + """ + if not isinstance(node_type, bsc.Literal): + raise errors.BackendError(f'expected Literal, found {node_type}') + return f'FILTER strstarts(str({head}), "{node.value}")' + + def _ends_with(self, node_type: bsc.Vertex, node: ast.filter.EndsWith, head: str) -> str: + """ + """ + if not isinstance(node_type, bsc.Literal): + raise errors.BackendError(f'expected Literal, found {node_type}') + return f'FILTER strends(str({head}), "{node.value}")' + + def _less_than(self, node_type: bsc.Vertex, node: ast.filter.LessThan, head: str) -> str: + """ + """ + if not isinstance(node_type, bsc.Literal): + raise errors.BackendError(f'expected Literal, found {node_type}') + equality = '=' if not node.strict else '' + return f'FILTER ({head} <{equality} {float(node.threshold)})' + + def _greater_than(self, node_type: bsc.Vertex, node: ast.filter.GreaterThan, head: str) -> str: + """ + """ + if not isinstance(node_type, bsc.Literal): + raise errors.BackendError(f'expected Literal, found {node_type}') + equality = '=' if not node.strict else '' + return f'FILTER ({head} >{equality} {float(node.threshold)})' + +## EOF ## diff --git a/bsfs/triple_store/sparql.py b/bsfs/triple_store/sparql/sparql.py index 7516dff..99e67d6 100644 --- a/bsfs/triple_store/sparql.py +++ b/bsfs/triple_store/sparql/sparql.py @@ -1,20 +1,23 @@ -""" -Part of the BlackStar filesystem (bsfs) module. -A copy of the license is provided with the project. -Author: Matthias Baumgartner, 2022 -""" # imports +import base64 import itertools import typing + +# external imports import rdflib # bsfs imports from bsfs import schema as bsc +from bsfs.namespace import ns +from bsfs.query import ast from bsfs.utils import errors, URI # inner-module imports -from . import base +from . import parse_fetch +from . import parse_filter +from .. import base +from .distance import DISTANCE_FU # exports @@ -25,6 +28,8 @@ __all__: typing.Sequence[str] = ( ## code ## +rdflib.term.bind(ns.bsl.BinaryBlob, bytes, constructor=base64.b64decode) + class _Transaction(): """Lightweight rdflib transactions for in-memory databases.""" @@ -85,11 +90,19 @@ class SparqlStore(base.TripleStoreBase): # The local schema. _schema: bsc.Schema + # Filter parser + _filter_parser: parse_filter.Filter + + # Fetch parser + _fetch_parser: parse_fetch.Fetch + def __init__(self): super().__init__(None) self._graph = rdflib.Graph() self._transaction = _Transaction(self._graph) - self._schema = bsc.Schema.Empty() + self._schema = bsc.Schema(literals={bsc.ROOT_NUMBER.child(ns.xsd.integer)}) + self._filter_parser = parse_filter.Filter(self._graph, self._schema) + self._fetch_parser = parse_fetch.Fetch(self._schema) # NOTE: mypy and pylint complain about the **kwargs not being listed (contrasting super) # However, not having it here is clearer since it's explicit that there are no arguments. @@ -115,6 +128,16 @@ class SparqlStore(base.TripleStoreBase): # check compatibility: No contradicting definitions if not self.schema.consistent_with(schema): raise errors.ConsistencyError(f'{schema} is inconsistent with {self.schema}') + # check distance functions of features + invalid = { + (cand.uri, cand.distance) + for cand + in schema.literals() + if isinstance(cand, bsc.Feature) and cand.distance not in DISTANCE_FU} + if len(invalid) > 0: + cand, dist = zip(*invalid) + raise errors.UnsupportedError( + f'unknown distance function {",".join(dist)} in feature {", ".join(cand)}') # commit the current transaction self.commit() @@ -126,10 +149,17 @@ class SparqlStore(base.TripleStoreBase): # get deleted classes sub = self.schema - schema - # remove predicate instances for pred in sub.predicates: + # remove predicate instances for src, trg in self._graph.subject_objects(rdflib.URIRef(pred.uri)): self._transaction.remove((src, rdflib.URIRef(pred.uri), trg)) + # remove predicate definition + if pred.parent is not None: # NOTE: there shouldn't be any predicate w/o parent + self._transaction.remove(( + rdflib.URIRef(pred.uri), + rdflib.RDFS.subClassOf, + rdflib.URIRef(pred.parent.uri), + )) # remove node instances for node in sub.nodes: @@ -143,15 +173,82 @@ class SparqlStore(base.TripleStoreBase): self._transaction.remove((inst, pred, trg)) # remove instance self._transaction.remove((inst, rdflib.RDF.type, rdflib.URIRef(node.uri))) - - # NOTE: Nothing to do for literals + # remove node definition + if node.parent is not None: # NOTE: there shouldn't be any node w/o parent + self._transaction.remove(( + rdflib.URIRef(node.uri), + rdflib.RDFS.subClassOf, + rdflib.URIRef(node.parent.uri), + )) + + for lit in sub.literals: + # remove literal definition + if lit.parent is not None: # NOTE: there shouldn't be any literal w/o parent + self._transaction.remove(( + rdflib.URIRef(lit.uri), + rdflib.RDFS.subClassOf, + rdflib.URIRef(lit.parent.uri), + )) + + # add predicate, node, and literal hierarchies to the graph + for itm in itertools.chain(schema.predicates(), schema.nodes(), schema.literals()): + if itm.parent is not None: + self._transaction.add((rdflib.URIRef(itm.uri), rdflib.RDFS.subClassOf, rdflib.URIRef(itm.parent.uri))) # commit instance changes self.commit() # migrate schema self._schema = schema + self._filter_parser.schema = schema + self._fetch_parser.schema = schema + def fetch( + self, + node_type: bsc.Node, + filter: ast.filter.FilterExpression, # pylint: disable=redefined-builtin + fetch: ast.fetch.FetchExpression, + ) -> typing.Iterator[typing.Tuple[URI, str, typing.Any]]: + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + if not isinstance(filter, ast.filter.FilterExpression): + raise TypeError(filter) + if not isinstance(fetch, ast.fetch.FetchExpression): + raise TypeError(fetch) + # compose a query from fetch and filter ast + query = self._filter_parser(node_type, filter) + query += self._fetch_parser(node_type, fetch) + # run query + emitted = set() + for result in query(self._graph): + guid = URI(result[0]) + for name, raw in zip(query.names, result[1:]): + if raw is None: # undefined value + continue + if isinstance(raw, rdflib.Literal): + value = raw.value + else: + value = URI(raw) + # emit triple + triple = (guid, name, value) + if triple not in emitted: # FIXME: needs a better solution! + emitted.add(triple) + yield guid, name, value + + def get( + self, + node_type: bsc.Node, + filter: typing.Optional[ast.filter.FilterExpression] = None, # pylint: disable=redefined-builtin + ) -> typing.Iterator[URI]: + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + if filter is not None and not isinstance(filter, ast.filter.FilterExpression): + raise TypeError(filter) + # compose query + query = self._filter_parser(node_type, filter) + # run query + for guid, in query(self._graph): + yield URI(guid) def _has_type(self, subject: URI, node_type: bsc.Node) -> bool: """Return True if *subject* is a node of class *node_type* or a subclass thereof.""" @@ -187,7 +284,7 @@ class SparqlStore(base.TripleStoreBase): raise errors.ConsistencyError(f'{node_type} is not defined in the schema') # check and create guids for guid in guids: - subject = rdflib.URIRef(guid) + subject = rdflib.URIRef(URI(guid)) # check node existence if (subject, rdflib.RDF.type, None) in self._graph: # FIXME: node exists and may have a different type! ignore? raise? report? @@ -226,7 +323,7 @@ class SparqlStore(base.TripleStoreBase): raise errors.InstanceError(inconsistent) # check guids # FIXME: Fail or skip inexistent nodes? - guids = set(guids) + guids = {URI(guid) for guid in guids} inconsistent = {guid for guid in guids if not self._has_type(guid, node_type)} if len(inconsistent) > 0: raise errors.InstanceError(inconsistent) @@ -237,7 +334,11 @@ class SparqlStore(base.TripleStoreBase): guid = rdflib.URIRef(guid) # convert value if isinstance(predicate.range, bsc.Literal): - value = rdflib.Literal(value, datatype=rdflib.URIRef(predicate.range.uri)) + dtype = rdflib.URIRef(predicate.range.uri) + if predicate.range <= self.schema.literal(ns.bsl.BinaryBlob): + dtype = rdflib.URIRef(ns.bsl.BinaryBlob) + value = base64.b64encode(value) + value = rdflib.Literal(value, datatype=dtype) elif isinstance(predicate.range, bsc.Node): value = rdflib.URIRef(value) else: diff --git a/bsfs/triple_store/sparql/utils.py b/bsfs/triple_store/sparql/utils.py new file mode 100644 index 0000000..38062c2 --- /dev/null +++ b/bsfs/triple_store/sparql/utils.py @@ -0,0 +1,137 @@ + +# standard imports +import typing + +# external imports +import rdflib + +# bsfs imports +from bsfs.namespace import ns +from bsfs.utils import typename + +# exports +__all__: typing.Sequence[str] = ( + 'GenHopName', + 'Query', + ) + + +## code ## + +class GenHopName(): + """Generator that produces a new unique symbol name with each iteration.""" + + # Symbol name prefix. + prefix: str + + # Current counter. + curr: int + + def __init__(self, prefix: str = '?hop', start: int = 0): + self.prefix = prefix + self.curr = start - 1 + + def __next__(self): + """Generate and return the next unique name.""" + self.curr += 1 + return self.prefix + str(self.curr) + + +class Query(): + """Hold, manage, and complete partial Sparql queries.""" + + # root node type URI. + root_type: str + + # root node variable name. + root_head: str + + # (head, name) tuples (w/o root) + select: typing.Tuple[typing.Tuple[str, str], ...] + + # where statements. + where: str + + def __init__( + self, + root_type: str, + root_head: str = '?ent', + select: typing.Optional[typing.Iterable[typing.Tuple[str, str]]] = None, + where: typing.Optional[str] = None, + ): + # check arguments + if select is None: + select = [] + if where is None: + where = '' + # set members + self.root_type = root_type + self.root_head = root_head + self.select = tuple(select) # tuple ensures presistent order + self.where = where.strip() + + def __str__(self) -> str: + return self.query + + def __repr__(self) -> str: + return f'{typename(self)}({self.root_type}, {self.root_head}, {self.select}, {self.where})' + + def __eq__(self, other: typing.Any) -> bool: + return isinstance(other, type(self)) \ + and self.root_type == other.root_type \ + and self.root_head == other.root_head \ + and self.select == other.select \ + and self.where == other.where + + def __hash__(self) -> int: + return hash((type(self), self.root_type, self.root_head, self.select, self.where)) + + def __add__(self, other: typing.Any) -> 'Query': + # check other's type + if not isinstance(other, type(self)): + return NotImplemented + # check query compatibility + if not self.root_type == other.root_type: + raise ValueError(other) + if not self.root_head == other.root_head: + raise ValueError(other) + # combine selections + select = self.select + other.select + # combine conditions + conds = [] + if self.where != '': + conds.append(self.where) + if other.where != '': + conds.append(other.where) + where = ' . '.join(conds) + # return new query + return Query( + root_type=self.root_type, + root_head=self.root_head, + select=select, + where=where, + ) + + @property + def names(self) -> typing.Tuple[str, ...]: + """Return a tuple of selected variable names, excluding the root.""" + return tuple(name for _, name in self.select) + + @property + def query(self) -> str: + """Return an executable sparql query.""" + select = ' '.join(f'({head} as ?{name})' for head, name in self.select) + return f''' + SELECT DISTINCT {self.root_head} {select} + WHERE {{ + {self.root_head} <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <{self.root_type}> . + {self.where} + }} + ORDER BY str({self.root_head}) + ''' + + def __call__(self, graph: rdflib.Graph) -> rdflib.query.Result: + """Execute the query on a *graph* and return the query result.""" + return graph.query(self.query) + +## EOF ## |