# imports
import base64
import itertools
import typing

# external imports
import rdflib

# bsfs imports
from bsfs import schema as bsc
from bsfs.namespace import ns
from bsfs.query import ast
from bsfs.utils import errors, URI

# inner-module imports
from . import parse_fetch
from . import parse_filter
from .. import base
from .distance import DISTANCE_FU

# exports
__all__: typing.Sequence[str] = (
    'SparqlStore',
    )


## code ##

# Literals typed as BinaryBlob are decoded from base64 into bytes by rdflib.
rdflib.term.bind(ns.bsfs.BinaryBlob, bytes, constructor=base64.b64decode)


class _Transaction():
    """Lightweight rdflib transactions for in-memory databases.

    The transaction records the net difference between the graph's current
    state and its state at the last commit, so that `rollback` can restore
    the committed state. Opposing operations on the same triple cancel each
    other out (e.g., adding and then removing a triple leaves no trace),
    hence the two logs are always disjoint and the order in which they are
    undone does not matter.

    Triples must be hashable since they are tracked in sets.
    """

    # graph instance.
    _graph: rdflib.Graph

    # triples present in the graph now but absent at the last commit.
    _added: typing.Set[typing.Any]

    # triples absent from the graph now but present at the last commit.
    _removed: typing.Set[typing.Any]

    def __init__(self, graph: rdflib.Graph):
        self._graph = graph
        # initialize internal structures
        self.commit()

    def commit(self):
        """Commit temporary changes."""
        self._added = set()
        self._removed = set()

    def rollback(self):
        """Undo changes since the last commit."""
        for triple in self._added:
            self._graph.remove(triple)
        for triple in self._removed:
            self._graph.add(triple)
        # the graph is back at the committed state, so nothing is pending anymore
        self._added = set()
        self._removed = set()

    def add(self, triple: typing.Any):
        """Add a triple to the graph."""
        if triple in self._graph:
            # nothing changes, nothing to log
            return
        if triple in self._removed:
            # re-adding a triple that was removed in this transaction
            # restores the committed state for that triple
            self._removed.discard(triple)
        else:
            self._added.add(triple)
        self._graph.add(triple)

    def remove(self, triple: typing.Any):
        """Remove a triple from the graph."""
        if triple not in self._graph:
            # nothing changes, nothing to log
            return
        if triple in self._added:
            # removing a triple that was added in this transaction
            # restores the committed state for that triple
            self._added.discard(triple)
        else:
            self._removed.add(triple)
        self._graph.remove(triple)


class SparqlStore(base.TripleStoreBase):
    """Sparql-based triple store.

    The sparql triple store uses a third-party backend
    (currently rdflib) to store triples and manages them via
    the Sparql query language.

    """

    # The rdflib graph.
    _graph: rdflib.Graph

    # Current transaction.
    _transaction: _Transaction

    # The local schema.
    _schema: bsc.Schema

    # Filter parser
    _filter_parser: parse_filter.Filter

    # Fetch parser
    _fetch_parser: parse_fetch.Fetch

    def __init__(self):
        super().__init__(None)
        self._graph = rdflib.Graph()
        self._transaction = _Transaction(self._graph)
        self._schema = bsc.Schema(literals={bsc.ROOT_NUMBER.child(ns.xsd.integer)})
        self._filter_parser = parse_filter.Filter(self._graph, self._schema)
        self._fetch_parser = parse_fetch.Fetch(self._schema)

    # NOTE: mypy and pylint complain about the **kwargs not being listed (contrasting super)
    # However, not having it here is clearer since it's explicit that there are no arguments.
    @classmethod
    def Open(cls) -> 'SparqlStore': # type: ignore [override] # pylint: disable=arguments-differ
        """Return a new, empty store instance."""
        return cls()

    def commit(self):
        """Commit the changes of the current transaction."""
        self._transaction.commit()

    def rollback(self):
        """Undo all changes since the last commit."""
        self._transaction.rollback()

    @property
    def schema(self) -> bsc.Schema:
        """Return the schema currently used by the store."""
        return self._schema

    @schema.setter
    def schema(self, schema: bsc.Schema):
        """Migrate the store to a new *schema*.

        The current transaction is committed before the migration. Instances
        of predicates and nodes that are no longer part of the schema are
        removed from the graph, the class hierarchy of the new schema is
        written to the graph, and the result is committed again.

        Raises a TypeError if *schema* is not a Schema instance, a
        ConsistencyError if it contradicts the current schema, and an
        UnsupportedError if a feature uses an unknown distance function.
        """
        # check args: Schema instance
        if not isinstance(schema, bsc.Schema):
            raise TypeError(schema)
        # check compatibility: No contradicting definitions
        if not self.schema.consistent_with(schema):
            raise errors.ConsistencyError(f'{schema} is inconsistent with {self.schema}')
        # check distance functions of features
        invalid = {
            (cand.uri, cand.distance)
            for cand
            in schema.literals()
            if isinstance(cand, bsc.Feature) and cand.distance not in DISTANCE_FU}
        if invalid:
            cand, dist = zip(*invalid)
            raise errors.UnsupportedError(
                f'unknown distance function {",".join(dist)} in feature {", ".join(cand)}')

        # commit the current transaction
        self.commit()

        # adjust instances:
        # nothing to do for added classes
        # delete instances of removed classes

        # get deleted classes
        sub = self.schema - schema

        # NOTE: Query results are materialized (list) before triples are removed
        # so that the graph is never modified while one of its iterators is live.

        for pred in sub.predicates:
            pred_ref = rdflib.URIRef(pred.uri)
            # remove predicate instances
            for src, trg in list(self._graph.subject_objects(pred_ref)):
                self._transaction.remove((src, pred_ref, trg))
            # remove predicate definition
            if pred.parent is not None: # NOTE: there shouldn't be any predicate w/o parent
                self._transaction.remove((
                    pred_ref,
                    rdflib.RDFS.subClassOf,
                    rdflib.URIRef(pred.parent.uri),
                    ))

        # remove node instances
        for node in sub.nodes:
            node_ref = rdflib.URIRef(node.uri)
            # iterate through node instances
            for inst in list(self._graph.subjects(rdflib.RDF.type, node_ref)):
                # remove triples where the instance is in the object position
                for src, pred in list(self._graph.subject_predicates(inst)):
                    self._transaction.remove((src, pred, inst))
                # remove triples where the instance is in the subject position
                for pred, trg in list(self._graph.predicate_objects(inst)):
                    self._transaction.remove((inst, pred, trg))
                # remove instance
                self._transaction.remove((inst, rdflib.RDF.type, node_ref))
            # remove node definition
            if node.parent is not None: # NOTE: there shouldn't be any node w/o parent
                self._transaction.remove((
                    node_ref,
                    rdflib.RDFS.subClassOf,
                    rdflib.URIRef(node.parent.uri),
                    ))

        for lit in sub.literals:
            # remove literal definition
            if lit.parent is not None: # NOTE: there shouldn't be any literal w/o parent
                self._transaction.remove((
                    rdflib.URIRef(lit.uri),
                    rdflib.RDFS.subClassOf,
                    rdflib.URIRef(lit.parent.uri),
                    ))

        # add predicate, node, and literal hierarchies to the graph
        for itm in itertools.chain(schema.predicates(), schema.nodes(), schema.literals()):
            if itm.parent is not None:
                self._transaction.add((
                    rdflib.URIRef(itm.uri),
                    rdflib.RDFS.subClassOf,
                    rdflib.URIRef(itm.parent.uri),
                    ))

        # commit instance changes
        self.commit()

        # migrate schema
        self._schema = schema
        self._filter_parser.schema = schema
        self._fetch_parser.schema = schema

    def fetch(
            self,
            node_type: bsc.Node,
            filter: ast.filter.FilterExpression, # pylint: disable=redefined-builtin
            fetch: ast.fetch.FetchExpression,
            ) -> typing.Iterator[typing.Tuple[URI, str, typing.Any]]:
        """Yield (guid, name, value) triples for the nodes selected by *filter*.

        The query is composed from the filter and the fetch parsers and run
        on the graph. Undefined values are skipped and every triple is
        emitted at most once.

        Raises a ConsistencyError if *node_type* is not in the schema and a
        TypeError if *filter* or *fetch* have the wrong type. Since this is
        a generator, the checks run when the first item is requested.
        """
        if node_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
        if not isinstance(filter, ast.filter.FilterExpression):
            raise TypeError(filter)
        if not isinstance(fetch, ast.fetch.FetchExpression):
            raise TypeError(fetch)
        # compose a query from fetch and filter ast
        query = self._filter_parser(node_type, filter)
        query += self._fetch_parser(node_type, fetch)
        # run query
        emitted = set()
        for result in query(self._graph):
            guid = URI(result[0])
            for name, raw in zip(query.names, result[1:]):
                if raw is None: # undefined value
                    continue
                if isinstance(raw, rdflib.Literal):
                    value = raw.value
                else:
                    value = URI(raw)
                # emit triple
                triple = (guid, name, value)
                if triple not in emitted: # FIXME: needs a better solution!
                    emitted.add(triple)
                    yield guid, name, value

    def get(
            self,
            node_type: bsc.Node,
            filter: typing.Optional[ast.filter.FilterExpression] = None, # pylint: disable=redefined-builtin
            ) -> typing.Iterator[URI]:
        """Yield the guids of the nodes of *node_type* selected by *filter*.

        Raises a ConsistencyError if *node_type* is not in the schema and a
        TypeError if *filter* is neither None nor a FilterExpression. Since
        this is a generator, the checks run when the first item is requested.
        """
        if node_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
        if filter is not None and not isinstance(filter, ast.filter.FilterExpression):
            raise TypeError(filter)
        # compose query
        query = self._filter_parser(node_type, filter)
        # run query
        for guid, in query(self._graph):
            yield URI(guid)

    def _has_type(self, subject: URI, node_type: bsc.Node) -> bool:
        """Return True if *subject* is a node of class *node_type* or a subclass thereof."""
        if node_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')

        subject_types = list(self._graph.objects(rdflib.URIRef(subject), rdflib.RDF.type))
        if not subject_types:
            # the subject has no type, i.e., it was never created
            return False
        if len(subject_types) == 1:
            node = self.schema.node(URI(subject_types[0])) # type: ignore [arg-type] # URI is a subtype of str
            return node == node_type or node_type in node.parents()
        # a subject is expected to have at most one type
        raise errors.UnreachableError()

    def exists(
            self,
            node_type: bsc.Node,
            guids: typing.Iterable[URI],
            ) -> typing.Iterable[URI]:
        """Return (lazily) those *guids* that are nodes of class *node_type* or a subclass thereof."""
        return (subj for subj in guids if self._has_type(subj, node_type))

    def create(
            self,
            node_type: bsc.Node,
            guids: typing.Iterable[URI],
            ):
        """Create a node of class *node_type* for each of the *guids*.

        Guids that already have a type in the graph are skipped.
        Raises a ConsistencyError if *node_type* is not in the schema.
        """
        # check node_type
        if node_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
        # check and create guids
        for guid in guids:
            subject = rdflib.URIRef(URI(guid))
            # check node existence
            if (subject, rdflib.RDF.type, None) in self._graph:
                # FIXME: node exists and may have a different type! ignore? raise? report?
                continue
            # add node
            self._transaction.add((subject, rdflib.RDF.type, rdflib.URIRef(node_type.uri)))

    def set(
            self,
            node_type: bsc.Node,
            guids: typing.Iterable[URI],
            predicate: bsc.Predicate,
            values: typing.Iterable[typing.Any],
            ):
        """Set *predicate* to each of the *values* on each of the *guids*.

        For unique predicates, differing values already in the graph are
        replaced. Nothing happens if *values* is empty.

        Raises a ConsistencyError if *node_type* or *predicate* are not in
        the schema or *node_type* is not within the predicate's domain, a
        ValueError if a unique predicate does not receive exactly one value,
        and an InstanceError if a node in *guids* (or in *values*, for
        node-valued predicates) does not exist with a matching type.
        """
        # check node_type
        if node_type not in self.schema.nodes():
            raise errors.ConsistencyError(f'{node_type} is not defined in the schema')
        # check predicate
        if predicate not in self.schema.predicates():
            raise errors.ConsistencyError(f'{predicate} is not defined in the schema')
        if not node_type <= predicate.domain:
            raise errors.ConsistencyError(f'{node_type} must be a subclass of {predicate.domain}')
        # NOTE: predicate.range is in the schema since predicate is in the schema.
        # materialize values to safeguard against iterators passed as argument
        values = set(values)
        # check values
        if not values:
            return
        if predicate.unique and len(values) != 1:
            raise ValueError(values)
        if isinstance(predicate.range, bsc.Node):
            inconsistent = {val for val in values if not self._has_type(val, predicate.range)}
            # catches nodes that don't exist and nodes that have an inconsistent type
            if inconsistent:
                raise errors.InstanceError(inconsistent)
        # check guids
        # FIXME: Fail or skip inexistent nodes?
        guids = {URI(guid) for guid in guids}
        inconsistent = {guid for guid in guids if not self._has_type(guid, node_type)}
        if inconsistent:
            raise errors.InstanceError(inconsistent)
        if not guids:
            # no subjects, hence no triples to write
            return

        # convert the values once, before any triple is written, so that a
        # failing conversion cannot leave a partially written result behind
        objects: typing.List[typing.Any]
        if isinstance(predicate.range, bsc.Literal):
            if predicate.range <= self.schema.literal(ns.bsfs.BinaryBlob):
                # binary blobs are stored base64 encoded
                dtype = rdflib.URIRef(ns.bsfs.BinaryBlob)
                objects = [
                    rdflib.Literal(base64.b64encode(value), datatype=dtype)
                    for value in values]
            else:
                dtype = rdflib.URIRef(predicate.range.uri)
                objects = [rdflib.Literal(value, datatype=dtype) for value in values]
        elif isinstance(predicate.range, bsc.Node):
            objects = [rdflib.URIRef(value) for value in values]
        else:
            raise errors.UnreachableError()
        subjects = [rdflib.URIRef(guid) for guid in guids]

        # add triples
        pred = rdflib.URIRef(predicate.uri)
        for subject, obj in itertools.product(subjects, objects):
            # clear triples for unique predicates
            if predicate.unique:
                # materialize the current objects since the graph is modified in the loop
                for current in list(self._graph.objects(subject, pred)):
                    if current != obj:
                        self._transaction.remove((subject, pred, current))
            # add triple
            self._transaction.add((subject, pred, obj))

## EOF ##