aboutsummaryrefslogtreecommitdiffstats
path: root/bsie/tools
diff options
context:
space:
mode:
authorMatthias Baumgartner <dev@igsor.net>2023-03-05 19:22:58 +0100
committerMatthias Baumgartner <dev@igsor.net>2023-03-05 19:22:58 +0100
commita35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d (patch)
treefb220da28bb7248ebf37ce09af5de88f2c1aaad4 /bsie/tools
parent7582c280ad5324a2f0427999911c7e7abc14a6ab (diff)
parentaf81318ae9311fd0b0e16949cef3cfaf7996970b (diff)
downloadbsie-a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d.tar.gz
bsie-a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d.tar.bz2
bsie-a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d.zip
Merge branch 'develop'HEADv0.23.03releasemain
Diffstat (limited to 'bsie/tools')
-rw-r--r--bsie/tools/__init__.py20
-rw-r--r--bsie/tools/builder.py226
-rw-r--r--bsie/tools/pipeline.py144
3 files changed, 0 insertions, 390 deletions
diff --git a/bsie/tools/__init__.py b/bsie/tools/__init__.py
deleted file mode 100644
index 803c321..0000000
--- a/bsie/tools/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-"""
-
-Part of the bsie module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
-# imports
-import typing
-
-# inner-module imports
-from . import builder
-from .pipeline import Pipeline
-
-# exports
-__all__: typing.Sequence[str] = (
- 'builder',
- 'Pipeline',
- )
-
-## EOF ##
diff --git a/bsie/tools/builder.py b/bsie/tools/builder.py
deleted file mode 100644
index 190d9bf..0000000
--- a/bsie/tools/builder.py
+++ /dev/null
@@ -1,226 +0,0 @@
-"""
-
-Part of the bsie module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
-# imports
-import importlib
-import logging
-import typing
-
-# bsie imports
-from bsie import base
-from bsie.base import errors
-from bsie.utils import bsfs
-
-# inner-module imports
-from . import pipeline
-
-# exports
-__all__: typing.Sequence[str] = (
- 'ExtractorBuilder',
- 'PipelineBuilder',
- 'ReaderBuilder',
- )
-
-
-## code ##
-
-logger = logging.getLogger(__name__)
-
-def _safe_load(module_name: str, class_name: str):
- """Get a class from a module. Raise BuilderError if anything goes wrong."""
- try:
- # load the module
- module = importlib.import_module(module_name)
- except Exception as err:
- # cannot import module
- raise errors.LoaderError(f'cannot load module {module_name}') from err
-
- try:
- # get the class from the module
- cls = getattr(module, class_name)
- except Exception as err:
- # cannot find the class
- raise errors.LoaderError(f'cannot load class {class_name} from module {module_name}') from err
-
- return cls
-
-
-def _unpack_name(name):
- """Split a name into its module and class component (dot-separated)."""
- if not isinstance(name, str):
- raise TypeError(name)
- if '.' not in name:
- raise ValueError('name must be a qualified class name.')
- module_name, class_name = name[:name.rfind('.')], name[name.rfind('.')+1:]
- if module_name == '':
- raise ValueError('name must be a qualified class name.')
- return module_name, class_name
-
-
-class ReaderBuilder():
- """Build `bsie.base.Reader` instances.
-
- Readers are defined via their qualified class name
- (e.g., bsie.reader.path.Path) and optional keyword
- arguments that are passed to the constructor via
- the *kwargs* argument (name as key, kwargs as value).
- The ReaderBuilder keeps a cache of previously built
- reader instances, as they are anyway built with
- identical keyword arguments.
-
- """
-
- # keyword arguments
- _kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]
-
- # cached readers
- _cache: typing.Dict[str, base.Reader]
-
- def __init__(self, kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]):
- self._kwargs = kwargs
- self._cache = {}
-
- def build(self, name: str) -> base.Reader:
- """Return an instance for the qualified class name."""
- # return cached instance
- if name in self._cache:
- return self._cache[name]
-
- # check name and get module/class components
- module_name, class_name = _unpack_name(name)
-
- # import reader class
- cls = _safe_load(module_name, class_name)
-
- # get kwargs
- kwargs = self._kwargs.get(name, {})
- if not isinstance(kwargs, dict):
- raise TypeError(f'expected a kwargs dict, found {bsfs.typename(kwargs)}')
-
- try: # build, cache, and return instance
- obj = cls(**kwargs)
- # cache instance
- self._cache[name] = obj
- # return instance
- return obj
-
- except Exception as err:
- raise errors.BuilderError(f'failed to build reader {name} due to {bsfs.typename(err)}: {err}') from err
-
-
-class ExtractorBuilder():
- """Build `bsie.base.Extractor instances.
-
- It is permissible to build multiple instances of the same extractor
- (typically with different arguments), hence the ExtractorBuilder
- receives a list of build specifications. Each specification is
- a dict with a single key (extractor's qualified name) and a dict
- to be used as keyword arguments.
- Example: [{'bsie.extractor.generic.path.Path': {}}, ]
-
- """
-
- # build specifications
- _specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]
-
- def __init__(self, specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]):
- self._specs = specs
-
- def __iter__(self) -> typing.Iterator[int]:
- """Iterate over extractor specifications."""
- return iter(range(len(self._specs)))
-
- def build(self, index: int) -> base.Extractor:
- """Return an instance of the n'th extractor (n=*index*)."""
- # get build instructions
- specs = self._specs[index]
-
- # check specs structure. expecting[{name: {kwargs}}]
- if not isinstance(specs, dict):
- raise TypeError(f'expected a dict, found {bsfs.typename(specs)}')
- if len(specs) != 1:
- raise TypeError(f'expected a dict of length one, found {len(specs)}')
-
- # get name and args from specs
- name = next(iter(specs.keys()))
- kwargs = specs[name]
-
- # check kwargs structure
- if not isinstance(kwargs, dict):
- raise TypeError(f'expected a dict, found {bsfs.typename(kwargs)}')
-
- # check name and get module/class components
- module_name, class_name = _unpack_name(name)
-
- # import extractor class
- cls = _safe_load(module_name, class_name)
-
- try: # build and return instance
- return cls(**kwargs)
-
- except Exception as err:
- raise errors.BuilderError(f'failed to build extractor {name} due to {bsfs.typename(err)}: {err}') from err
-
-
-class PipelineBuilder():
- """Build `bsie.tools.pipeline.Pipeline` instances."""
-
- # Prefix to be used in the Pipeline.
- prefix: bsfs.Namespace
-
- # builder for Readers.
- rbuild: ReaderBuilder
-
- # builder for Extractors.
- ebuild: ExtractorBuilder
-
- def __init__(
- self,
- prefix: bsfs.Namespace,
- reader_builder: ReaderBuilder,
- extractor_builder: ExtractorBuilder,
- ):
- self.prefix = prefix
- self.rbuild = reader_builder
- self.ebuild = extractor_builder
-
- def build(self) -> pipeline.Pipeline:
- """Return a Pipeline instance."""
- ext2rdr = {}
-
- for eidx in self.ebuild:
- # build extractor
- try:
- ext = self.ebuild.build(eidx)
-
- except errors.LoaderError as err: # failed to load extractor; skip
- logger.error('failed to load extractor: %s', err)
- continue
-
- except errors.BuilderError as err: # failed to build instance; skip
- logger.error(str(err))
- continue
-
- try:
- # get reader required by extractor
- if ext.CONTENT_READER is not None:
- rdr = self.rbuild.build(ext.CONTENT_READER)
- else:
- rdr = None
- # store extractor
- ext2rdr[ext] = rdr
-
- except errors.LoaderError as err: # failed to load reader
- logger.error('failed to load reader: %s', err)
-
- except errors.BuilderError as err: # failed to build reader
- logger.error(str(err))
-
- return pipeline.Pipeline(self.prefix, ext2rdr)
-
-
-
-## EOF ##
diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py
deleted file mode 100644
index 20e8ddf..0000000
--- a/bsie/tools/pipeline.py
+++ /dev/null
@@ -1,144 +0,0 @@
-"""
-
-Part of the bsie module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
-# imports
-from collections import defaultdict
-import logging
-import typing
-
-# bsie imports
-from bsie import base
-from bsie.utils import bsfs, node, ns
-
-# exports
-__all__: typing.Sequence[str] = (
- 'Pipeline',
- )
-
-# constants
-FILE_PREFIX = 'file#'
-
-## code ##
-
-logger = logging.getLogger(__name__)
-
-class Pipeline():
- """Extraction pipeline to generate triples from files.
-
- The Pipeline binds readers and extractors, and performs
- the necessary operations to produce triples from a file.
- It takes a best-effort approach to extract as many triples
- as possible. Errors during the extraction are passed over
- and reported to the log.
-
- """
-
- # combined extractor schemas.
- _schema: bsfs.schema.Schema
-
- # node prefix.
- _prefix: bsfs.Namespace
-
- # extractor -> reader mapping
- _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
-
- def __init__(
- self,
- prefix: bsfs.Namespace,
- ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
- ):
- # store core members
- self._prefix = prefix + FILE_PREFIX
- self._ext2rdr = ext2rdr
- # compile schema from all extractors
- self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr)
-
- def __str__(self) -> str:
- return bsfs.typename(self)
-
- def __repr__(self) -> str:
- return f'{bsfs.typename(self)}(...)'
-
- def __hash__(self) -> int:
- return hash((type(self), self._prefix, self._schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values())))
-
- def __eq__(self, other: typing.Any) -> bool:
- return isinstance(other, type(self)) \
- and self._schema == other._schema \
- and self._prefix == other._prefix \
- and self._ext2rdr == other._ext2rdr
-
- @property
- def schema(self) -> bsfs.schema.Schema:
- """Return the pipeline's schema (combined from all extractors)."""
- return self._schema
-
- @property
- def principals(self) -> typing.Iterator[bsfs.schema.Predicate]:
- """Return the principal predicates that can be extracted."""
- return iter({pred for ext in self._ext2rdr for pred in ext.principals})
-
- def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema:
- """Return the subset of the schema that supports the given *principals*."""
- # materialize principals
- principals = set(principals)
- # collect and combine schemas from extractors
- return bsfs.schema.Schema.Union({
- ext.schema
- for ext
- in self._ext2rdr
- if not set(ext.principals).isdisjoint(principals)
- })
-
- def __call__(
- self,
- path: bsfs.URI,
- principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None,
- ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
- """Extract triples from the file at *path*. Optionally, limit triples to *principals*."""
- # get principals
- principals = set(principals) if principals is not None else set(self.schema.predicates())
-
- # get extractors
- extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)}
-
- # corner-case short-cut
- if len(extractors) == 0:
- return
-
- # get readers -> extractors mapping
- rdr2ext = defaultdict(set)
- for ext in extractors:
- rdr = self._ext2rdr[ext]
- rdr2ext[rdr].add(ext)
-
- # create subject for file
- uuid = bsfs.uuid.UCID.from_path(path)
- subject = node.Node(ns.bsfs.File, self._prefix[uuid])
-
- # extract information
- for rdr, extrs in rdr2ext.items():
- try:
- # get content
- content = rdr(path) if rdr is not None else None
-
- # apply extractors on this content
- for ext in extrs:
- try:
- # get predicate/value tuples
- for subject, pred, value in ext.extract(subject, content, principals):
- yield subject, pred, value
-
- except base.errors.ExtractorError as err:
- # critical extractor failure.
- logger.error('%s failed to extract triples from content: %s', ext, err)
-
- except base.errors.ReaderError as err:
- # failed to read any content. skip.
- logger.error('%s failed to read content: %s', rdr, err)
-
-
-## EOF ##