aboutsummaryrefslogtreecommitdiffstats
path: root/bsie/lib/pipeline.py
diff options
context:
space:
mode:
Diffstat (limited to 'bsie/lib/pipeline.py')
-rw-r--r--bsie/lib/pipeline.py138
1 files changed, 138 insertions, 0 deletions
diff --git a/bsie/lib/pipeline.py b/bsie/lib/pipeline.py
new file mode 100644
index 0000000..30fd6fd
--- /dev/null
+++ b/bsie/lib/pipeline.py
@@ -0,0 +1,138 @@
+
+# standard imports
+from collections import defaultdict
+import logging
+import typing
+
+# bsie imports
+from bsie.extractor import Extractor
+from bsie.reader import Reader
+from bsie.utils import bsfs, errors, node, ns
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Pipeline',
+ )
+
+
+## code ##
+
+logger = logging.getLogger(__name__)
+
+class Pipeline():
+ """Extraction pipeline to generate triples from files.
+
+ The Pipeline binds readers and extractors, and performs
+ the necessary operations to produce triples from a file.
+ It takes a best-effort approach to extract as many triples
+ as possible. Errors during the extraction are passed over
+ and reported to the log.
+
+ """
+
+ # combined extractor schemas.
+ _schema: bsfs.schema.Schema
+
+ # extractor -> reader mapping
+ _ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]]
+
+ def __init__(
+ self,
+ ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]]
+ ):
+ # store core members
+ self._ext2rdr = ext2rdr
+ # compile schema from all extractors
+ self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr)
+
+ def __str__(self) -> str:
+ return bsfs.typename(self)
+
+ def __repr__(self) -> str:
+ return f'{bsfs.typename(self)}(...)'
+
+ def __hash__(self) -> int:
+ return hash((type(self), self._schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values())))
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return isinstance(other, type(self)) \
+ and self._schema == other._schema \
+ and self._ext2rdr == other._ext2rdr
+
+ @property
+ def schema(self) -> bsfs.schema.Schema:
+ """Return the pipeline's schema (combined from all extractors)."""
+ return self._schema
+
+ @property
+ def principals(self) -> typing.Iterator[bsfs.schema.Predicate]:
+ """Return the principal predicates that can be extracted."""
+ return iter({pred for ext in self._ext2rdr for pred in ext.principals})
+
+ def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema:
+ """Return the subset of the schema that supports the given *principals*."""
+ # materialize principals
+ principals = set(principals)
+ # collect and combine schemas from extractors
+ return bsfs.schema.Schema.Union({
+ ext.schema
+ for ext
+ in self._ext2rdr
+ if not set(ext.principals).isdisjoint(principals)
+ })
+
+ def __call__(
+ self,
+ path: bsfs.URI,
+ principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None,
+ ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
+ """Extract triples from the file at *path*. Optionally, limit triples to *principals*."""
+ # get principals
+ principals = set(principals) if principals is not None else set(self.schema.predicates())
+
+ # get extractors
+ extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)}
+
+ # corner-case short-cut
+ if len(extractors) == 0:
+ return
+
+ # get readers -> extractors mapping
+ rdr2ext = defaultdict(set)
+ for ext in extractors:
+ rdr = self._ext2rdr[ext]
+ rdr2ext[rdr].add(ext)
+
+ # create subject for file
+ subject = node.Node(ns.bsn.Entity,
+ ucid=bsfs.uuid.UCID.from_path(path),
+ )
+
+ # extract information
+ for rdr, extrs in rdr2ext.items():
+ try:
+ # get content
+ content = rdr(path) if rdr is not None else None
+ #logger.info('extracted %s from %s', rdr, path)
+
+ # apply extractors on this content
+ for ext in extrs:
+ try:
+ # get predicate/value tuples
+ yield from ext.extract(subject, content, principals)
+
+ except errors.ExtractorError as err:
+ # critical extractor failure.
+ logger.error('%s failed to extract triples from content: %s', ext, err)
+
+ except errors.UnsupportedFileFormatError:
+ # failed to read the file format. skip.
+ #logger.warning('%s could not process the file format of %s', rdr, err)
+ pass
+
+ except errors.ReaderError as err:
+ # failed to read any content. skip.
+ logger.error('%s failed to read content: %s', rdr, err)
+
+
+## EOF ##