diff options
author | Matthias Baumgartner <dev@igsor.net> | 2023-03-05 19:22:58 +0100 |
---|---|---|
committer | Matthias Baumgartner <dev@igsor.net> | 2023-03-05 19:22:58 +0100 |
commit | a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d (patch) | |
tree | fb220da28bb7248ebf37ce09af5de88f2c1aaad4 /bsie/lib/pipeline.py | |
parent | 7582c280ad5324a2f0427999911c7e7abc14a6ab (diff) | |
parent | af81318ae9311fd0b0e16949cef3cfaf7996970b (diff) | |
download | bsie-main.tar.gz bsie-main.tar.bz2 bsie-main.zip |
Diffstat (limited to 'bsie/lib/pipeline.py')
-rw-r--r-- | bsie/lib/pipeline.py | 138 |
1 files changed, 138 insertions, 0 deletions
# diff --git a/bsie/lib/pipeline.py b/bsie/lib/pipeline.py
# new file mode 100644, index 0000000..30fd6fd

# standard imports
from collections import defaultdict
import logging
import typing

# bsie imports
from bsie.extractor import Extractor
from bsie.reader import Reader
from bsie.utils import bsfs, errors, node, ns

# exports
__all__: typing.Sequence[str] = (
    'Pipeline',
    )


## code ##

logger = logging.getLogger(__name__)

class Pipeline():
    """Extraction pipeline to generate triples from files.

    The Pipeline binds readers and extractors, and performs
    the necessary operations to produce triples from a file.
    It takes a best-effort approach to extract as many triples
    as possible. Errors during the extraction are passed over
    and reported to the log.

    """

    # combined extractor schemas.
    _schema: bsfs.schema.Schema

    # extractor -> reader mapping
    _ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]]

    def __init__(
            self,
            ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]]
            ):
        """Create a pipeline from an extractor -> reader mapping.

        An extractor may be mapped to None, in which case it is
        invoked with None as content (no reader is called).
        """
        # store core members
        self._ext2rdr = ext2rdr
        # compile schema from all extractors
        self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr)

    def __str__(self) -> str:
        return bsfs.typename(self)

    def __repr__(self) -> str:
        return f'{bsfs.typename(self)}(...)'

    def __hash__(self) -> int:
        # NOTE: __eq__ compares the mapping via dict equality, which ignores
        # insertion order. The hash must therefore be order-independent as
        # well, otherwise two equal pipelines could hash differently.
        return hash((type(self), self._schema, frozenset(self._ext2rdr.items())))

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, type(self)) \
           and self._schema == other._schema \
           and self._ext2rdr == other._ext2rdr

    @property
    def schema(self) -> bsfs.schema.Schema:
        """Return the pipeline's schema (combined from all extractors)."""
        return self._schema

    @property
    def principals(self) -> typing.Iterator[bsfs.schema.Predicate]:
        """Return the principal predicates that can be extracted."""
        # a set removes principals that are shared between extractors
        return iter({pred for ext in self._ext2rdr for pred in ext.principals})

    def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema:
        """Return the subset of the schema that supports the given *principals*."""
        # materialize principals
        principals = set(principals)
        # collect and combine schemas from extractors
        return bsfs.schema.Schema.Union({
            ext.schema
            for ext
            in self._ext2rdr
            if not set(ext.principals).isdisjoint(principals)
            })

    def __call__(
            self,
            path: bsfs.URI,
            principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None,
            ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
        """Extract triples from the file at *path*. Optionally, limit triples to *principals*.

        If *principals* is None, all predicates of the pipeline's schema are used.
        Reader and extractor errors are logged and skipped; files whose format
        a reader does not support are skipped silently.
        """
        # get principals
        principals = set(principals) if principals is not None else set(self.schema.predicates())

        # get extractors that provide at least one of the requested principals
        extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)}

        # corner-case short-cut
        if not extractors:
            return

        # get readers -> extractors mapping, so that each reader runs only once
        rdr2ext = defaultdict(set)
        for ext in extractors:
            rdr = self._ext2rdr[ext]
            rdr2ext[rdr].add(ext)

        # create subject for file
        subject = node.Node(ns.bsn.Entity,
            ucid=bsfs.uuid.UCID.from_path(path),
            )

        # extract information
        for rdr, extrs in rdr2ext.items():
            try:
                # get content
                content = rdr(path) if rdr is not None else None

                # apply extractors on this content
                for ext in extrs:
                    try:
                        # get predicate/value tuples
                        yield from ext.extract(subject, content, principals)

                    except errors.ExtractorError as err:
                        # critical extractor failure.
                        logger.error('%s failed to extract triples from content: %s', ext, err)

            except errors.UnsupportedFileFormatError:
                # failed to read the file format. skip.
                # deliberately silent: this case is not reported to the log.
                pass

            except errors.ReaderError as err:
                # failed to read any content. skip.
                logger.error('%s failed to read content: %s', rdr, err)


## EOF ##