From 266c2c9a072bf3289fd7f2d75278b7d59528378c Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Sat, 24 Dec 2022 10:27:09 +0100 Subject: package restructuring: base * Reader and Extractor to respective reader/extractor modules * ReaderBuilder to reader module * ExtractorBuilder to extractor module * Loading module in utils (safe_load, unpack_name) * Pipeline and PipelineBuilder to lib module * errors to utils * documentation: "standard import" and "external import" --- bsie/lib/pipeline.py | 145 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 bsie/lib/pipeline.py (limited to 'bsie/lib/pipeline.py') diff --git a/bsie/lib/pipeline.py b/bsie/lib/pipeline.py new file mode 100644 index 0000000..e5ce1b7 --- /dev/null +++ b/bsie/lib/pipeline.py @@ -0,0 +1,145 @@ +""" + +Part of the bsie module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# standard imports +from collections import defaultdict +import logging +import typing + +# bsie imports +from bsie.extractor import Extractor +from bsie.reader import Reader +from bsie.utils import bsfs, errors, node, ns + +# exports +__all__: typing.Sequence[str] = ( + 'Pipeline', + ) + +# constants +FILE_PREFIX = 'file#' + +## code ## + +logger = logging.getLogger(__name__) + +class Pipeline(): + """Extraction pipeline to generate triples from files. + + The Pipeline binds readers and extractors, and performs + the necessary operations to produce triples from a file. + It takes a best-effort approach to extract as many triples + as possible. Errors during the extraction are passed over + and reported to the log. + + """ + + # combined extractor schemas. + _schema: bsfs.schema.Schema + + # node prefix. + _prefix: bsfs.Namespace + + # extractor -> reader mapping + _ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]] + + def __init__( + self, + prefix: bsfs.Namespace, + ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]] + ): + # store core members + self._prefix = prefix + FILE_PREFIX + self._ext2rdr = ext2rdr + # compile schema from all extractors + self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr) + + def __str__(self) -> str: + return bsfs.typename(self) + + def __repr__(self) -> str: + return f'{bsfs.typename(self)}(...)' + + def __hash__(self) -> int: + return hash((type(self), self._prefix, self._schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values()))) + + def __eq__(self, other: typing.Any) -> bool: + return isinstance(other, type(self)) \ + and self._schema == other._schema \ + and self._prefix == other._prefix \ + and self._ext2rdr == other._ext2rdr + + @property + def schema(self) -> bsfs.schema.Schema: + """Return the pipeline's schema (combined from all extractors).""" + return self._schema + + @property + def principals(self) -> typing.Iterator[bsfs.schema.Predicate]: + """Return the principal predicates that can be extracted.""" + return iter({pred for ext in self._ext2rdr for pred in ext.principals}) + + def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema: + """Return the subset of the schema that supports the given *principals*.""" + # materialize principals + principals = set(principals) + # collect and combine schemas from extractors + return bsfs.schema.Schema.Union({ + ext.schema + for ext + in self._ext2rdr + if not set(ext.principals).isdisjoint(principals) + }) + + def __call__( + self, + path: bsfs.URI, + principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None, + ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]: + """Extract triples from the file at *path*. Optionally, limit triples to *principals*.""" + # get principals + principals = set(principals) if principals is not None else set(self.schema.predicates()) + + # get extractors + extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)} + + # corner-case short-cut + if len(extractors) == 0: + return + + # get readers -> extractors mapping + rdr2ext = defaultdict(set) + for ext in extractors: + rdr = self._ext2rdr[ext] + rdr2ext[rdr].add(ext) + + # create subject for file + uuid = bsfs.uuid.UCID.from_path(path) + subject = node.Node(ns.bsfs.File, self._prefix[uuid]) + + # extract information + for rdr, extrs in rdr2ext.items(): + try: + # get content + content = rdr(path) if rdr is not None else None + + # apply extractors on this content + for ext in extrs: + try: + # get predicate/value tuples + for subject, pred, value in ext.extract(subject, content, principals): + yield subject, pred, value + + except errors.ExtractorError as err: + # critical extractor failure. + logger.error('%s failed to extract triples from content: %s', ext, err) + + except errors.ReaderError as err: + # failed to read any content. skip. + logger.error('%s failed to read content: %s', rdr, err) + + +## EOF ## -- cgit v1.2.3 From afd165000c1661a9cca117a4844ad3f89d926fdb Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Mon, 16 Jan 2023 20:53:39 +0100 Subject: unsupported file format exception --- bsie/lib/pipeline.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'bsie/lib/pipeline.py') diff --git a/bsie/lib/pipeline.py b/bsie/lib/pipeline.py index e5ce1b7..02119bc 100644 --- a/bsie/lib/pipeline.py +++ b/bsie/lib/pipeline.py @@ -126,6 +126,8 @@ class Pipeline(): # get content content = rdr(path) if rdr is not None else None + #logger.info('extracted %s from %s', rdr, path) + # apply extractors on this content for ext in extrs: try: @@ -137,6 +139,11 @@ class Pipeline(): # critical extractor failure. logger.error('%s failed to extract triples from content: %s', ext, err) + except errors.UnsupportedFileFormatError as err: + # failed to read the file format. skip. + #logger.warning('%s could not process the file format of %s', rdr, err) + pass + except errors.ReaderError as err: # failed to read any content. skip. logger.error('%s failed to read content: %s', rdr, err) -- cgit v1.2.3 From 58aaa864f9747d27c065739256d4c6635ca9b751 Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Mon, 16 Jan 2023 21:36:50 +0100 Subject: minor fixes --- bsie/lib/pipeline.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'bsie/lib/pipeline.py') diff --git a/bsie/lib/pipeline.py b/bsie/lib/pipeline.py index 02119bc..44685ba 100644 --- a/bsie/lib/pipeline.py +++ b/bsie/lib/pipeline.py @@ -125,7 +125,6 @@ class Pipeline(): try: # get content content = rdr(path) if rdr is not None else None - #logger.info('extracted %s from %s', rdr, path) # apply extractors on this content @@ -139,7 +138,7 @@ class Pipeline(): # critical extractor failure. logger.error('%s failed to extract triples from content: %s', ext, err) - except errors.UnsupportedFileFormatError as err: + except errors.UnsupportedFileFormatError: # failed to read the file format. skip. #logger.warning('%s could not process the file format of %s', rdr, err) pass -- cgit v1.2.3 From 9c26a5ef759b010d8cf4384b0515cc188b885d81 Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Wed, 8 Feb 2023 17:44:00 +0100 Subject: node naming policy --- bsie/lib/pipeline.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) (limited to 'bsie/lib/pipeline.py') diff --git a/bsie/lib/pipeline.py b/bsie/lib/pipeline.py index 44685ba..0bc5109 100644 --- a/bsie/lib/pipeline.py +++ b/bsie/lib/pipeline.py @@ -19,8 +19,6 @@ __all__: typing.Sequence[str] = ( 'Pipeline', ) -# constants -FILE_PREFIX = 'file#' ## code ## @@ -40,19 +38,14 @@ class Pipeline(): # combined extractor schemas. _schema: bsfs.schema.Schema - # node prefix. - _prefix: bsfs.Namespace - # extractor -> reader mapping _ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]] def __init__( self, - prefix: bsfs.Namespace, ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]] ): # store core members - self._prefix = prefix + FILE_PREFIX self._ext2rdr = ext2rdr # compile schema from all extractors self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr) @@ -64,12 +57,11 @@ class Pipeline(): return f'{bsfs.typename(self)}(...)' def __hash__(self) -> int: - return hash((type(self), self._prefix, self._schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values()))) + return hash((type(self), self._schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values()))) def __eq__(self, other: typing.Any) -> bool: return isinstance(other, type(self)) \ and self._schema == other._schema \ - and self._prefix == other._prefix \ and self._ext2rdr == other._ext2rdr @property @@ -117,8 +109,9 @@ class Pipeline(): rdr2ext[rdr].add(ext) # create subject for file - uuid = bsfs.uuid.UCID.from_path(path) - subject = node.Node(ns.bsfs.File, self._prefix[uuid]) + subject = node.Node(ns.bsfs.File, + ucid=bsfs.uuid.UCID.from_path(path), + ) # extract information for rdr, extrs in rdr2ext.items(): @@ -131,8 +124,7 @@ class Pipeline(): for ext in extrs: try: # get predicate/value tuples - for subject, pred, value in ext.extract(subject, content, principals): - yield subject, pred, value + yield from ext.extract(subject, content, principals) except errors.ExtractorError as err: # critical extractor failure. -- cgit v1.2.3 From 4b5c4d486bb4f0f4da2e25ad464e8336a781cdcb Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Wed, 1 Mar 2023 22:31:03 +0100 Subject: removed module header stubs --- bsie/lib/pipeline.py | 5 ----- 1 file changed, 5 deletions(-) (limited to 'bsie/lib/pipeline.py') diff --git a/bsie/lib/pipeline.py b/bsie/lib/pipeline.py index 0bc5109..128eecc 100644 --- a/bsie/lib/pipeline.py +++ b/bsie/lib/pipeline.py @@ -1,9 +1,4 @@ -""" -Part of the bsie module. -A copy of the license is provided with the project. -Author: Matthias Baumgartner, 2022 -""" # standard imports from collections import defaultdict import logging -- cgit v1.2.3 From d2052e77210e0ace2c5f06e48afe2a8acb412965 Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Sat, 4 Mar 2023 13:41:13 +0100 Subject: namespace refactoring and cleanup --- bsie/lib/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'bsie/lib/pipeline.py') diff --git a/bsie/lib/pipeline.py b/bsie/lib/pipeline.py index 128eecc..30fd6fd 100644 --- a/bsie/lib/pipeline.py +++ b/bsie/lib/pipeline.py @@ -104,7 +104,7 @@ class Pipeline(): rdr2ext[rdr].add(ext) # create subject for file - subject = node.Node(ns.bsfs.File, + subject = node.Node(ns.bsn.Entity, ucid=bsfs.uuid.UCID.from_path(path), ) -- cgit v1.2.3