about | summary | refs | log | tree | commit | diff | stats
path: root/bsie/tools/pipeline.py
diff options
context:
space:
mode:
author: Matthias Baumgartner <dev@igsor.net> 2022-11-25 14:59:17 +0100
committer: Matthias Baumgartner <dev@igsor.net> 2022-11-25 14:59:17 +0100
commit: a294bbe0622911bcd6df37c38865a4c0eb290593 (patch)
tree: f038ed8d4f04c63991939e13e61ae170de4e2c57 /bsie/tools/pipeline.py
parent: 9389c741bdbbca9adbff6099d440706cd63deac4 (diff)
parent: 3e6a69ce7f109f0fd4352507ad60d58d4cbd24a7 (diff)
download: bsie-a294bbe0622911bcd6df37c38865a4c0eb290593.tar.gz
bsie-a294bbe0622911bcd6df37c38865a4c0eb290593.tar.bz2
bsie-a294bbe0622911bcd6df37c38865a4c0eb290593.zip
Merge branch 'mb/tools' into develop
Diffstat (limited to 'bsie/tools/pipeline.py')
-rw-r--r-- bsie/tools/pipeline.py 121
1 file changed, 121 insertions, 0 deletions
diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py
new file mode 100644
index 0000000..8e1c992
--- /dev/null
+++ b/bsie/tools/pipeline.py
@@ -0,0 +1,121 @@
+"""Extraction pipeline that binds readers and extractors to produce triples.
+
+Part of the bsie module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+from collections import defaultdict
+import logging
+import typing
+
+# bsie imports
+from bsie import base
+from bsie.utils import ns
+from bsie.utils.node import Node
+from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename
+
+# exports: the public names of this module
+__all__: typing.Sequence[str] = (
+ 'Pipeline',
+ )
+
+## code ##
+# module-level logger, named after this module
+logger = logging.getLogger(__name__)
+
+class Pipeline():
+    """Bind readers to extractors to produce triples from a file.
+
+    A Pipeline holds an extractor -> reader mapping. When called on
+    a file, each reader that is needed is invoked once and its content
+    is handed to every extractor mapped to it. Extraction is
+    best-effort: a ReaderError or ExtractorError is reported to the
+    log, and the pipeline carries on with what remains.
+    """
+
+    # Schema.Union over the schemas of all extractors.
+    schema: _schema.Schema
+
+    # prepended to the file's uuid when creating the subject node.
+    _prefix: URI
+
+    # reader (or None) that supplies the content for each extractor.
+    _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
+
+    def __init__(
+            self,
+            prefix: URI,
+            ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
+            ):
+        # the schema is built from the mapping's keys (the extractors)
+        self.schema = _schema.Schema.Union(ext.schema for ext in ext2rdr)
+        self._ext2rdr = ext2rdr
+        self._prefix = prefix
+
+    def __str__(self) -> str:
+        return typename(self)
+
+    def __repr__(self) -> str:
+        return f'{typename(self)}(...)'
+
+    def __hash__(self) -> int:
+        extractors = tuple(self._ext2rdr)
+        readers = tuple(self._ext2rdr.values())
+        return hash((type(self), self._prefix, self.schema, extractors, readers))
+
+    def __eq__(self, other: typing.Any) -> bool:
+        if not isinstance(other, type(self)):
+            return False
+        return self.schema == other.schema and self._prefix == other._prefix \
+            and self._ext2rdr == other._ext2rdr
+
+    def __call__(
+            self,
+            path: URI,
+            predicates: typing.Optional[typing.Iterable[_schema.Predicate]] = None,
+            ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]:
+        """Yield the triples extracted from the file at *path*.
+
+        If *predicates* is given, only extractors that cover at least
+        one of them are run; otherwise all schema predicates are used.
+        """
+        # resolve the requested predicates
+        predicates = set(self.schema.predicates()) if predicates is None else set(predicates)
+
+        # select the extractors that cover a requested predicate
+        selected = set()
+        for ext in self._ext2rdr:
+            if not set(ext.predicates()).isdisjoint(predicates):
+                selected.add(ext)
+
+        # nothing to extract
+        if not selected:
+            return
+
+        # group the extractors by reader, so that each reader runs only once
+        by_reader = defaultdict(set)
+        for ext in selected:
+            by_reader[self._ext2rdr[ext]].add(ext)
+
+        # the subject node that represents the file
+        ucid = _uuid.UCID.from_path(path)
+        subject = Node(ns.bsfs.Entity, self._prefix + ucid)
+
+        for rdr, extrs in by_reader.items():
+            try:
+                # extractors without a reader receive None as content
+                content = None if rdr is None else rdr(path)
+                for ext in extrs:
+                    try:
+                        for node, pred, value in ext.extract(subject, content, predicates):
+                            yield node, pred, value
+                    except base.errors.ExtractorError as err:
+                        # this extractor failed; proceed with the next one.
+                        logger.error('%s failed to extract triples from content: %s', ext, err)
+            except base.errors.ReaderError as err:
+                # no content available; skip all extractors of this reader.
+                logger.error('%s failed to read content: %s', rdr, err)
+
+
+## EOF ##