author | Matthias Baumgartner <dev@igsor.net> | 2023-03-05 19:22:58 +0100 |
---|---|---|
committer | Matthias Baumgartner <dev@igsor.net> | 2023-03-05 19:22:58 +0100 |
commit | a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d (patch) | |
tree | fb220da28bb7248ebf37ce09af5de88f2c1aaad4 /bsie/tools | |
parent | 7582c280ad5324a2f0427999911c7e7abc14a6ab (diff) | |
parent | af81318ae9311fd0b0e16949cef3cfaf7996970b (diff) | |
Diffstat (limited to 'bsie/tools')
-rw-r--r-- | bsie/tools/__init__.py | 20
-rw-r--r-- | bsie/tools/builder.py | 226
-rw-r--r-- | bsie/tools/pipeline.py | 144
3 files changed, 0 insertions, 390 deletions
diff --git a/bsie/tools/__init__.py b/bsie/tools/__init__.py
deleted file mode 100644
index 803c321..0000000
--- a/bsie/tools/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-"""
-
-Part of the bsie module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
-# imports
-import typing
-
-# inner-module imports
-from . import builder
-from .pipeline import Pipeline
-
-# exports
-__all__: typing.Sequence[str] = (
-    'builder',
-    'Pipeline',
-    )
-
-## EOF ##
diff --git a/bsie/tools/builder.py b/bsie/tools/builder.py
deleted file mode 100644
index 190d9bf..0000000
--- a/bsie/tools/builder.py
+++ /dev/null
@@ -1,226 +0,0 @@
-"""
-
-Part of the bsie module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
-# imports
-import importlib
-import logging
-import typing
-
-# bsie imports
-from bsie import base
-from bsie.base import errors
-from bsie.utils import bsfs
-
-# inner-module imports
-from . import pipeline
-
-# exports
-__all__: typing.Sequence[str] = (
-    'ExtractorBuilder',
-    'PipelineBuilder',
-    'ReaderBuilder',
-    )
-
-
-## code ##
-
-logger = logging.getLogger(__name__)
-
-def _safe_load(module_name: str, class_name: str):
-    """Get a class from a module. Raise BuilderError if anything goes wrong."""
-    try:
-        # load the module
-        module = importlib.import_module(module_name)
-    except Exception as err:
-        # cannot import module
-        raise errors.LoaderError(f'cannot load module {module_name}') from err
-
-    try:
-        # get the class from the module
-        cls = getattr(module, class_name)
-    except Exception as err:
-        # cannot find the class
-        raise errors.LoaderError(f'cannot load class {class_name} from module {module_name}') from err
-
-    return cls
-
-
-def _unpack_name(name):
-    """Split a name into its module and class component (dot-separated)."""
-    if not isinstance(name, str):
-        raise TypeError(name)
-    if '.' not in name:
-        raise ValueError('name must be a qualified class name.')
-    module_name, class_name = name[:name.rfind('.')], name[name.rfind('.')+1:]
-    if module_name == '':
-        raise ValueError('name must be a qualified class name.')
-    return module_name, class_name
-
-
-class ReaderBuilder():
-    """Build `bsie.base.Reader` instances.
-
-    Readers are defined via their qualified class name
-    (e.g., bsie.reader.path.Path) and optional keyword
-    arguments that are passed to the constructor via
-    the *kwargs* argument (name as key, kwargs as value).
-    The ReaderBuilder keeps a cache of previously built
-    reader instances, as they are anyway built with
-    identical keyword arguments.
-
-    """
-
-    # keyword arguments
-    _kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]
-
-    # cached readers
-    _cache: typing.Dict[str, base.Reader]
-
-    def __init__(self, kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]):
-        self._kwargs = kwargs
-        self._cache = {}
-
-    def build(self, name: str) -> base.Reader:
-        """Return an instance for the qualified class name."""
-        # return cached instance
-        if name in self._cache:
-            return self._cache[name]
-
-        # check name and get module/class components
-        module_name, class_name = _unpack_name(name)
-
-        # import reader class
-        cls = _safe_load(module_name, class_name)
-
-        # get kwargs
-        kwargs = self._kwargs.get(name, {})
-        if not isinstance(kwargs, dict):
-            raise TypeError(f'expected a kwargs dict, found {bsfs.typename(kwargs)}')
-
-        try: # build, cache, and return instance
-            obj = cls(**kwargs)
-            # cache instance
-            self._cache[name] = obj
-            # return instance
-            return obj
-
-        except Exception as err:
-            raise errors.BuilderError(f'failed to build reader {name} due to {bsfs.typename(err)}: {err}') from err
-
-
-class ExtractorBuilder():
-    """Build `bsie.base.Extractor instances.
-
-    It is permissible to build multiple instances of the same extractor
-    (typically with different arguments), hence the ExtractorBuilder
-    receives a list of build specifications. Each specification is
-    a dict with a single key (extractor's qualified name) and a dict
-    to be used as keyword arguments.
-    Example: [{'bsie.extractor.generic.path.Path': {}}, ]
-
-    """
-
-    # build specifications
-    _specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]
-
-    def __init__(self, specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]):
-        self._specs = specs
-
-    def __iter__(self) -> typing.Iterator[int]:
-        """Iterate over extractor specifications."""
-        return iter(range(len(self._specs)))
-
-    def build(self, index: int) -> base.Extractor:
-        """Return an instance of the n'th extractor (n=*index*)."""
-        # get build instructions
-        specs = self._specs[index]
-
-        # check specs structure. expecting [{name: {kwargs}}]
-        if not isinstance(specs, dict):
-            raise TypeError(f'expected a dict, found {bsfs.typename(specs)}')
-        if len(specs) != 1:
-            raise TypeError(f'expected a dict of length one, found {len(specs)}')
-
-        # get name and args from specs
-        name = next(iter(specs.keys()))
-        kwargs = specs[name]
-
-        # check kwargs structure
-        if not isinstance(kwargs, dict):
-            raise TypeError(f'expected a dict, found {bsfs.typename(kwargs)}')
-
-        # check name and get module/class components
-        module_name, class_name = _unpack_name(name)
-
-        # import extractor class
-        cls = _safe_load(module_name, class_name)
-
-        try: # build and return instance
-            return cls(**kwargs)
-
-        except Exception as err:
-            raise errors.BuilderError(f'failed to build extractor {name} due to {bsfs.typename(err)}: {err}') from err
-
-
-class PipelineBuilder():
-    """Build `bsie.tools.pipeline.Pipeline` instances."""
-
-    # Prefix to be used in the Pipeline.
-    prefix: bsfs.Namespace
-
-    # builder for Readers.
-    rbuild: ReaderBuilder
-
-    # builder for Extractors.
-    ebuild: ExtractorBuilder
-
-    def __init__(
-            self,
-            prefix: bsfs.Namespace,
-            reader_builder: ReaderBuilder,
-            extractor_builder: ExtractorBuilder,
-            ):
-        self.prefix = prefix
-        self.rbuild = reader_builder
-        self.ebuild = extractor_builder
-
-    def build(self) -> pipeline.Pipeline:
-        """Return a Pipeline instance."""
-        ext2rdr = {}
-
-        for eidx in self.ebuild:
-            # build extractor
-            try:
-                ext = self.ebuild.build(eidx)
-
-            except errors.LoaderError as err: # failed to load extractor; skip
-                logger.error('failed to load extractor: %s', err)
-                continue
-
-            except errors.BuilderError as err: # failed to build instance; skip
-                logger.error(str(err))
-                continue
-
-            try:
-                # get reader required by extractor
-                if ext.CONTENT_READER is not None:
-                    rdr = self.rbuild.build(ext.CONTENT_READER)
-                else:
-                    rdr = None
-                # store extractor
-                ext2rdr[ext] = rdr
-
-            except errors.LoaderError as err: # failed to load reader
                logger.error('failed to load reader: %s', err)
-
-            except errors.BuilderError as err: # failed to build reader
-                logger.error(str(err))
-
-        return pipeline.Pipeline(self.prefix, ext2rdr)
-
-
-
-## EOF ##
diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py
deleted file mode 100644
index 20e8ddf..0000000
--- a/bsie/tools/pipeline.py
+++ /dev/null
@@ -1,144 +0,0 @@
-"""
-
-Part of the bsie module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
-# imports
-from collections import defaultdict
-import logging
-import typing
-
-# bsie imports
-from bsie import base
-from bsie.utils import bsfs, node, ns
-
-# exports
-__all__: typing.Sequence[str] = (
-    'Pipeline',
-    )
-
-# constants
-FILE_PREFIX = 'file#'
-
-## code ##
-
-logger = logging.getLogger(__name__)
-
-class Pipeline():
-    """Extraction pipeline to generate triples from files.
-
-    The Pipeline binds readers and extractors, and performs
-    the necessary operations to produce triples from a file.
-    It takes a best-effort approach to extract as many triples
-    as possible. Errors during the extraction are passed over
-    and reported to the log.
-
-    """
-
-    # combined extractor schemas.
-    _schema: bsfs.schema.Schema
-
-    # node prefix.
-    _prefix: bsfs.Namespace
-
-    # extractor -> reader mapping
-    _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
-
-    def __init__(
-            self,
-            prefix: bsfs.Namespace,
-            ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
-            ):
-        # store core members
-        self._prefix = prefix + FILE_PREFIX
-        self._ext2rdr = ext2rdr
-        # compile schema from all extractors
-        self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr)
-
-    def __str__(self) -> str:
-        return bsfs.typename(self)
-
-    def __repr__(self) -> str:
-        return f'{bsfs.typename(self)}(...)'
-
-    def __hash__(self) -> int:
-        return hash((type(self), self._prefix, self._schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values())))
-
-    def __eq__(self, other: typing.Any) -> bool:
-        return isinstance(other, type(self)) \
-            and self._schema == other._schema \
-            and self._prefix == other._prefix \
-            and self._ext2rdr == other._ext2rdr
-
-    @property
-    def schema(self) -> bsfs.schema.Schema:
-        """Return the pipeline's schema (combined from all extractors)."""
-        return self._schema
-
-    @property
-    def principals(self) -> typing.Iterator[bsfs.schema.Predicate]:
-        """Return the principal predicates that can be extracted."""
-        return iter({pred for ext in self._ext2rdr for pred in ext.principals})
-
-    def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema:
-        """Return the subset of the schema that supports the given *principals*."""
-        # materialize principals
-        principals = set(principals)
-        # collect and combine schemas from extractors
-        return bsfs.schema.Schema.Union({
-            ext.schema
-            for ext
-            in self._ext2rdr
-            if not set(ext.principals).isdisjoint(principals)
-            })
-
-    def __call__(
-            self,
-            path: bsfs.URI,
-            principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None,
-            ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
-        """Extract triples from the file at *path*. Optionally, limit triples to *principals*."""
-        # get principals
-        principals = set(principals) if principals is not None else set(self.schema.predicates())
-
-        # get extractors
-        extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)}
-
-        # corner-case short-cut
-        if len(extractors) == 0:
-            return
-
-        # get readers -> extractors mapping
-        rdr2ext = defaultdict(set)
-        for ext in extractors:
-            rdr = self._ext2rdr[ext]
-            rdr2ext[rdr].add(ext)
-
-        # create subject for file
-        uuid = bsfs.uuid.UCID.from_path(path)
-        subject = node.Node(ns.bsfs.File, self._prefix[uuid])
-
-        # extract information
-        for rdr, extrs in rdr2ext.items():
-            try:
-                # get content
-                content = rdr(path) if rdr is not None else None
-
-                # apply extractors on this content
-                for ext in extrs:
-                    try:
-                        # get predicate/value tuples
-                        for subject, pred, value in ext.extract(subject, content, principals):
-                            yield subject, pred, value
-
-                    except base.errors.ExtractorError as err:
-                        # critical extractor failure.
-                        logger.error('%s failed to extract triples from content: %s', ext, err)
-
-            except base.errors.ReaderError as err:
-                # failed to read any content. skip.
-                logger.error('%s failed to read content: %s', rdr, err)
-
-
-## EOF ##
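For reference, the removed classes composed as sketched below. This is a minimal usage sketch reconstructed from the docstrings in the deleted files, not part of this commit: the reader and extractor names are taken from the docstring examples, while the namespace URI, the file path, and the exact `bsfs.Namespace`/`bsfs.URI` constructor calls are assumptions.

```python
# Hypothetical sketch reconstructed from the docstrings above -- not part of this commit.
# The reader/extractor names come from the docstring examples; the namespace URI,
# file path, and the bsfs.Namespace/bsfs.URI constructor calls are assumptions.
from bsie.tools.builder import ExtractorBuilder, PipelineBuilder, ReaderBuilder
from bsie.utils import bsfs

# ReaderBuilder: qualified class name -> constructor kwargs (instances are cached).
rbuild = ReaderBuilder({'bsie.reader.path.Path': {}})

# ExtractorBuilder: list of single-key specs, {qualified class name: kwargs}.
ebuild = ExtractorBuilder([{'bsie.extractor.generic.path.Path': {}}])

# PipelineBuilder wires each extractor to the reader named by its CONTENT_READER.
pbuild = PipelineBuilder(bsfs.Namespace('http://example.com/ns/'), rbuild, ebuild)
pipeline = pbuild.build()

# Calling the pipeline yields (subject, predicate, value) triples for a file.
for subject, predicate, value in pipeline(bsfs.URI('file:///tmp/example.txt')):
    print(subject, predicate, value)
```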