"""

Part of the bsie module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
import importlib
import logging
import typing

# bsie imports
from bsie import base
from bsie.base import errors
from bsie.utils.bsfs import URI, typename
from bsie.utils import bsfs

# inner-module imports
from . import pipeline

# exports
__all__: typing.Sequence[str] = (
    'ExtractorBuilder',
    'PipelineBuilder',
    'ReaderBuilder',
    )


## code ##

logger = logging.getLogger(__name__)


def _safe_load(module_name: str, class_name: str):
    """Import *module_name* and return its attribute *class_name*.

    Raises `errors.LoaderError` if the module cannot be imported or if
    the attribute lookup on the imported module fails. The original
    exception is chained as the cause.
    """
    try:
        # load the module
        module = importlib.import_module(module_name)
    except Exception as err:
        # cannot import module
        raise errors.LoaderError(f'cannot load module {module_name}') from err

    try:
        # get the class from the module
        cls = getattr(module, class_name)
    except Exception as err:
        # cannot find the class
        raise errors.LoaderError(f'cannot load class {class_name} from module {module_name}') from err

    return cls


def _unpack_name(name: str) -> typing.Tuple[str, str]:
    """Split a qualified name into its module and class component.

    The split happens at the last dot, e.g. ``'a.b.C'`` yields
    ``('a.b', 'C')``.

    Raises a TypeError if *name* is not a string, and a ValueError if it
    contains no dot or if the module component is empty (leading dot with
    no further dots). An empty class component (trailing dot) is not
    rejected here.
    """
    if not isinstance(name, str):
        raise TypeError(name)
    if '.' not in name:
        raise ValueError('name must be a qualified class name.')
    # the separator is guaranteed to be present, so rpartition splits
    # exactly at the last dot
    module_name, _, class_name = name.rpartition('.')
    if module_name == '':
        raise ValueError('name must be a qualified class name.')
    return module_name, class_name


class ReaderBuilder:
    """Build `bsie.base.Reader` instances.

    Readers are defined via their qualified class name
    (e.g., bsie.reader.path.Path) and optional keyword
    arguments that are passed to the constructor via
    the *kwargs* argument (name as key, kwargs as value).

    The ReaderBuilder keeps a cache of previously built
    reader instances, as they are anyway built with identical
    keyword arguments.

    """

    # keyword arguments, keyed by the reader's qualified class name
    kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]

    # cached readers, keyed by the reader's qualified class name
    cache: typing.Dict[str, base.Reader]

    def __init__(self, kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]):
        self.kwargs = kwargs
        self.cache = {}

    def build(self, name: str) -> base.Reader:
        """Return an instance for the qualified class name.

        A previously built instance is returned from the cache. Otherwise
        the class is loaded, instantiated with the keyword arguments
        registered for *name* (none if there is no entry), cached, and
        returned.

        Raises:
            TypeError: *name* is not a string, or the registered keyword
                arguments are not a dict.
            ValueError: *name* is not a qualified class name.
            errors.LoaderError: the module or class cannot be loaded.
            errors.BuilderError: the constructor raised an exception.
        """
        # return cached instance
        if name in self.cache:
            return self.cache[name]

        # check name and get module/class components
        module_name, class_name = _unpack_name(name)

        # import reader class
        cls = _safe_load(module_name, class_name)

        # get kwargs
        kwargs = self.kwargs.get(name, {})
        if not isinstance(kwargs, dict):
            raise TypeError(f'expected a kwargs dict, found {bsfs.typename(kwargs)}')

        try:
            # build instance; only the constructor call is guarded
            obj = cls(**kwargs)
        except Exception as err:
            raise errors.BuilderError(
                f'failed to build reader {name} due to {bsfs.typename(err)}: {err}') from err

        # cache and return instance
        self.cache[name] = obj
        return obj


class ExtractorBuilder:
    """Build `bsie.base.Extractor` instances.

    It is permissible to build multiple instances of the same extractor
    (typically with different arguments), hence the ExtractorBuilder
    receives a list of build specifications. Each specification is
    a dict with a single key (extractor's qualified name) and a dict
    to be used as keyword arguments.
    Example: [{'bsie.extractor.generic.path.Path': {}}, ]

    """

    # build specifications
    specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]

    def __init__(self, specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]):
        self.specs = specs

    def __iter__(self) -> typing.Iterator[int]:
        """Iterate over the indices of the extractor specifications."""
        return iter(range(len(self.specs)))

    def build(self, index: int) -> base.Extractor:
        """Return an instance of the n'th extractor (n=*index*).

        Raises:
            TypeError: the specification is not a dict of length one, its
                value is not a dict, or the extractor name is not a string.
            ValueError: the extractor name is not a qualified class name.
            errors.LoaderError: the module or class cannot be loaded.
            errors.BuilderError: the constructor raised an exception.
        """
        # get build instructions
        specs = self.specs[index]

        # check specs structure; expecting {name: {kwargs}}
        if not isinstance(specs, dict):
            raise TypeError(f'expected a dict, found {bsfs.typename(specs)}')
        if len(specs) != 1:
            raise TypeError(f'expected a dict of length one, found {len(specs)}')

        # get name and args from specs (the dict has exactly one item)
        name, kwargs = next(iter(specs.items()))

        # check kwargs structure
        if not isinstance(kwargs, dict):
            raise TypeError(f'expected a dict, found {bsfs.typename(kwargs)}')

        # check name and get module/class components
        module_name, class_name = _unpack_name(name)

        # import extractor class
        cls = _safe_load(module_name, class_name)

        try:
            # build and return instance
            return cls(**kwargs)
        except Exception as err:
            raise errors.BuilderError(
                f'failed to build extractor {name} due to {bsfs.typename(err)}: {err}') from err


class PipelineBuilder:
    """Build `bsie.tools.pipeline.Pipeline` instances."""

    # Prefix to be used in the Pipeline.
    prefix: bsfs.Namespace

    # builder for Readers.
    rbuild: ReaderBuilder

    # builder for Extractors.
    ebuild: ExtractorBuilder

    def __init__(
            self,
            prefix: bsfs.Namespace,
            reader_builder: ReaderBuilder,
            extractor_builder: ExtractorBuilder,
            ):
        self.prefix = prefix
        self.rbuild = reader_builder
        self.ebuild = extractor_builder

    def build(self) -> pipeline.Pipeline:
        """Return a Pipeline instance.

        Every extractor specification is built in turn and mapped to the
        reader named by its CONTENT_READER attribute (or to None if that
        attribute is None). An extractor is logged and left out of the
        pipeline if it, or its reader, fails with a LoaderError or a
        BuilderError. Any other exception propagates to the caller.
        """
        ext2rdr = {}

        for eidx in self.ebuild:
            # build extractor
            try:
                ext = self.ebuild.build(eidx)
            except errors.LoaderError as err:
                # failed to load extractor; skip
                logger.error('failed to load extractor: %s', err)
                continue
            except errors.BuilderError as err:
                # failed to build instance; skip
                logger.error(str(err))
                continue

            # get reader required by extractor
            try:
                if ext.CONTENT_READER is not None:
                    rdr = self.rbuild.build(ext.CONTENT_READER)
                else:
                    rdr = None
            except errors.LoaderError as err:
                # failed to load reader; the extractor is not stored
                logger.error('failed to load reader: %s', err)
                continue
            except errors.BuilderError as err:
                # failed to build reader; the extractor is not stored
                logger.error(str(err))
                continue

            # store extractor
            ext2rdr[ext] = rdr

        return pipeline.Pipeline(self.prefix, ext2rdr)

## EOF ##