diff options
author | Matthias Baumgartner <dev@igsor.net> | 2022-11-25 14:59:17 +0100 |
---|---|---|
committer | Matthias Baumgartner <dev@igsor.net> | 2022-11-25 14:59:17 +0100 |
commit | a294bbe0622911bcd6df37c38865a4c0eb290593 (patch) | |
tree | f038ed8d4f04c63991939e13e61ae170de4e2c57 /bsie/tools/pipeline.py | |
parent | 9389c741bdbbca9adbff6099d440706cd63deac4 (diff) | |
parent | 3e6a69ce7f109f0fd4352507ad60d58d4cbd24a7 (diff) | |
download | bsie-a294bbe0622911bcd6df37c38865a4c0eb290593.tar.gz bsie-a294bbe0622911bcd6df37c38865a4c0eb290593.tar.bz2 bsie-a294bbe0622911bcd6df37c38865a4c0eb290593.zip |
Merge branch 'mb/tools' into develop
Diffstat (limited to 'bsie/tools/pipeline.py')
-rw-r--r-- | bsie/tools/pipeline.py | 121 |
1 files changed, 121 insertions, 0 deletions
diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py new file mode 100644 index 0000000..8e1c992 --- /dev/null +++ b/bsie/tools/pipeline.py @@ -0,0 +1,121 @@ +""" + +Part of the bsie module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +from collections import defaultdict +import logging +import typing + +# bsie imports +from bsie import base +from bsie.utils import ns +from bsie.utils.node import Node +from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename + +# exports +__all__: typing.Sequence[str] = ( + 'Pipeline', + ) + +## code ## + +logger = logging.getLogger(__name__) + +class Pipeline(): + """Extraction pipeline to generate triples from files. + + The Pipeline binds readers and extractors, and performs + the necessary operations to produce triples from a file. + It takes a best-effort approach to extract as many triples + as possible. Errors during the extraction are passed over + and reported to the log. + + """ + + # combined extractor schemas. + schema: _schema.Schema + + # node prefix. + _prefix: URI + + # extractor -> reader mapping + _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]] + + def __init__( + self, + prefix: URI, + ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]] + ): + # store core members + self._prefix = prefix + self._ext2rdr = ext2rdr + # compile schema from all extractors + self.schema = _schema.Schema.Union(ext.schema for ext in ext2rdr) + + def __str__(self) -> str: + return typename(self) + + def __repr__(self) -> str: + return f'{typename(self)}(...)' + + def __hash__(self) -> int: + return hash((type(self), self._prefix, self.schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values()))) + + def __eq__(self, other: typing.Any) -> bool: + return isinstance(other, type(self)) \ + and self.schema == other.schema \ + and self._prefix == other._prefix \ + and self._ext2rdr == other._ext2rdr + + def __call__( + self, + path: URI, + predicates: typing.Optional[typing.Iterable[_schema.Predicate]] = None, + ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]: + """Extract triples from the file at *path*. Optionally, limit triples to *predicates*.""" + # get predicates + predicates = set(predicates) if predicates is not None else set(self.schema.predicates()) + + # get extractors + extractors = {ext for ext in self._ext2rdr if not set(ext.predicates()).isdisjoint(predicates)} + + # corner-case short-cut + if len(extractors) == 0: + return + + # get readers -> extractors mapping + rdr2ext = defaultdict(set) + for ext in extractors: + rdr = self._ext2rdr[ext] + rdr2ext[rdr].add(ext) + + # create subject for file + uuid = _uuid.UCID.from_path(path) + subject = Node(ns.bsfs.Entity, self._prefix + uuid) + + # extract information + for rdr, extrs in rdr2ext.items(): + try: + # get content + content = rdr(path) if rdr is not None else None + + # apply extractors on this content + for ext in extrs: + try: + # get predicate/value tuples + for node, pred, value in ext.extract(subject, content, predicates): + yield node, pred, value + + except base.errors.ExtractorError as err: + # critical extractor failure. + logger.error('%s failed to extract triples from content: %s', ext, err) + + except base.errors.ReaderError as err: + # failed to read any content. skip. + logger.error('%s failed to read content: %s', rdr, err) + + +## EOF ## |