"""

Part of the bsie module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
from collections import defaultdict
import logging
import typing

# bsie imports
from bsie.extractor import Extractor
from bsie.reader import Reader
from bsie.utils import bsfs, errors, node, ns

# exports
__all__: typing.Sequence[str] = (
    'Pipeline',
    )

# constants
FILE_PREFIX = 'file#'

## code ##

logger = logging.getLogger(__name__)

class Pipeline():
    """Extraction pipeline to generate triples from files.

    The Pipeline binds readers and extractors, and performs
    the necessary operations to produce triples from a file. It takes a
    best-effort approach to extract as many triples as possible.
    Errors during the extraction are passed over and reported to the log.

    """

    # combined extractor schemas.
    _schema: bsfs.schema.Schema

    # node prefix.
    _prefix: bsfs.Namespace

    # extractor -> reader mapping
    _ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]]

    def __init__(
            self,
            prefix: bsfs.Namespace,
            ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]]
            ):
        """Create a pipeline from an extractor -> reader mapping.

        *prefix* is extended by FILE_PREFIX and used to build the subject
        node of each processed file. *ext2rdr* maps every extractor to the
        reader that provides its content, or to None if the extractor is
        to be called without any reader content.
        """
        # store core members
        self._prefix = prefix + FILE_PREFIX
        self._ext2rdr = ext2rdr
        # compile schema from all extractors
        self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr)

    def __str__(self) -> str:
        return bsfs.typename(self)

    def __repr__(self) -> str:
        return f'{bsfs.typename(self)}(...)'

    def __hash__(self) -> int:
        # NOTE: __eq__ compares the extractor -> reader mapping as a dict, i.e.
        # irrespective of insertion order. The hash must not depend on the
        # order either, hence the (extractor, reader) pairs go into a frozenset.
        return hash((
            type(self),
            self._prefix,
            self._schema,
            frozenset(self._ext2rdr.items()),
            ))

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, type(self)) \
           and self._schema == other._schema \
           and self._prefix == other._prefix \
           and self._ext2rdr == other._ext2rdr

    @property
    def schema(self) -> bsfs.schema.Schema:
        """Return the pipeline's schema (combined from all extractors)."""
        return self._schema

    @property
    def principals(self) -> typing.Iterator[bsfs.schema.Predicate]:
        """Return the principal predicates that can be extracted."""
        # the set removes principals that are offered by several extractors
        return iter({pred for ext in self._ext2rdr for pred in ext.principals})

    def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema:
        """Return the subset of the schema that supports the given *principals*."""
        # materialize principals
        principals = set(principals)
        # collect and combine schemas from extractors
        return bsfs.schema.Schema.Union({
            ext.schema
            for ext
            in self._ext2rdr
            if not set(ext.principals).isdisjoint(principals)
            })

    def __call__(
            self,
            path: bsfs.URI,
            principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None,
            ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
        """Extract triples from the file at *path*. Optionally, limit triples to *principals*.

        If *principals* is None, all predicates of the pipeline's schema are
        requested. Only extractors that offer at least one of the requested
        principals are run. Each reader is invoked once and its content is
        shared among all extractors that are mapped to it.

        Yields (node, predicate, value) triples as produced by the extractors.
        ExtractorError, UnsupportedFileFormatError, and ReaderError do not abort
        the extraction; the affected extractor or reader is skipped instead.
        """
        # get principals
        principals = set(principals) if principals is not None else set(self.schema.predicates())

        # get extractors
        extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)}

        # corner-case short-cut
        if not extractors:
            return

        # get readers -> extractors mapping
        rdr2ext = defaultdict(set)
        for ext in extractors:
            rdr = self._ext2rdr[ext]
            rdr2ext[rdr].add(ext)

        # create subject for file
        uuid = bsfs.uuid.UCID.from_path(path)
        subject = node.Node(ns.bsfs.File, self._prefix[uuid])

        # extract information
        for rdr, extrs in rdr2ext.items():
            try:
                # get content. Extractors without a reader receive None.
                content = rdr(path) if rdr is not None else None

                # apply extractors on this content
                for ext in extrs:
                    try:
                        # get predicate/value tuples.
                        # NOTE: The loop variable must not be called `subject`.
                        # Otherwise, a triple about another node would replace the
                        # file's subject that is passed to the remaining extractors.
                        for subj, pred, value in ext.extract(subject, content, principals):
                            yield subj, pred, value

                    except errors.ExtractorError as err:
                        # critical extractor failure.
                        logger.error('%s failed to extract triples from content: %s', ext, err)

            except errors.UnsupportedFileFormatError:
                # failed to read the file format. skip.
                # This is an expected condition, hence it's only reported at debug level.
                logger.debug('%s could not process the file format of %s', rdr, path)
            except errors.ReaderError as err:
                # failed to read any content. skip.
                logger.error('%s failed to read content: %s', rdr, err)

## EOF ##