From 965f4dfe41afd552ed6477c75e1286c14e3580f6 Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Sat, 21 Jan 2023 16:31:08 +0100 Subject: Fetch in triple stores: * fetch interface * sparql fetch ast parser * sparql fetch implementation --- bsfs/triple_store/base.py | 33 +++++--- bsfs/triple_store/sparql/parse_fetch.py | 109 ++++++++++++++++++++++++ bsfs/triple_store/sparql/parse_filter.py | 45 ++++------ bsfs/triple_store/sparql/sparql.py | 50 +++++++++-- bsfs/triple_store/sparql/utils.py | 141 +++++++++++++++++++++++++++++++ 5 files changed, 332 insertions(+), 46 deletions(-) create mode 100644 bsfs/triple_store/sparql/parse_fetch.py create mode 100644 bsfs/triple_store/sparql/utils.py (limited to 'bsfs') diff --git a/bsfs/triple_store/base.py b/bsfs/triple_store/base.py index 7e03714..1baa63b 100644 --- a/bsfs/triple_store/base.py +++ b/bsfs/triple_store/base.py @@ -11,7 +11,7 @@ import typing # inner-module imports from bsfs.query import ast from bsfs.utils import URI, typename -import bsfs.schema as _schema +import bsfs.schema as bsc # exports __all__: typing.Sequence[str] = ( @@ -82,12 +82,12 @@ class TripleStoreBase(abc.ABC): @property @abc.abstractmethod - def schema(self) -> _schema.Schema: + def schema(self) -> bsc.Schema: """Return the store's local schema.""" @schema.setter @abc.abstractmethod - def schema(self, schema: _schema.Schema): + def schema(self, schema: bsc.Schema): """Migrate to new schema by adding or removing class definitions. Commits before and after the migration. @@ -112,17 +112,28 @@ class TripleStoreBase(abc.ABC): @abc.abstractmethod def get( self, - node_type: _schema.Node, - query: typing.Optional[ast.filter.FilterExpression] = None, + node_type: bsc.Node, + filter: typing.Optional[ast.filter.FilterExpression] = None, # pylint: disable=redefined-builtin ) -> typing.Iterator[URI]: - """Return guids of nodes of type *node_type* that match the *query*. - Return all guids of the respective type if *query* is None. + """Return guids of nodes of type *node_type* that match the *filter*. + Return all guids of the respective type if *filter* is None. + """ + + @abc.abstractmethod + def fetch( + self, + node_type: bsc.Node, + filter: ast.filter.FilterExpression, # pylint: disable=redefined-builtin + fetch: ast.fetch.FetchExpression, + ) -> typing.Iterator[typing.Tuple[URI, str, typing.Any]]: + """Return (guid, name, value) triples where the guid is determined by the *filter* + query and the name matches the *fetch* query. """ @abc.abstractmethod def exists( self, - node_type: _schema.Node, + node_type: bsc.Node, guids: typing.Iterable[URI], ) -> typing.Iterable[URI]: """Return those *guids* that exist and have type *node_type* or a subclass thereof.""" @@ -130,7 +141,7 @@ class TripleStoreBase(abc.ABC): @abc.abstractmethod def create( self, - node_type: _schema.Node, + node_type: bsc.Node, guids: typing.Iterable[URI], ): """Create *guid* nodes with type *subject*.""" @@ -138,9 +149,9 @@ class TripleStoreBase(abc.ABC): @abc.abstractmethod def set( self, - node_type: _schema.Node, # FIXME: is the node_type even needed? Couldn't I infer from the predicate? + node_type: bsc.Node, # FIXME: is the node_type even needed? Couldn't I infer from the predicate? guids: typing.Iterable[URI], - predicate: _schema.Predicate, + predicate: bsc.Predicate, values: typing.Iterable[typing.Any], ): """Add triples to the graph. diff --git a/bsfs/triple_store/sparql/parse_fetch.py b/bsfs/triple_store/sparql/parse_fetch.py new file mode 100644 index 0000000..20d4e74 --- /dev/null +++ b/bsfs/triple_store/sparql/parse_fetch.py @@ -0,0 +1,109 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# standard imports +import typing + +# bsfs imports +from bsfs import schema as bsc +from bsfs.query import ast +from bsfs.utils import errors + +# inner-module imports +from .utils import GenHopName, Query + +# exports +__all__: typing.Sequence[str] = ( + 'Fetch', + ) + + +## code ## + +class Fetch(): + """Translate `bsfs.query.ast.fetch` structures into Sparql queries.""" + + def __init__(self, schema): + self.schema = schema + self.ngen = GenHopName(prefix='?fch') + + def __call__( + self, + root_type: bsc.Node, + root: ast.fetch.FetchExpression, + ) -> Query: + """ + """ + # check root_type + if not isinstance(root_type, bsc.Node): + raise errors.BackendError(f'expected Node, found {root_type}') + if root_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'node {root_type} is not in the schema') + # parse root + terms, expr = self._parse_fetch_expression(root_type, root, '?ent') + # assemble query + return Query( + root_type=root_type.uri, + root_head='?ent', + select=terms, + where=expr, + ) + + def _parse_fetch_expression( + self, + node_type: bsc.Vertex, + node: ast.fetch.FetchExpression, + head: str, + ): + """Route *node* to the handler of the respective FetchExpression subclass.""" + if isinstance(node, ast.fetch.All): + return self._all(node_type, node, head) + if isinstance(node, ast.fetch.Fetch): + return self._fetch(node_type, node, head) + if isinstance(node, ast.fetch.Node): + return self._node(node_type, node, head) + if isinstance(node, ast.fetch.Value): + return self._value(node_type, node, head) + if isinstance(node, ast.fetch.This): + return self._this(node_type, node, head) + # invalid node + raise errors.BackendError(f'expected fetch expression, found {node}') + + def _all(self, node_type: bsc.Vertex, node: ast.fetch.All, head: str): + # child expressions + terms, exprs = zip(*[self._parse_fetch_expression(node_type, expr, head) for expr in node]) + terms = {term for sub in terms for term in sub} + exprs = ' .\n'.join({expr for expr in exprs if len(expr.strip()) > 0}) + return terms, exprs + + def _fetch(self, node_type: bsc.Vertex, node: ast.fetch.Fetch, head: str): # pylint: disable=unused-argument # (node_type) + # child expressions + rng = self.schema.predicate(node.predicate).range + nexthead = next(self.ngen) + terms, expr = self._parse_fetch_expression(rng, node.expr, nexthead) + return terms, f'OPTIONAL{{ {head} <{node.predicate}> {nexthead} .\n {expr} }}' + + def _node(self, node_type: bsc.Vertex, node: ast.fetch.Node, head: str): # pylint: disable=unused-argument # (node_type) + if f'?{node.name}'.startswith(self.ngen.prefix): + raise errors.BackendError(f'Node name must start with {self.ngen.prefix}') + # compose and return statement + term = next(self.ngen) + return {(term, node.name)}, f'OPTIONAL{{ {head} <{node.predicate}> {term} }}' + + def _value(self, node_type: bsc.Vertex, node: ast.fetch.Value, head: str): # pylint: disable=unused-argument # (node_type) + if f'?{node.name}'.startswith(self.ngen.prefix): + raise errors.BackendError(f'Value name must start with {self.ngen.prefix}') + # compose and return statement + term = next(self.ngen) + return {(term, node.name)}, f'OPTIONAL{{ {head} <{node.predicate}> {term} }}' + + def _this(self, node_type: bsc.Vertex, node: ast.fetch.This, head: str): # pylint: disable=unused-argument # (node_type) + if f'?{node.name}'.startswith(self.ngen.prefix): + raise errors.BackendError(f'This name must start with {self.ngen.prefix}') + # compose and return statement + return {(head, node.name)}, '' + +## EOF ## diff --git a/bsfs/triple_store/sparql/parse_filter.py b/bsfs/triple_store/sparql/parse_filter.py index 8b6b976..dca0aea 100644 --- a/bsfs/triple_store/sparql/parse_filter.py +++ b/bsfs/triple_store/sparql/parse_filter.py @@ -19,6 +19,7 @@ from bsfs.utils import URI, errors # inner-module imports from .distance import DISTANCE_FU +from .utils import GenHopName, Query # exports __all__: typing.Sequence[str] = ( @@ -28,25 +29,6 @@ __all__: typing.Sequence[str] = ( ## code ## -class _GenHopName(): - """Generator that produces a new unique symbol name with each iteration.""" - - # Symbol name prefix. - prefix: str - - # Current counter. - curr: int - - def __init__(self, prefix: str = '?hop', start: int = 0): - self.prefix = prefix - self.curr = start - 1 - - def __next__(self): - """Generate and return the next unique name.""" - self.curr += 1 - return self.prefix + str(self.curr) - - class Filter(): """Translate `bsfs.query.ast.filter` structures into Sparql queries.""" @@ -54,18 +36,18 @@ class Filter(): schema: bsc.Schema # Generator that produces unique symbol names. - ngen: _GenHopName + ngen: GenHopName def __init__(self, graph, schema): self.graph = graph self.schema = schema - self.ngen = _GenHopName() + self.ngen = GenHopName(prefix='?flt') def __call__( self, root_type: bsc.Node, root: typing.Optional[ast.filter.FilterExpression] = None, - ) -> str: + ) -> Query: """ """ # check root_type @@ -79,15 +61,18 @@ class Filter(): else: cond = self._parse_filter_expression(root_type, root, '?ent') # assemble query - return f''' - SELECT ?ent - WHERE {{ - ?ent <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <{root_type.uri}> . - {cond} - }} - ''' + return Query( + root_type=root_type.uri, + root_head='?ent', + where=cond, + ) - def _parse_filter_expression(self, type_: bsc.Vertex, node: ast.filter.FilterExpression, head: str) -> str: + def _parse_filter_expression( + self, + type_: bsc.Vertex, + node: ast.filter.FilterExpression, + head: str, + ) -> str: """Route *node* to the handler of the respective FilterExpression subclass.""" if isinstance(node, ast.filter.Is): return self._is(type_, node, head) diff --git a/bsfs/triple_store/sparql/sparql.py b/bsfs/triple_store/sparql/sparql.py index fedd227..a0dd12e 100644 --- a/bsfs/triple_store/sparql/sparql.py +++ b/bsfs/triple_store/sparql/sparql.py @@ -16,6 +16,7 @@ from bsfs.query import ast from bsfs.utils import errors, URI # inner-module imports +from . import parse_fetch from . import parse_filter from .. import base from .distance import DISTANCE_FU @@ -92,13 +93,16 @@ class SparqlStore(base.TripleStoreBase): # Filter parser _filter_parser: parse_filter.Filter + # Fetch parser + _fetch_parser: parse_fetch.Fetch + def __init__(self): super().__init__(None) self._graph = rdflib.Graph() self._transaction = _Transaction(self._graph) - # NOTE: parsing bsfs.query.ast.filter.Has requires xsd:integer. self._schema = bsc.Schema(literals={bsc.ROOT_NUMBER.child(ns.xsd.integer)}) self._filter_parser = parse_filter.Filter(self._graph, self._schema) + self._fetch_parser = parse_fetch.Fetch(self._schema) # NOTE: mypy and pylint complain about the **kwargs not being listed (contrasting super) # However, not having it here is clearer since it's explicit that there are no arguments. @@ -197,17 +201,53 @@ class SparqlStore(base.TripleStoreBase): # migrate schema self._schema = schema self._filter_parser.schema = schema + self._fetch_parser.schema = schema + + def fetch( + self, + node_type: bsc.Node, + filter: ast.filter.FilterExpression, # pylint: disable=redefined-builtin + fetch: ast.fetch.FetchExpression, + ) -> typing.Iterator[typing.Tuple[URI, str, typing.Any]]: + if node_type not in self.schema.nodes(): + raise errors.ConsistencyError(f'{node_type} is not defined in the schema') + if not isinstance(filter, ast.filter.FilterExpression): + raise TypeError(filter) + if not isinstance(fetch, ast.fetch.FetchExpression): + raise TypeError(fetch) + # compose a query from fetch and filter ast + query = self._filter_parser(node_type, filter) + query += self._fetch_parser(node_type, fetch) + # run query + emitted = set() + for result in query(self._graph): + guid = URI(result[0]) + for name, raw in zip(query.names, result[1:]): + if raw is None: # undefined value + continue + if isinstance(raw, rdflib.Literal): + value = raw.value + else: + value = URI(raw) + # emit triple + triple = (guid, name, value) + if triple not in emitted: # FIXME: needs a better solution! + emitted.add(triple) + yield guid, name, value def get( self, node_type: bsc.Node, - query: typing.Optional[ast.filter.FilterExpression] = None, + filter: typing.Optional[ast.filter.FilterExpression] = None, # pylint: disable=redefined-builtin ) -> typing.Iterator[URI]: if node_type not in self.schema.nodes(): raise errors.ConsistencyError(f'{node_type} is not defined in the schema') - if not isinstance(query, ast.filter.FilterExpression): - raise TypeError(query) - for guid, in self._graph.query(self._filter_parser(node_type, query)): + if not isinstance(filter, ast.filter.FilterExpression): + raise TypeError(filter) + # compose query + query = self._filter_parser(node_type, filter) + # run query + for guid, in query(self._graph): yield URI(guid) def _has_type(self, subject: URI, node_type: bsc.Node) -> bool: diff --git a/bsfs/triple_store/sparql/utils.py b/bsfs/triple_store/sparql/utils.py new file mode 100644 index 0000000..deca4d8 --- /dev/null +++ b/bsfs/triple_store/sparql/utils.py @@ -0,0 +1,141 @@ +""" + +Part of the BlackStar filesystem (bsfs) module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# standard imports +import typing + +# external imports +import rdflib + +# bsfs imports +from bsfs.namespace import ns +from bsfs.utils import typename + +# exports +__all__: typing.Sequence[str] = ( + 'GenHopName', + 'Query', + ) + + +## code ## + +class GenHopName(): + """Generator that produces a new unique symbol name with each iteration.""" + + # Symbol name prefix. + prefix: str + + # Current counter. + curr: int + + def __init__(self, prefix: str = '?hop', start: int = 0): + self.prefix = prefix + self.curr = start - 1 + + def __next__(self): + """Generate and return the next unique name.""" + self.curr += 1 + return self.prefix + str(self.curr) + + +class Query(): + """Hold, manage, and complete partial Sparql queries.""" + + # root node type URI. + root_type: str + + # root node variable name. + root_head: str + + # (head, name) tuples (w/o root) + select: typing.Tuple[typing.Tuple[str, str], ...] + + # where statements. + where: str + + def __init__( + self, + root_type: str, + root_head: str = '?ent', + select: typing.Optional[typing.Iterable[typing.Tuple[str, str]]] = None, + where: typing.Optional[str] = None, + ): + # check arguments + if select is None: + select = [] + if where is None: + where = '' + # set members + self.root_type = root_type + self.root_head = root_head + self.select = tuple(select) # tuple ensures presistent order + self.where = where.strip() + + def __str__(self) -> str: + return self.query + + def __repr__(self) -> str: + return f'{typename(self)}({self.root_type}, {self.root_head}, {self.select}, {self.where})' + + def __eq__(self, other: typing.Any) -> bool: + return isinstance(other, type(self)) \ + and self.root_type == other.root_type \ + and self.root_head == other.root_head \ + and self.select == other.select \ + and self.where == other.where + + def __hash__(self) -> int: + return hash((type(self), self.root_type, self.root_head, self.select, self.where)) + + def __add__(self, other: typing.Any) -> 'Query': + # check other's type + if not isinstance(other, type(self)): + return NotImplemented + # check query compatibility + if not self.root_type == other.root_type: + raise ValueError(other) + if not self.root_head == other.root_head: + raise ValueError(other) + # combine selections + select = self.select + other.select + # combine conditions + conds = [] + if self.where != '': + conds.append(self.where) + if other.where != '': + conds.append(other.where) + where = ' . '.join(conds) + # return new query + return Query( + root_type=self.root_type, + root_head=self.root_head, + select=select, + where=where, + ) + + @property + def names(self) -> typing.Tuple[str, ...]: + """Return a tuple of selected variable names, excluding the root.""" + return tuple(name for _, name in self.select) + + @property + def query(self) -> str: + """Return an executable sparql query.""" + select = ' '.join(f'({head} as ?{name})' for head, name in self.select) + return f''' + SELECT {self.root_head} {select} + WHERE {{ + {self.root_head} <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <{self.root_type}> . + {self.where} + }} + ''' + + def __call__(self, graph: rdflib.Graph) -> rdflib.query.Result: + """Execute the query on a *graph* and return the query result.""" + return graph.query(self.query) + +## EOF ## -- cgit v1.2.3