# standard imports import typing # external imports import rdflib # bsfs imports from bsfs.namespace import ns from bsfs.utils import typename # exports __all__: typing.Sequence[str] = ( 'GenHopName', 'Query', ) ## code ## class GenHopName(): """Generator that produces a new unique symbol name with each iteration.""" # Symbol name prefix. prefix: str # Current counter. curr: int def __init__(self, prefix: str = '?hop', start: int = 0): self.prefix = prefix self.curr = start - 1 def __next__(self): """Generate and return the next unique name.""" self.curr += 1 return self.prefix + str(self.curr) class Query(): """Hold, manage, and complete partial Sparql queries.""" # root node type URI. root_type: str # root node variable name. root_head: str # (head, name) tuples (w/o root) select: typing.Tuple[typing.Tuple[str, str], ...] # where statements. where: str def __init__( self, root_type: str, root_head: str = '?ent', select: typing.Optional[typing.Iterable[typing.Tuple[str, str]]] = None, where: typing.Optional[str] = None, ): # check arguments if select is None: select = [] if where is None: where = '' # set members self.root_type = root_type self.root_head = root_head self.select = tuple(select) # tuple ensures presistent order self.where = where.strip() def __str__(self) -> str: return self.query def __repr__(self) -> str: return f'{typename(self)}({self.root_type}, {self.root_head}, {self.select}, {self.where})' def __eq__(self, other: typing.Any) -> bool: return isinstance(other, type(self)) \ and self.root_type == other.root_type \ and self.root_head == other.root_head \ and self.select == other.select \ and self.where == other.where def __hash__(self) -> int: return hash((type(self), self.root_type, self.root_head, self.select, self.where)) def __add__(self, other: typing.Any) -> 'Query': # check other's type if not isinstance(other, type(self)): return NotImplemented # check query compatibility if not self.root_type == other.root_type: raise ValueError(other) if not self.root_head == other.root_head: raise ValueError(other) # combine selections select = self.select + other.select # combine conditions conds = [] if self.where != '': conds.append(self.where) if other.where != '': conds.append(other.where) where = ' . '.join(conds) # return new query return Query( root_type=self.root_type, root_head=self.root_head, select=select, where=where, ) @property def names(self) -> typing.Tuple[str, ...]: """Return a tuple of selected variable names, excluding the root.""" return tuple(name for _, name in self.select) @property def query(self) -> str: """Return an executable sparql query.""" select = ' '.join(f'({head} as ?{name})' for head, name in self.select) return f''' SELECT DISTINCT {self.root_head} {select} WHERE {{ {self.root_head} <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <{self.root_type}> . {self.where} }} ORDER BY str({self.root_head}) ''' def __call__(self, graph: rdflib.Graph) -> rdflib.query.Result: """Execute the query on a *graph* and return the query result.""" return graph.query(self.query) ## EOF ##