| author | Matthias Baumgartner <dev@igsor.net> | 2023-03-05 19:22:58 +0100 |
|---|---|---|
| committer | Matthias Baumgartner <dev@igsor.net> | 2023-03-05 19:22:58 +0100 |
| commit | a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d (patch) | |
| tree | fb220da28bb7248ebf37ce09af5de88f2c1aaad4 /bsie/tools/pipeline.py | |
| parent | 7582c280ad5324a2f0427999911c7e7abc14a6ab (diff) | |
| parent | af81318ae9311fd0b0e16949cef3cfaf7996970b (diff) | |
| download | bsie-release.tar.gz, bsie-release.tar.bz2, bsie-release.zip | |
Diffstat (limited to 'bsie/tools/pipeline.py')
-rw-r--r-- | bsie/tools/pipeline.py | 144 |
1 file changed, 0 insertions(+), 144 deletions(-)
diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py
deleted file mode 100644
index 20e8ddf..0000000
--- a/bsie/tools/pipeline.py
+++ /dev/null
@@ -1,144 +0,0 @@
-"""
-
-Part of the bsie module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
-# imports
-from collections import defaultdict
-import logging
-import typing
-
-# bsie imports
-from bsie import base
-from bsie.utils import bsfs, node, ns
-
-# exports
-__all__: typing.Sequence[str] = (
-    'Pipeline',
-    )
-
-# constants
-FILE_PREFIX = 'file#'
-
-## code ##
-
-logger = logging.getLogger(__name__)
-
-class Pipeline():
-    """Extraction pipeline to generate triples from files.
-
-    The Pipeline binds readers and extractors, and performs
-    the necessary operations to produce triples from a file.
-    It takes a best-effort approach to extract as many triples
-    as possible. Errors during the extraction are passed over
-    and reported to the log.
-
-    """
-
-    # combined extractor schemas.
-    _schema: bsfs.schema.Schema
-
-    # node prefix.
-    _prefix: bsfs.Namespace
-
-    # extractor -> reader mapping
-    _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
-
-    def __init__(
-            self,
-            prefix: bsfs.Namespace,
-            ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
-            ):
-        # store core members
-        self._prefix = prefix + FILE_PREFIX
-        self._ext2rdr = ext2rdr
-        # compile schema from all extractors
-        self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr)
-
-    def __str__(self) -> str:
-        return bsfs.typename(self)
-
-    def __repr__(self) -> str:
-        return f'{bsfs.typename(self)}(...)'
-
-    def __hash__(self) -> int:
-        return hash((type(self), self._prefix, self._schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values())))
-
-    def __eq__(self, other: typing.Any) -> bool:
-        return isinstance(other, type(self)) \
-           and self._schema == other._schema \
-           and self._prefix == other._prefix \
-           and self._ext2rdr == other._ext2rdr
-
-    @property
-    def schema(self) -> bsfs.schema.Schema:
-        """Return the pipeline's schema (combined from all extractors)."""
-        return self._schema
-
-    @property
-    def principals(self) -> typing.Iterator[bsfs.schema.Predicate]:
-        """Return the principal predicates that can be extracted."""
-        return iter({pred for ext in self._ext2rdr for pred in ext.principals})
-
-    def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema:
-        """Return the subset of the schema that supports the given *principals*."""
-        # materialize principals
-        principals = set(principals)
-        # collect and combine schemas from extractors
-        return bsfs.schema.Schema.Union({
-            ext.schema
-            for ext
-            in self._ext2rdr
-            if not set(ext.principals).isdisjoint(principals)
-            })
-
-    def __call__(
-            self,
-            path: bsfs.URI,
-            principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None,
-            ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
-        """Extract triples from the file at *path*.
-        Optionally, limit triples to *principals*."""
-        # get principals
-        principals = set(principals) if principals is not None else set(self.schema.predicates())
-
-        # get extractors
-        extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)}
-
-        # corner-case short-cut
-        if len(extractors) == 0:
-            return
-
-        # get readers -> extractors mapping
-        rdr2ext = defaultdict(set)
-        for ext in extractors:
-            rdr = self._ext2rdr[ext]
-            rdr2ext[rdr].add(ext)
-
-        # create subject for file
-        uuid = bsfs.uuid.UCID.from_path(path)
-        subject = node.Node(ns.bsfs.File, self._prefix[uuid])
-
-        # extract information
-        for rdr, extrs in rdr2ext.items():
-            try:
-                # get content
-                content = rdr(path) if rdr is not None else None
-
-                # apply extractors on this content
-                for ext in extrs:
-                    try:
-                        # get predicate/value tuples
-                        for subject, pred, value in ext.extract(subject, content, principals):
-                            yield subject, pred, value
-
-                    except base.errors.ExtractorError as err:
-                        # critical extractor failure.
-                        logger.error('%s failed to extract triples from content: %s', ext, err)
-
-            except base.errors.ReaderError as err:
-                # failed to read any content. skip.
-                logger.error('%s failed to read content: %s', rdr, err)
-
-
-## EOF ##
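
For context, the removed `Pipeline` bound extractors to their readers and, when called on a file path, yielded `(subject, predicate, value)` triples. The snippet below is a minimal, hypothetical usage sketch reconstructed only from the `__init__` and `__call__` signatures visible in the deleted file; `my_extractor`, `my_reader`, and `my_prefix` are placeholders, not names from the bsie codebase.

```python
# Hypothetical sketch, not part of this commit: how the removed Pipeline
# appears to have been used, judging from its __init__ and __call__ signatures.
from bsie.tools.pipeline import Pipeline  # module removed by this commit

# Placeholders: an Extractor instance, an optional Reader instance,
# and a bsfs.Namespace used as the node prefix.
pipeline = Pipeline(prefix=my_prefix, ext2rdr={my_extractor: my_reader})

# Calling the pipeline on a file path yields (subject, predicate, value)
# triples, optionally restricted to a set of principal predicates.
for subject, predicate, value in pipeline('/path/to/some/file'):
    print(subject, predicate, value)
```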