"""

Part of the bsie module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
from collections import defaultdict
import logging
import typing

# bsie imports
from bsie import base
from bsie.utils.node import Node
from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename
from bsie.utils import bsfs, node, ns

# exports
__all__: typing.Sequence[str] = (
    'Pipeline',
    )


## code ##

logger = logging.getLogger(__name__)


class Pipeline():
    """Extraction pipeline to generate triples from files.

    The Pipeline binds readers and extractors, and performs
    the necessary operations to produce triples from a file.
    It takes a best-effort approach to extract as many triples
    as possible. Errors during the extraction are passed over
    and reported to the log.

    """

    # combined extractor schemas.
    schema: _schema.Schema

    # node prefix.
    _prefix: URI

    # extractor -> reader mapping (None: the extractor needs no reader).
    _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]

    def __init__(
            self,
            prefix: URI,
            ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]],
            ):
        """Bind extractors to the readers that supply their content.

        Args:
            prefix: URI prefix; the file subject URI is this prefix plus the file's UCID.
            ext2rdr: Maps each extractor to the reader whose output it consumes,
                or to None if the extractor is called without content.

        """
        # store core members
        self._prefix = prefix
        # Keep a private copy: the schema below is compiled once, so later mutation
        # of the caller's dict must not desynchronize the two (or alter our hash).
        self._ext2rdr = dict(ext2rdr)
        # compile schema from all extractors
        self.schema = _schema.Schema.Union(ext.schema for ext in self._ext2rdr)

    def __str__(self) -> str:
        return bsfs.typename(self)

    def __repr__(self) -> str:
        return f'{bsfs.typename(self)}(...)'

    def __hash__(self) -> int:
        # Must agree with __eq__, which compares the mapping as a dict (i.e.,
        # independent of insertion order). Hashing the items as a frozenset keeps
        # equal pipelines' hashes equal regardless of how the dict was built.
        return hash((
            type(self),
            self._prefix,
            self.schema,
            frozenset(self._ext2rdr.items()),
            ))

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, type(self)) \
            and self.schema == other.schema \
            and self._prefix == other._prefix \
            and self._ext2rdr == other._ext2rdr

    def predicates(self) -> typing.Iterator[_schema.Predicate]:
        """Return the predicates that are extracted from a file.

        The predicates of all extractors are de-duplicated; iteration order is unspecified.

        """
        return iter({pred for ext in self._ext2rdr for pred in ext.predicates()})

    def __call__(
            self,
            path: URI,
            predicates: typing.Optional[typing.Iterable[_schema.Predicate]] = None,
            ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]:
        """Extract triples from the file at *path*. Optionally, limit triples to *predicates*.

        Only extractors that produce at least one requested predicate are run.
        Each reader is invoked at most once, and its content is shared by all
        selected extractors bound to it. Extractors bound to None receive None as content.

        Args:
            path: Location of the file to extract from.
            predicates: Predicates to extract. Defaults to all predicates of the
                pipeline's schema.

        Yields:
            (node, predicate, value) triples as produced by the extractors. The
            subject node passed to the extractors is an ``ns.bsfs.Entity`` whose
            URI is the pipeline prefix followed by the file's UCID. The order of
            triples across extractors is unspecified (extractors are kept in sets).

        Errors:
            ``ReaderError`` and ``ExtractorError`` are logged and skipped, never raised.

        """
        # get predicates
        predicates = set(predicates) if predicates is not None else set(self.schema.predicates())

        # select extractors producing at least one requested predicate
        extractors = {ext for ext in self._ext2rdr if not predicates.isdisjoint(ext.predicates())}

        # corner-case short-cut: nothing to do, so don't even touch the file
        if not extractors:
            return

        # get readers -> extractors mapping, so each reader runs only once
        rdr2ext: typing.DefaultDict[
            typing.Optional[base.reader.Reader],
            typing.Set[base.extractor.Extractor]] = defaultdict(set)
        for ext in extractors:
            rdr2ext[self._ext2rdr[ext]].add(ext)

        # create subject for file
        ucid = _uuid.UCID.from_path(path)
        subject = Node(ns.bsfs.Entity, self._prefix + ucid)

        # extract information
        for rdr, extrs in rdr2ext.items():
            # NOTE: this try also spans the extractor loop, so a ReaderError raised
            # while extracting aborts the remaining extractors of this reader.
            try:
                # get content
                content = rdr(path) if rdr is not None else None
                # apply extractors on this content
                for ext in extrs:
                    try:
                        # get predicate/value tuples
                        for sub, pred, value in ext.extract(subject, content, predicates):
                            yield sub, pred, value
                    except base.errors.ExtractorError as err:
                        # critical extractor failure.
                        logger.error('%s failed to extract triples from content: %s', ext, err)
            except base.errors.ReaderError as err:
                # failed to read any content. skip.
                logger.error('%s failed to read content: %s', rdr, err)

## EOF ##