From 3e6a69ce7f109f0fd4352507ad60d58d4cbd24a7 Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Fri, 25 Nov 2022 14:43:12 +0100 Subject: builders and pipeline --- bsie/tools/pipeline.py | 121 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 bsie/tools/pipeline.py (limited to 'bsie/tools/pipeline.py') diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py new file mode 100644 index 0000000..8e1c992 --- /dev/null +++ b/bsie/tools/pipeline.py @@ -0,0 +1,121 @@ +""" + +Part of the bsie module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +from collections import defaultdict +import logging +import typing + +# bsie imports +from bsie import base +from bsie.utils import ns +from bsie.utils.node import Node +from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename + +# exports +__all__: typing.Sequence[str] = ( + 'Pipeline', + ) + +## code ## + +logger = logging.getLogger(__name__) + +class Pipeline(): + """Extraction pipeline to generate triples from files. + + The Pipeline binds readers and extractors, and performs + the necessary operations to produce triples from a file. + It takes a best-effort approach to extract as many triples + as possible. Errors during the extraction are passed over + and reported to the log. + + """ + + # combined extractor schemas. + schema: _schema.Schema + + # node prefix. + _prefix: URI + + # extractor -> reader mapping + _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]] + + def __init__( + self, + prefix: URI, + ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]] + ): + # store core members + self._prefix = prefix + self._ext2rdr = ext2rdr + # compile schema from all extractors + self.schema = _schema.Schema.Union(ext.schema for ext in ext2rdr) + + def __str__(self) -> str: + return typename(self) + + def __repr__(self) -> str: + return f'{typename(self)}(...)' + + def __hash__(self) -> int: + return hash((type(self), self._prefix, self.schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values()))) + + def __eq__(self, other: typing.Any) -> bool: + return isinstance(other, type(self)) \ + and self.schema == other.schema \ + and self._prefix == other._prefix \ + and self._ext2rdr == other._ext2rdr + + def __call__( + self, + path: URI, + predicates: typing.Optional[typing.Iterable[_schema.Predicate]] = None, + ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]: + """Extract triples from the file at *path*. Optionally, limit triples to *predicates*.""" + # get predicates + predicates = set(predicates) if predicates is not None else set(self.schema.predicates()) + + # get extractors + extractors = {ext for ext in self._ext2rdr if not set(ext.predicates()).isdisjoint(predicates)} + + # corner-case short-cut + if len(extractors) == 0: + return + + # get readers -> extractors mapping + rdr2ext = defaultdict(set) + for ext in extractors: + rdr = self._ext2rdr[ext] + rdr2ext[rdr].add(ext) + + # create subject for file + uuid = _uuid.UCID.from_path(path) + subject = Node(ns.bsfs.Entity, self._prefix + uuid) + + # extract information + for rdr, extrs in rdr2ext.items(): + try: + # get content + content = rdr(path) if rdr is not None else None + + # apply extractors on this content + for ext in extrs: + try: + # get predicate/value tuples + for node, pred, value in ext.extract(subject, content, predicates): + yield node, pred, value + + except base.errors.ExtractorError as err: + # critical extractor failure. + logger.error('%s failed to extract triples from content: %s', ext, err) + + except base.errors.ReaderError as err: + # failed to read any content. skip. + logger.error('%s failed to read content: %s', rdr, err) + + +## EOF ## -- cgit v1.2.3 From 559e643bb1fa39feefd2eb73847ad9420daf1deb Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Wed, 14 Dec 2022 06:10:25 +0100 Subject: bsie extraction and info apps --- bsie/tools/pipeline.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'bsie/tools/pipeline.py') diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py index 8e1c992..da422c0 100644 --- a/bsie/tools/pipeline.py +++ b/bsie/tools/pipeline.py @@ -70,6 +70,10 @@ class Pipeline(): and self._prefix == other._prefix \ and self._ext2rdr == other._ext2rdr + def predicates(self) -> typing.Iterator[_schema.Predicate]: + """Return the predicates that are extracted from a file.""" + return iter({pred for ext in self._ext2rdr for pred in ext.predicates()}) + def __call__( self, path: URI, -- cgit v1.2.3 From 3b7fee369924eb7704709edeb8c17fff9c020dfb Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Thu, 15 Dec 2022 17:06:09 +0100 Subject: import fixes --- bsie/tools/pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'bsie/tools/pipeline.py') diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py index da422c0..7fdd935 100644 --- a/bsie/tools/pipeline.py +++ b/bsie/tools/pipeline.py @@ -11,9 +11,9 @@ import typing # bsie imports from bsie import base -from bsie.utils import ns from bsie.utils.node import Node from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename +from bsie.utils import bsfs, node, ns # exports __all__: typing.Sequence[str] = ( @@ -56,10 +56,10 @@ class Pipeline(): self.schema = _schema.Schema.Union(ext.schema for ext in ext2rdr) def __str__(self) -> str: - return typename(self) + return bsfs.typename(self) def __repr__(self) -> str: - return f'{typename(self)}(...)' + return f'{bsfs.typename(self)}(...)' def __hash__(self) -> int: return hash((type(self), self._prefix, self.schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values()))) -- cgit v1.2.3 From 8e6d27ea75d2c8d68f6dd8b3d529aaa278f291cc Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Thu, 15 Dec 2022 17:12:56 +0100 Subject: file node class in default schema --- bsie/tools/pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'bsie/tools/pipeline.py') diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py index 7fdd935..3d08993 100644 --- a/bsie/tools/pipeline.py +++ b/bsie/tools/pipeline.py @@ -97,8 +97,8 @@ class Pipeline(): rdr2ext[rdr].add(ext) # create subject for file - uuid = _uuid.UCID.from_path(path) - subject = Node(ns.bsfs.Entity, self._prefix + uuid) + uuid = bsfs.uuid.UCID.from_path(path) + subject = node.Node(ns.bsfs.File, self._prefix + 'file#' + uuid) # extract information for rdr, extrs in rdr2ext.items(): -- cgit v1.2.3 From 5d9526783ad8432c7d6dfe18c0e9f2b37950b470 Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Thu, 15 Dec 2022 17:16:25 +0100 Subject: Pipeline.prefix as Namespace instead of URI --- bsie/tools/pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'bsie/tools/pipeline.py') diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py index 3d08993..834bd99 100644 --- a/bsie/tools/pipeline.py +++ b/bsie/tools/pipeline.py @@ -39,14 +39,14 @@ class Pipeline(): schema: _schema.Schema # node prefix. - _prefix: URI + _prefix: bsfs.Namespace # extractor -> reader mapping _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]] def __init__( self, - prefix: URI, + prefix: bsfs.Namespace, ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]] ): # store core members -- cgit v1.2.3 From 37510d134458bf954ca2da6d40be0d6c76661e8c Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Thu, 15 Dec 2022 17:19:21 +0100 Subject: bsie/pipeline interface revision: * predicates -> principals * schema as property * principals as property * information hiding * full subschema instead of only predicates --- bsie/tools/pipeline.py | 52 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 18 deletions(-) (limited to 'bsie/tools/pipeline.py') diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py index 834bd99..52ce526 100644 --- a/bsie/tools/pipeline.py +++ b/bsie/tools/pipeline.py @@ -11,8 +11,6 @@ import typing # bsie imports from bsie import base -from bsie.utils.node import Node -from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename from bsie.utils import bsfs, node, ns # exports @@ -36,7 +34,7 @@ class Pipeline(): """ # combined extractor schemas. - schema: _schema.Schema + _schema: bsfs.schema.Schema # node prefix. _prefix: bsfs.Namespace @@ -53,7 +51,7 @@ class Pipeline(): self._prefix = prefix self._ext2rdr = ext2rdr # compile schema from all extractors - self.schema = _schema.Schema.Union(ext.schema for ext in ext2rdr) + self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr) def __str__(self) -> str: return bsfs.typename(self) @@ -62,29 +60,47 @@ class Pipeline(): return f'{bsfs.typename(self)}(...)' def __hash__(self) -> int: - return hash((type(self), self._prefix, self.schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values()))) + return hash((type(self), self._prefix, self._schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values()))) def __eq__(self, other: typing.Any) -> bool: return isinstance(other, type(self)) \ - and self.schema == other.schema \ + and self._schema == other._schema \ and self._prefix == other._prefix \ and self._ext2rdr == other._ext2rdr - def predicates(self) -> typing.Iterator[_schema.Predicate]: - """Return the predicates that are extracted from a file.""" - return iter({pred for ext in self._ext2rdr for pred in ext.predicates()}) + @property + def schema(self) -> bsfs.schema.Schema: + """Return the pipeline's schema (combined from all extractors).""" + return self._schema + + @property + def principals(self) -> typing.Iterator[bsfs.schema.Predicate]: + """Return the principal predicates that can be extracted.""" + return iter({pred for ext in self._ext2rdr for pred in ext.principals}) + + def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema: + """Return the subset of the schema that supports the given *principals*.""" + # materialize principals + principals = set(principals) + # collect and combine schemas from extractors + return bsfs.schema.Schema.Union({ + ext.schema + for ext + in self._ext2rdr + if not set(ext.principals).isdisjoint(principals) + }) def __call__( self, - path: URI, - predicates: typing.Optional[typing.Iterable[_schema.Predicate]] = None, - ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]: - """Extract triples from the file at *path*. Optionally, limit triples to *predicates*.""" - # get predicates - predicates = set(predicates) if predicates is not None else set(self.schema.predicates()) + path: bsfs.URI, + principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None, + ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]: + """Extract triples from the file at *path*. Optionally, limit triples to *principals*.""" + # get principals + principals = set(principals) if principals is not None else set(self.schema.predicates()) # get extractors - extractors = {ext for ext in self._ext2rdr if not set(ext.predicates()).isdisjoint(predicates)} + extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)} # corner-case short-cut if len(extractors) == 0: @@ -110,8 +126,8 @@ class Pipeline(): for ext in extrs: try: # get predicate/value tuples - for node, pred, value in ext.extract(subject, content, predicates): - yield node, pred, value + for subject, pred, value in ext.extract(subject, content, principals): + yield subject, pred, value except base.errors.ExtractorError as err: # critical extractor failure. -- cgit v1.2.3 From 057e09d6537bf5c39815661a75819081e3e5fda7 Mon Sep 17 00:00:00 2001 From: Matthias Baumgartner Date: Sun, 18 Dec 2022 13:37:59 +0100 Subject: adaptions to updates in bsfs --- bsie/tools/pipeline.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'bsie/tools/pipeline.py') diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py index 52ce526..20e8ddf 100644 --- a/bsie/tools/pipeline.py +++ b/bsie/tools/pipeline.py @@ -18,6 +18,9 @@ __all__: typing.Sequence[str] = ( 'Pipeline', ) +# constants +FILE_PREFIX = 'file#' + ## code ## logger = logging.getLogger(__name__) @@ -48,7 +51,7 @@ class Pipeline(): ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]] ): # store core members - self._prefix = prefix + self._prefix = prefix + FILE_PREFIX self._ext2rdr = ext2rdr # compile schema from all extractors self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr) @@ -114,7 +117,7 @@ class Pipeline(): # create subject for file uuid = bsfs.uuid.UCID.from_path(path) - subject = node.Node(ns.bsfs.File, self._prefix + 'file#' + uuid) + subject = node.Node(ns.bsfs.File, self._prefix[uuid]) # extract information for rdr, extrs in rdr2ext.items(): -- cgit v1.2.3