author     Matthias Baumgartner <dev@igsor.net>    2023-01-21 16:31:08 +0100
committer  Matthias Baumgartner <dev@igsor.net>    2023-01-21 16:31:08 +0100
commit     965f4dfe41afd552ed6477c75e1286c14e3580f6 (patch)
tree       36f403910222c4536f1fe6ed3330228cee8c178e /bsfs
parent     e2f08efc0d8a3c875994bdb69623c30cce5079d9 (diff)
Fetch in triple stores:
* fetch interface
* sparql fetch ast parser
* sparql fetch implementation
Diffstat (limited to 'bsfs')
-rw-r--r--  bsfs/triple_store/base.py                  33
-rw-r--r--  bsfs/triple_store/sparql/parse_fetch.py   109
-rw-r--r--  bsfs/triple_store/sparql/parse_filter.py   45
-rw-r--r--  bsfs/triple_store/sparql/sparql.py          50
-rw-r--r--  bsfs/triple_store/sparql/utils.py          141
5 files changed, 332 insertions(+), 46 deletions(-)
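
In short, stores gain a fetch() method next to get(): a filter AST picks the nodes, a fetch AST picks the predicates to report per node, and the store yields (guid, name, value) triples. A minimal usage sketch, assuming an already constructed store, schema, and node_type; the ast constructor signatures are inferred from the attribute names the parsers read (predicate, name, expr) and may differ:

    from bsfs.query import ast
    from bsfs.utils import URI

    # Report the matched node itself plus one literal per node; 'entity' and
    # 'size' become the name field of the yielded triples.
    fetch_ast = ast.fetch.All(
        ast.fetch.This(name='entity'),
        ast.fetch.Value(predicate=URI('http://example.org/schema#filesize'), name='size'),
    )
    filter_ast = ...  # any ast.filter.FilterExpression; fetch() requires one

    for guid, name, value in store.fetch(node_type, filter_ast, fetch_ast):
        print(guid, name, value)  # e.g. (URI('...#1234'), 'size', 1024)
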
diff --git a/bsfs/triple_store/base.py b/bsfs/triple_store/base.py
index 7e03714..1baa63b 100644
--- a/bsfs/triple_store/base.py
+++ b/bsfs/triple_store/base.py
@@ -11,7 +11,7 @@ import typing
# inner-module imports
from bsfs.query import ast
from bsfs.utils import URI, typename
-import bsfs.schema as _schema
+import bsfs.schema as bsc
# exports
__all__: typing.Sequence[str] = (
@@ -82,12 +82,12 @@ class TripleStoreBase(abc.ABC):
@property
@abc.abstractmethod
- def schema(self) -> _schema.Schema:
+ def schema(self) -> bsc.Schema:
"""Return the store's local schema."""
@schema.setter
@abc.abstractmethod
- def schema(self, schema: _schema.Schema):
+ def schema(self, schema: bsc.Schema):
"""Migrate to new schema by adding or removing class definitions.
Commits before and after the migration.
@@ -112,17 +112,28 @@ class TripleStoreBase(abc.ABC):
@abc.abstractmethod
def get(
self,
- node_type: _schema.Node,
- query: typing.Optional[ast.filter.FilterExpression] = None,
+ node_type: bsc.Node,
+ filter: typing.Optional[ast.filter.FilterExpression] = None, # pylint: disable=redefined-builtin
) -> typing.Iterator[URI]:
- """Return guids of nodes of type *node_type* that match the *query*.
- Return all guids of the respective type if *query* is None.
+ """Return guids of nodes of type *node_type* that match the *filter*.
+ Return all guids of the respective type if *filter* is None.
+ """
+
+ @abc.abstractmethod
+ def fetch(
+ self,
+ node_type: bsc.Node,
+ filter: ast.filter.FilterExpression, # pylint: disable=redefined-builtin
+ fetch: ast.fetch.FetchExpression,
+ ) -> typing.Iterator[typing.Tuple[URI, str, typing.Any]]:
+ """Return (guid, name, value) triples where the guid is determined by the *filter*
+ query and the name matches the *fetch* query.
"""
@abc.abstractmethod
def exists(
self,
- node_type: _schema.Node,
+ node_type: bsc.Node,
guids: typing.Iterable[URI],
) -> typing.Iterable[URI]:
"""Return those *guids* that exist and have type *node_type* or a subclass thereof."""
@@ -130,7 +141,7 @@ class TripleStoreBase(abc.ABC):
@abc.abstractmethod
def create(
self,
- node_type: _schema.Node,
+ node_type: bsc.Node,
guids: typing.Iterable[URI],
):
"""Create *guid* nodes with type *subject*."""
@@ -138,9 +149,9 @@ class TripleStoreBase(abc.ABC):
@abc.abstractmethod
def set(
self,
- node_type: _schema.Node, # FIXME: is the node_type even needed? Couldn't I infer from the predicate?
+ node_type: bsc.Node, # FIXME: is the node_type even needed? Couldn't I infer from the predicate?
guids: typing.Iterable[URI],
- predicate: _schema.Predicate,
+ predicate: bsc.Predicate,
values: typing.Iterable[typing.Any],
):
"""Add triples to the graph.
diff --git a/bsfs/triple_store/sparql/parse_fetch.py b/bsfs/triple_store/sparql/parse_fetch.py
new file mode 100644
index 0000000..20d4e74
--- /dev/null
+++ b/bsfs/triple_store/sparql/parse_fetch.py
@@ -0,0 +1,109 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# standard imports
+import typing
+
+# bsfs imports
+from bsfs import schema as bsc
+from bsfs.query import ast
+from bsfs.utils import errors
+
+# inner-module imports
+from .utils import GenHopName, Query
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Fetch',
+ )
+
+
+## code ##
+
+class Fetch():
+ """Translate `bsfs.query.ast.fetch` structures into Sparql queries."""
+
+ def __init__(self, schema: bsc.Schema):
+ self.schema = schema
+ self.ngen = GenHopName(prefix='?fch')
+
+ def __call__(
+ self,
+ root_type: bsc.Node,
+ root: ast.fetch.FetchExpression,
+ ) -> Query:
+ """
+ """
+ # check root_type
+ if not isinstance(root_type, bsc.Node):
+ raise errors.BackendError(f'expected Node, found {root_type}')
+ if root_type not in self.schema.nodes():
+ raise errors.ConsistencyError(f'node {root_type} is not in the schema')
+ # parse root
+ terms, expr = self._parse_fetch_expression(root_type, root, '?ent')
+ # assemble query
+ return Query(
+ root_type=root_type.uri,
+ root_head='?ent',
+ select=terms,
+ where=expr,
+ )
+
+ def _parse_fetch_expression(
+ self,
+ node_type: bsc.Vertex,
+ node: ast.fetch.FetchExpression,
+ head: str,
+ ) -> typing.Tuple[typing.Set[typing.Tuple[str, str]], str]:
+ """Route *node* to the handler of the respective FetchExpression subclass."""
+ if isinstance(node, ast.fetch.All):
+ return self._all(node_type, node, head)
+ if isinstance(node, ast.fetch.Fetch):
+ return self._fetch(node_type, node, head)
+ if isinstance(node, ast.fetch.Node):
+ return self._node(node_type, node, head)
+ if isinstance(node, ast.fetch.Value):
+ return self._value(node_type, node, head)
+ if isinstance(node, ast.fetch.This):
+ return self._this(node_type, node, head)
+ # invalid node
+ raise errors.BackendError(f'expected fetch expression, found {node}')
+
+ def _all(self, node_type: bsc.Vertex, node: ast.fetch.All, head: str):
+ # child expressions
+ terms, exprs = zip(*[self._parse_fetch_expression(node_type, expr, head) for expr in node])
+ terms = {term for sub in terms for term in sub}
+ exprs = ' .\n'.join({expr for expr in exprs if len(expr.strip()) > 0})
+ return terms, exprs
+
+ def _fetch(self, node_type: bsc.Vertex, node: ast.fetch.Fetch, head: str): # pylint: disable=unused-argument # (node_type)
+ # child expressions
+ rng = self.schema.predicate(node.predicate).range
+ nexthead = next(self.ngen)
+ terms, expr = self._parse_fetch_expression(rng, node.expr, nexthead)
+ return terms, f'OPTIONAL{{ {head} <{node.predicate}> {nexthead} .\n {expr} }}'
+
+ def _node(self, node_type: bsc.Vertex, node: ast.fetch.Node, head: str): # pylint: disable=unused-argument # (node_type)
+ if f'?{node.name}'.startswith(self.ngen.prefix):
+ raise errors.BackendError(f'Node name must not start with {self.ngen.prefix}')
+ # compose and return statement
+ term = next(self.ngen)
+ return {(term, node.name)}, f'OPTIONAL{{ {head} <{node.predicate}> {term} }}'
+
+ def _value(self, node_type: bsc.Vertex, node: ast.fetch.Value, head: str): # pylint: disable=unused-argument # (node_type)
+ if f'?{node.name}'.startswith(self.ngen.prefix):
+ raise errors.BackendError(f'Value name must not start with {self.ngen.prefix}')
+ # compose and return statement
+ term = next(self.ngen)
+ return {(term, node.name)}, f'OPTIONAL{{ {head} <{node.predicate}> {term} }}'
+
+ def _this(self, node_type: bsc.Vertex, node: ast.fetch.This, head: str): # pylint: disable=unused-argument # (node_type)
+ if f'?{node.name}'.startswith(self.ngen.prefix):
+ raise errors.BackendError(f'This name must not start with {self.ngen.prefix}')
+ # compose and return statement
+ return {(head, node.name)}, ''
+
+## EOF ##
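
For orientation, the recursion bottoms out in OPTIONAL blocks: every Fetch hop allocates a fresh ?fchN variable via GenHopName, Node and Value bind a selected variable, and This reuses the current head. A rough input/output sketch, assuming the ast constructors mirror the attributes the parser reads:

    # fetch = ast.fetch.Fetch(
    #     predicate=P,                                  # node-valued predicate
    #     expr=ast.fetch.Value(predicate=Q, name='v'),  # literal at the hop target
    # )
    # Fetch(schema)(root_type, fetch) selects {('?fch1', 'v')} and produces the
    # partial WHERE clause (whitespace simplified):
    #
    #   OPTIONAL{ ?ent <P> ?fch0 .
    #    OPTIONAL{ ?fch0 <Q> ?fch1 } }
    #
    # Query.query later wraps this into a complete SELECT with the rdf:type path.
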
diff --git a/bsfs/triple_store/sparql/parse_filter.py b/bsfs/triple_store/sparql/parse_filter.py
index 8b6b976..dca0aea 100644
--- a/bsfs/triple_store/sparql/parse_filter.py
+++ b/bsfs/triple_store/sparql/parse_filter.py
@@ -19,6 +19,7 @@ from bsfs.utils import URI, errors
# inner-module imports
from .distance import DISTANCE_FU
+from .utils import GenHopName, Query
# exports
__all__: typing.Sequence[str] = (
@@ -28,25 +29,6 @@ __all__: typing.Sequence[str] = (
## code ##
-class _GenHopName():
- """Generator that produces a new unique symbol name with each iteration."""
-
- # Symbol name prefix.
- prefix: str
-
- # Current counter.
- curr: int
-
- def __init__(self, prefix: str = '?hop', start: int = 0):
- self.prefix = prefix
- self.curr = start - 1
-
- def __next__(self):
- """Generate and return the next unique name."""
- self.curr += 1
- return self.prefix + str(self.curr)
-
-
class Filter():
"""Translate `bsfs.query.ast.filter` structures into Sparql queries."""
@@ -54,18 +36,18 @@ class Filter():
schema: bsc.Schema
# Generator that produces unique symbol names.
- ngen: _GenHopName
+ ngen: GenHopName
def __init__(self, graph, schema):
self.graph = graph
self.schema = schema
- self.ngen = _GenHopName()
+ self.ngen = GenHopName(prefix='?flt')
def __call__(
self,
root_type: bsc.Node,
root: typing.Optional[ast.filter.FilterExpression] = None,
- ) -> str:
+ ) -> Query:
"""
"""
# check root_type
@@ -79,15 +61,18 @@ class Filter():
else:
cond = self._parse_filter_expression(root_type, root, '?ent')
# assemble query
- return f'''
- SELECT ?ent
- WHERE {{
- ?ent <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <{root_type.uri}> .
- {cond}
- }}
- '''
+ return Query(
+ root_type=root_type.uri,
+ root_head='?ent',
+ where=cond,
+ )
- def _parse_filter_expression(self, type_: bsc.Vertex, node: ast.filter.FilterExpression, head: str) -> str:
+ def _parse_filter_expression(
+ self,
+ type_: bsc.Vertex,
+ node: ast.filter.FilterExpression,
+ head: str,
+ ) -> str:
"""Route *node* to the handler of the respective FilterExpression subclass."""
if isinstance(node, ast.filter.Is):
return self._is(type_, node, head)
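
The point of returning Query instead of a raw string is composability: a Filter result and a Fetch result that share the same root head can be merged with +. A minimal sketch, assuming a graph, schema, node_type, and ASTs are at hand (module paths as in this diff):

    from bsfs.triple_store.sparql.parse_fetch import Fetch
    from bsfs.triple_store.sparql.parse_filter import Filter

    filtered = Filter(graph, schema)(node_type, filter_ast)  # selects ?ent
    fetched = Fetch(schema)(node_type, fetch_ast)            # adds (head, name) pairs
    combined = filtered + fetched    # Query.__add__ checks both roots match
    rows = combined(graph)           # executes combined.query via rdflib
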
diff --git a/bsfs/triple_store/sparql/sparql.py b/bsfs/triple_store/sparql/sparql.py
index fedd227..a0dd12e 100644
--- a/bsfs/triple_store/sparql/sparql.py
+++ b/bsfs/triple_store/sparql/sparql.py
@@ -16,6 +16,7 @@ from bsfs.query import ast
from bsfs.utils import errors, URI
# inner-module imports
+from . import parse_fetch
from . import parse_filter
from .. import base
from .distance import DISTANCE_FU
@@ -92,13 +93,16 @@ class SparqlStore(base.TripleStoreBase):
# Filter parser
_filter_parser: parse_filter.Filter
+ # Fetch parser
+ _fetch_parser: parse_fetch.Fetch
+
def __init__(self):
super().__init__(None)
self._graph = rdflib.Graph()
self._transaction = _Transaction(self._graph)
- # NOTE: parsing bsfs.query.ast.filter.Has requires xsd:integer.
self._schema = bsc.Schema(literals={bsc.ROOT_NUMBER.child(ns.xsd.integer)})
self._filter_parser = parse_filter.Filter(self._graph, self._schema)
+ self._fetch_parser = parse_fetch.Fetch(self._schema)
# NOTE: mypy and pylint complain about the **kwargs not being listed (contrasting super)
# However, not having it here is clearer since it's explicit that there are no arguments.
@@ -197,17 +201,53 @@ class SparqlStore(base.TripleStoreBase):
# migrate schema
self._schema = schema
self._filter_parser.schema = schema
+ self._fetch_parser.schema = schema
+
+ def fetch(
+ self,
+ node_type: bsc.Node,
+ filter: ast.filter.FilterExpression, # pylint: disable=redefined-builtin
+ fetch: ast.fetch.FetchExpression,
+ ) -> typing.Iterator[typing.Tuple[URI, str, typing.Any]]:
+ if node_type not in self.schema.nodes():
+ raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
+ if not isinstance(filter, ast.filter.FilterExpression):
+ raise TypeError(filter)
+ if not isinstance(fetch, ast.fetch.FetchExpression):
+ raise TypeError(fetch)
+ # compose a query from fetch and filter ast
+ query = self._filter_parser(node_type, filter)
+ query += self._fetch_parser(node_type, fetch)
+ # run query
+ emitted = set()
+ for result in query(self._graph):
+ guid = URI(result[0])
+ for name, raw in zip(query.names, result[1:]):
+ if raw is None: # undefined value
+ continue
+ if isinstance(raw, rdflib.Literal):
+ value = raw.value
+ else:
+ value = URI(raw)
+ # emit triple
+ triple = (guid, name, value)
+ if triple not in emitted: # FIXME: needs a better solution!
+ emitted.add(triple)
+ yield guid, name, value
def get(
self,
node_type: bsc.Node,
- query: typing.Optional[ast.filter.FilterExpression] = None,
+ filter: typing.Optional[ast.filter.FilterExpression] = None, # pylint: disable=redefined-builtin
) -> typing.Iterator[URI]:
if node_type not in self.schema.nodes():
raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
- if not isinstance(query, ast.filter.FilterExpression):
- raise TypeError(query)
- for guid, in self._graph.query(self._filter_parser(node_type, query)):
+ if filter is not None and not isinstance(filter, ast.filter.FilterExpression):
+ raise TypeError(filter)
+ # compose query
+ query = self._filter_parser(node_type, filter)
+ # run query
+ for guid, in query(self._graph):
yield URI(guid)
def _has_type(self, subject: URI, node_type: bsc.Node) -> bool:
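
The emitted set in fetch() compensates for SPARQL row semantics: multiple OPTIONAL selections multiply into a cross product of rows, so the same (guid, name, value) cell recurs once per combination of the other columns. A toy illustration of the duplicates being filtered:

    # One guid with two tags and two authors comes back as 4 rows of 2 cells,
    # yet only 4 of the 8 (guid, name, value) triples are distinct.
    rows = [
        ('g', [('tag', 'a'), ('author', 'x')]),
        ('g', [('tag', 'a'), ('author', 'y')]),
        ('g', [('tag', 'b'), ('author', 'x')]),
        ('g', [('tag', 'b'), ('author', 'y')]),
    ]
    emitted = set()
    for guid, cells in rows:
        for name, value in cells:
            if (guid, name, value) not in emitted:
                emitted.add((guid, name, value))
                print(guid, name, value)  # prints 4 triples, not 8
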
diff --git a/bsfs/triple_store/sparql/utils.py b/bsfs/triple_store/sparql/utils.py
new file mode 100644
index 0000000..deca4d8
--- /dev/null
+++ b/bsfs/triple_store/sparql/utils.py
@@ -0,0 +1,141 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# standard imports
+import typing
+
+# external imports
+import rdflib
+
+# bsfs imports
+from bsfs.namespace import ns
+from bsfs.utils import typename
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'GenHopName',
+ 'Query',
+ )
+
+
+## code ##
+
+class GenHopName():
+ """Generator that produces a new unique symbol name with each iteration."""
+
+ # Symbol name prefix.
+ prefix: str
+
+ # Current counter.
+ curr: int
+
+ def __init__(self, prefix: str = '?hop', start: int = 0):
+ self.prefix = prefix
+ self.curr = start - 1
+
+ def __next__(self):
+ """Generate and return the next unique name."""
+ self.curr += 1
+ return self.prefix + str(self.curr)
+
+
+class Query():
+ """Hold, manage, and complete partial Sparql queries."""
+
+ # root node type URI.
+ root_type: str
+
+ # root node variable name.
+ root_head: str
+
+ # (head, name) tuples (w/o root)
+ select: typing.Tuple[typing.Tuple[str, str], ...]
+
+ # where statements.
+ where: str
+
+ def __init__(
+ self,
+ root_type: str,
+ root_head: str = '?ent',
+ select: typing.Optional[typing.Iterable[typing.Tuple[str, str]]] = None,
+ where: typing.Optional[str] = None,
+ ):
+ # check arguments
+ if select is None:
+ select = []
+ if where is None:
+ where = ''
+ # set members
+ self.root_type = root_type
+ self.root_head = root_head
+ self.select = tuple(select) # tuple ensures persistent order
+ self.where = where.strip()
+
+ def __str__(self) -> str:
+ return self.query
+
+ def __repr__(self) -> str:
+ return f'{typename(self)}({self.root_type}, {self.root_head}, {self.select}, {self.where})'
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return isinstance(other, type(self)) \
+ and self.root_type == other.root_type \
+ and self.root_head == other.root_head \
+ and self.select == other.select \
+ and self.where == other.where
+
+ def __hash__(self) -> int:
+ return hash((type(self), self.root_type, self.root_head, self.select, self.where))
+
+ def __add__(self, other: typing.Any) -> 'Query':
+ # check other's type
+ if not isinstance(other, type(self)):
+ return NotImplemented
+ # check query compatibility
+ if not self.root_type == other.root_type:
+ raise ValueError(other)
+ if not self.root_head == other.root_head:
+ raise ValueError(other)
+ # combine selections
+ select = self.select + other.select
+ # combine conditions
+ conds = []
+ if self.where != '':
+ conds.append(self.where)
+ if other.where != '':
+ conds.append(other.where)
+ where = ' . '.join(conds)
+ # return new query
+ return Query(
+ root_type=self.root_type,
+ root_head=self.root_head,
+ select=select,
+ where=where,
+ )
+
+ @property
+ def names(self) -> typing.Tuple[str, ...]:
+ """Return a tuple of selected variable names, excluding the root."""
+ return tuple(name for _, name in self.select)
+
+ @property
+ def query(self) -> str:
+ """Return an executable sparql query."""
+ select = ' '.join(f'({head} as ?{name})' for head, name in self.select)
+ return f'''
+ SELECT {self.root_head} {select}
+ WHERE {{
+ {self.root_head} <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <{self.root_type}> .
+ {self.where}
+ }}
+ '''
+
+ def __call__(self, graph: rdflib.Graph) -> rdflib.query.Result:
+ """Execute the query on a *graph* and return the query result."""
+ return graph.query(self.query)
+
+## EOF ##
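
The helpers also work standalone; a small sketch (module path as in this diff, example URIs are placeholders):

    from bsfs.triple_store.sparql.utils import GenHopName, Query

    names = GenHopName(prefix='?x')
    print(next(names), next(names))  # ?x0 ?x1

    q1 = Query(root_type='http://example.org/Node',
               select=[('?x0', 'size')],
               where='?ent <http://example.org/size> ?x0')
    q2 = Query(root_type='http://example.org/Node',
               where='?ent <http://example.org/flag> "on"')
    combined = q1 + q2       # same root_type and root_head, merged WHERE
    print(combined.names)    # ('size',)
    print(combined.query)    # executable Sparql SELECT with the rdf:type path
    # combined(graph) would run it on an rdflib.Graph.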