1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
|
"""
Part of the bsie module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
import importlib
import logging
import typing
# bsie imports
from bsie import base
from bsie.base import errors
from bsie.utils import bsfs
# inner-module imports
from . import pipeline
# exports
# Names made available by `from <this module> import *`: the three builder classes.
__all__: typing.Sequence[str] = (
'ExtractorBuilder',
'PipelineBuilder',
'ReaderBuilder',
)
## code ##
# Module-level logger, named after this module; PipelineBuilder.build
# uses it to report extractors and readers that could not be created.
logger = logging.getLogger(__name__)
def _safe_load(module_name: str, class_name: str):
"""Get a class from a module. Raise BuilderError if anything goes wrong."""
try:
# load the module
module = importlib.import_module(module_name)
except Exception as err:
# cannot import module
raise errors.LoaderError(f'cannot load module {module_name}') from err
try:
# get the class from the module
cls = getattr(module, class_name)
except Exception as err:
# cannot find the class
raise errors.LoaderError(f'cannot load class {class_name} from module {module_name}') from err
return cls
def _unpack_name(name):
"""Split a name into its module and class component (dot-separated)."""
if not isinstance(name, str):
raise TypeError(name)
if '.' not in name:
raise ValueError('name must be a qualified class name.')
module_name, class_name = name[:name.rfind('.')], name[name.rfind('.')+1:]
if module_name == '':
raise ValueError('name must be a qualified class name.')
return module_name, class_name
class ReaderBuilder():
    """Factory for `bsie.base.Reader` instances.

    A reader is identified by its qualified class name (e.g.,
    bsie.reader.path.Path). Its constructor keyword arguments are looked
    up under that name in the *kwargs* dict passed to the builder
    (qualified name as key, keyword arguments dict as value). A reader
    without an entry is constructed without arguments.

    Because the keyword arguments of a name never change, each reader
    is constructed at most once and served from a cache afterwards.
    """

    # constructor keyword arguments, keyed by qualified reader name
    _kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]

    # reader instances built so far, keyed by qualified reader name
    _cache: typing.Dict[str, base.Reader]

    def __init__(self, kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]):
        self._cache = {}
        self._kwargs = kwargs

    def build(self, name: str) -> base.Reader:
        """Return the reader instance for the qualified class name *name*.

        A previously built instance is returned from the cache. Otherwise
        the class is imported, instantiated with the keyword arguments
        registered for *name*, cached, and returned.

        Raises a TypeError or ValueError if *name* is not a qualified
        class name, a TypeError if the registered keyword arguments are
        not a dict, `errors.LoaderError` if the class cannot be imported,
        and `errors.BuilderError` if the constructor fails.
        """
        # serve repeated requests from the cache
        try:
            return self._cache[name]
        except KeyError:
            pass

        # validate the name and import the reader class
        module_name, class_name = _unpack_name(name)
        reader_cls = _safe_load(module_name, class_name)

        # fetch the constructor arguments (none registered: no arguments)
        ctor_args = self._kwargs.get(name, {})
        if not isinstance(ctor_args, dict):
            raise TypeError(f'expected a kwargs dict, found {bsfs.typename(ctor_args)}')

        # instantiate and remember the reader
        try:
            reader = self._cache[name] = reader_cls(**ctor_args)
        except Exception as err:
            raise errors.BuilderError(f'failed to build reader {name} due to {bsfs.typename(err)}: {err}') from err
        return reader
class ExtractorBuilder():
    """Factory for `bsie.base.Extractor` instances.

    The same extractor class may be instantiated several times
    (typically with different arguments). The ExtractorBuilder
    therefore takes a list of build specifications. Every specification
    is a dict with exactly one item: the extractor's qualified class
    name as key and a dict of constructor keyword arguments as value.

    Example: [{'bsie.extractor.generic.path.Path': {}}, ]
    """

    # build specifications, one single-item dict per extractor
    _specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]

    def __init__(self, specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]):
        self._specs = specs

    def __iter__(self) -> typing.Iterator[int]:
        """Iterate over the indices of the extractor specifications."""
        num_specs = len(self._specs)
        return iter(range(num_specs))

    def build(self, index: int) -> base.Extractor:
        """Return a new instance of the extractor specified at position *index*.

        Raises a TypeError if the specification is not a single-item dict
        or its keyword arguments are not a dict, a TypeError or ValueError
        if the name is not a qualified class name, `errors.LoaderError` if
        the class cannot be imported, and `errors.BuilderError` if the
        constructor fails.
        """
        spec = self._specs[index]

        # the specification must have the shape {name: {kwargs}}
        if not isinstance(spec, dict):
            raise TypeError(f'expected a dict, found {bsfs.typename(spec)}')
        if len(spec) != 1:
            raise TypeError(f'expected a dict of length one, found {len(spec)}')

        # the only item holds the qualified name and the constructor arguments
        name, ctor_args = next(iter(spec.items()))
        if not isinstance(ctor_args, dict):
            raise TypeError(f'expected a dict, found {bsfs.typename(ctor_args)}')

        # validate the name and import the extractor class
        module_name, class_name = _unpack_name(name)
        extractor_cls = _safe_load(module_name, class_name)

        # instantiate the extractor
        try:
            return extractor_cls(**ctor_args)
        except Exception as err:
            raise errors.BuilderError(f'failed to build extractor {name} due to {bsfs.typename(err)}: {err}') from err
class PipelineBuilder():
    """Assemble `bsie.tools.pipeline.Pipeline` instances.

    Extractors are created by an ExtractorBuilder, and the reader each
    extractor asks for is created by a ReaderBuilder.
    """

    # namespace prefix handed to the Pipeline
    prefix: bsfs.Namespace

    # source of Reader instances
    rbuild: ReaderBuilder

    # source of Extractor instances
    ebuild: ExtractorBuilder

    def __init__(
            self,
            prefix: bsfs.Namespace,
            reader_builder: ReaderBuilder,
            extractor_builder: ExtractorBuilder,
            ):
        self.ebuild = extractor_builder
        self.rbuild = reader_builder
        self.prefix = prefix

    def build(self) -> pipeline.Pipeline:
        """Return a Pipeline made of all extractors that could be set up.

        Each extractor is mapped to the reader named by its CONTENT_READER
        attribute, or to None if that attribute is None. An extractor that
        cannot be loaded or built is logged and skipped, and so is an
        extractor whose reader cannot be loaded or built. Only
        `errors.LoaderError` and `errors.BuilderError` are handled this
        way; any other exception propagates to the caller.
        """
        readers_by_extractor = {}
        for position in self.ebuild:
            # create the extractor; on failure report it and move on
            try:
                extractor = self.ebuild.build(position)
            except errors.LoaderError as err:
                logger.error('failed to load extractor: %s', err)
                continue
            except errors.BuilderError as err:
                logger.error(str(err))
                continue

            # create the reader the extractor depends on (if any) and
            # register the pair; on failure the extractor is left out
            try:
                reader = (
                    self.rbuild.build(extractor.CONTENT_READER)
                    if extractor.CONTENT_READER is not None
                    else None
                )
                readers_by_extractor[extractor] = reader
            except errors.LoaderError as err:
                logger.error('failed to load reader: %s', err)
            except errors.BuilderError as err:
                logger.error(str(err))

        return pipeline.Pipeline(self.prefix, readers_by_extractor)
## EOF ##
|