aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Matthias Baumgartner <dev@igsor.net> 2022-11-25 14:43:12 +0100
committer Matthias Baumgartner <dev@igsor.net> 2022-11-25 14:43:12 +0100
commit 3e6a69ce7f109f0fd4352507ad60d58d4cbd24a7 (patch)
tree f038ed8d4f04c63991939e13e61ae170de4e2c57
parent c9a1dea230054f5d6f40b7fd5e3930609c5f6416 (diff)
download bsie-3e6a69ce7f109f0fd4352507ad60d58d4cbd24a7.tar.gz
bsie-3e6a69ce7f109f0fd4352507ad60d58d4cbd24a7.tar.bz2
bsie-3e6a69ce7f109f0fd4352507ad60d58d4cbd24a7.zip
builders and pipeline
-rw-r--r-- bsie/base/errors.py 8
-rw-r--r-- bsie/tools/__init__.py 20
-rw-r--r-- bsie/tools/builder.py 217
-rw-r--r-- bsie/tools/pipeline.py 121
-rw-r--r-- bsie/utils/bsfs.py 3
-rw-r--r-- test/tools/__init__.py 0
-rw-r--r-- test/tools/test_builder.py 247
-rw-r--r-- test/tools/test_pipeline.py 167
-rw-r--r-- test/tools/testfile.t 1
9 files changed, 783 insertions, 1 deletions
diff --git a/bsie/base/errors.py b/bsie/base/errors.py
index a86b7e8..760351f 100644
--- a/bsie/base/errors.py
+++ b/bsie/base/errors.py
@@ -9,7 +9,9 @@ import typing
# exports
__all__: typing.Sequence[str] = (
+ 'BuilderError',
'ExtractorError',
+ 'LoaderError',
'ReaderError',
)
@@ -19,6 +21,12 @@ __all__: typing.Sequence[str] = (
class _BSIEError(Exception):
"""Generic BSIE error."""
+class BuilderError(_BSIEError):
+ """The Builder failed to create an instance.
+
+ Raised by the builders in bsie.tools.builder when the constructor
+ of a reader or an extractor fails.
+ """
+
+class LoaderError(BuilderError):
+ """Failed to load a module or class.
+
+ Subclasses BuilderError, hence handlers that catch BuilderError
+ also catch load failures.
+ """
+
class ExtractorError(_BSIEError):
"""The Extractor failed to process the given content."""
diff --git a/bsie/tools/__init__.py b/bsie/tools/__init__.py
new file mode 100644
index 0000000..8ca9620
--- /dev/null
+++ b/bsie/tools/__init__.py
@@ -0,0 +1,20 @@
+"""
+
+Part of the bsie module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import typing
+
+# inner-module imports
+from . import builder
+from . import pipeline
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'builder',
+ 'pipeline',
+ )
+
+## EOF ##
diff --git a/bsie/tools/builder.py b/bsie/tools/builder.py
new file mode 100644
index 0000000..8f7a410
--- /dev/null
+++ b/bsie/tools/builder.py
@@ -0,0 +1,217 @@
+"""
+
+Part of the bsie module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import importlib
+import logging
+import typing
+
+# bsie imports
+from bsie import base
+from bsie.base import errors
+from bsie.utils.bsfs import URI, typename
+
+# inner-module imports
+from . import pipeline
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'ExtractorBuilder',
+ 'PipelineBuilder',
+ 'ReaderBuilder',
+ )
+
+
+## code ##
+
+logger = logging.getLogger(__name__)
+
+def _safe_load(module_name: str, class_name: str):
+    """Import *module_name* and return its attribute *class_name*.
+
+    Any failure of the import or of the attribute lookup is re-raised
+    as LoaderError (a BuilderError), chained to the original exception.
+    """
+    # step 1: import the module
+    try:
+        loaded = importlib.import_module(module_name)
+    except Exception as err:
+        raise errors.LoaderError(f'cannot load module {module_name}') from err
+
+    # step 2: look up the class within the imported module
+    try:
+        return getattr(loaded, class_name)
+    except Exception as err:
+        raise errors.LoaderError(f'cannot load class {class_name} from module {module_name}') from err
+
+
+def _unpack_name(name):
+    """Split a qualified class name into its module and class component.
+
+    The name is split at its last dot, e.g. 'path.foo.bar.Path' becomes
+    ('path.foo.bar', 'Path').
+
+    Raises a TypeError if *name* is not a string. Raises a ValueError if
+    *name* contains no dot, or if the module or the class component is
+    empty (e.g. '.Path' or 'path.').
+    """
+    if not isinstance(name, str):
+        raise TypeError(name)
+    # split at the last dot; *sep* is empty if the name has no dot at all
+    module_name, sep, class_name = name.rpartition('.')
+    if sep == '' or module_name == '' or class_name == '':
+        raise ValueError('name must be a qualified class name.')
+    return module_name, class_name
+
+
+class ReaderBuilder():
+    """Factory for `bsie.base.reader.Reader` instances.
+
+    A reader is requested by its qualified class name (e.g.,
+    bsie.reader.path.Path). The *kwargs* argument maps such names
+    to the keyword arguments that are passed to the respective
+    constructor; readers without an entry are built without
+    arguments. Since a reader is always built with the same
+    keyword arguments, every instance is built only once and
+    served from a cache afterwards.
+
+    """
+
+    # constructor keyword arguments per qualified reader name
+    kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]
+
+    # previously built readers, keyed by their qualified name
+    cache: typing.Dict[str, base.reader.Reader]
+
+    def __init__(self, kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]):
+        self.cache = {}
+        self.kwargs = kwargs
+
+    def build(self, name: str) -> base.reader.Reader:
+        """Return the reader instance for the qualified class *name*."""
+        # serve repeated requests from the cache
+        try:
+            return self.cache[name]
+        except KeyError:
+            pass
+
+        # validate and split the name, then import the reader class
+        cls = _safe_load(*_unpack_name(name))
+
+        # fetch and validate the constructor arguments
+        kwargs = self.kwargs.get(name, {})
+        if not isinstance(kwargs, dict):
+            raise TypeError(f'expected a kwargs dict, found {typename(kwargs)}')
+
+        # instantiate; constructor failures are reported as BuilderError
+        try:
+            obj = cls(**kwargs)
+        except Exception as err:
+            raise errors.BuilderError(f'failed to build reader {name} due to {typename(err)}: {err}') from err
+
+        # remember the instance for later requests
+        self.cache[name] = obj
+        return obj
+
+
+class ExtractorBuilder():
+    """Build `bsie.base.extractor.Extractor` instances.
+
+    The same extractor may be built several times (typically with
+    different arguments). The ExtractorBuilder therefore receives a
+    list of build specifications, each of which is a dict with exactly
+    one item: the extractor's qualified name as key and a dict of
+    constructor keyword arguments as value.
+    Example: [{'bsie.extractor.generic.path.Path': {}}, ]
+
+    """
+
+    # build specifications
+    specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]
+
+    def __init__(self, specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]):
+        self.specs = specs
+
+    def __iter__(self) -> typing.Iterator[int]:
+        """Iterate over the indices of the build specifications."""
+        count = len(self.specs)
+        return iter(range(count))
+
+    def build(self, index: int) -> base.extractor.Extractor:
+        """Return an instance of the extractor specified at position *index*."""
+        specs = self.specs[index]
+
+        # a specification has the structure {name: {kwargs}}
+        if not isinstance(specs, dict):
+            raise TypeError(f'expected a dict, found {typename(specs)}')
+        if len(specs) != 1:
+            raise TypeError(f'expected a dict of length one, found {len(specs)}')
+
+        # unpack the single (name, kwargs) item and check the kwargs
+        (name, kwargs), = specs.items()
+        if not isinstance(kwargs, dict):
+            raise TypeError(f'expected a dict, found {typename(kwargs)}')
+
+        # validate and split the name, then import the extractor class
+        cls = _safe_load(*_unpack_name(name))
+
+        # instantiate; constructor failures are reported as BuilderError
+        try:
+            return cls(**kwargs)
+        except Exception as err:
+            raise errors.BuilderError(f'failed to build extractor {name} due to {typename(err)}: {err}') from err
+
+
+class PipelineBuilder():
+    """Build `bsie.tools.pipeline.Pipeline` instances.
+
+    Extractors are obtained from the extractor builder, and the reader
+    each of them names in its CONTENT_READER from the reader builder.
+    An extractor that cannot be loaded or built, or whose reader cannot
+    be loaded or built, is reported to the log and left out of the
+    pipeline.
+
+    """
+
+    def __init__(
+            self,
+            prefix: URI,
+            reader_builder: ReaderBuilder,
+            extractor_builder: ExtractorBuilder,
+            ):
+        self.prefix = prefix
+        self.rbuild = reader_builder
+        self.ebuild = extractor_builder
+
+    def build(self) -> pipeline.Pipeline:
+        """Return a Pipeline instance."""
+        ext2rdr = {}
+
+        for eidx in self.ebuild:
+            # first the extractor; skip it if it cannot be created
+            try:
+                ext = self.ebuild.build(eidx)
+            except errors.LoaderError as err:
+                logger.error('failed to load extractor: %s', err)
+                continue
+            except errors.BuilderError as err:
+                logger.error(str(err))
+                continue
+
+            # then its reader (if it needs one); the extractor is only
+            # registered if the reader is available
+            try:
+                reader_name = ext.CONTENT_READER
+                ext2rdr[ext] = self.rbuild.build(reader_name) if reader_name is not None else None
+            except errors.LoaderError as err:
+                logger.error('failed to load reader: %s', err)
+            except errors.BuilderError as err:
+                logger.error(str(err))
+
+        return pipeline.Pipeline(self.prefix, ext2rdr)
+
+
+
+## EOF ##
diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py
new file mode 100644
index 0000000..8e1c992
--- /dev/null
+++ b/bsie/tools/pipeline.py
@@ -0,0 +1,121 @@
+"""
+
+Part of the bsie module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+from collections import defaultdict
+import logging
+import typing
+
+# bsie imports
+from bsie import base
+from bsie.utils import ns
+from bsie.utils.node import Node
+from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Pipeline',
+ )
+
+## code ##
+
+logger = logging.getLogger(__name__)
+
+class Pipeline():
+    """Extraction pipeline that turns files into triples.
+
+    A Pipeline couples extractors with the readers that supply
+    their content and runs whatever is needed to produce triples
+    from a file. Extraction is best-effort: a reader or extractor
+    error is written to the log and the remaining readers and
+    extractors are still processed.
+
+    """
+
+    # combined extractor schemas.
+    schema: _schema.Schema
+
+    # node prefix.
+    _prefix: URI
+
+    # extractor -> reader mapping
+    _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
+
+    def __init__(
+            self,
+            prefix: URI,
+            ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
+            ):
+        # core members
+        self._prefix = prefix
+        self._ext2rdr = ext2rdr
+        # the pipeline's schema is the union of all extractor schemas
+        self.schema = _schema.Schema.Union(ext.schema for ext in ext2rdr)
+
+    def __str__(self) -> str:
+        return typename(self)
+
+    def __repr__(self) -> str:
+        return f'{typename(self)}(...)'
+
+    def __hash__(self) -> int:
+        extractors = tuple(self._ext2rdr)
+        readers = tuple(self._ext2rdr.values())
+        return hash((type(self), self._prefix, self.schema, extractors, readers))
+
+    def __eq__(self, other: typing.Any) -> bool:
+        if not isinstance(other, type(self)):
+            return False
+        return self.schema == other.schema \
+            and self._prefix == other._prefix \
+            and self._ext2rdr == other._ext2rdr
+
+    def __call__(
+            self,
+            path: URI,
+            predicates: typing.Optional[typing.Iterable[_schema.Predicate]] = None,
+            ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]:
+        """Yield (node, predicate, value) triples extracted from the file at *path*.
+
+        If *predicates* is given, only extractors that provide at least
+        one of them are run. Otherwise, all predicates of the pipeline's
+        schema are requested.
+        """
+        # requested predicates; default to everything the schema offers
+        if predicates is None:
+            predicates = set(self.schema.predicates())
+        else:
+            predicates = set(predicates)
+
+        # extractors that provide at least one requested predicate
+        extractors = {ext for ext in self._ext2rdr if not set(ext.predicates()).isdisjoint(predicates)}
+
+        # nothing to do
+        if not extractors:
+            return
+
+        # group extractors by reader
+        rdr2ext = defaultdict(set)
+        for ext in extractors:
+            rdr2ext[self._ext2rdr[ext]].add(ext)
+
+        # the subject node that represents the file
+        subject = Node(ns.bsfs.Entity, self._prefix + _uuid.UCID.from_path(path))
+
+        # run the readers and hand their content to the extractors
+        for rdr, extrs in rdr2ext.items():
+            try:
+                # extractors without a reader receive None as content
+                content = None if rdr is None else rdr(path)
+
+                for ext in extrs:
+                    try:
+                        for node, pred, value in ext.extract(subject, content, predicates):
+                            yield node, pred, value
+
+                    except base.errors.ExtractorError as err:
+                        # critical extractor failure; continue with the next extractor
+                        logger.error('%s failed to extract triples from content: %s', ext, err)
+
+            except base.errors.ReaderError as err:
+                # no content available; skip all extractors of this reader
+                logger.error('%s failed to read content: %s', rdr, err)
+
+
+## EOF ##
diff --git a/bsie/utils/bsfs.py b/bsie/utils/bsfs.py
index 01ec5d1..a4b7626 100644
--- a/bsie/utils/bsfs.py
+++ b/bsie/utils/bsfs.py
@@ -10,7 +10,7 @@ import typing
# bsfs imports
from bsfs import schema
from bsfs.namespace import Namespace
-from bsfs.utils import URI, typename
+from bsfs.utils import URI, typename, uuid
# exports
__all__: typing.Sequence[str] = (
@@ -18,6 +18,7 @@ __all__: typing.Sequence[str] = (
'URI',
'schema',
'typename',
+ 'uuid',
)
## EOF ##
diff --git a/test/tools/__init__.py b/test/tools/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/tools/__init__.py
diff --git a/test/tools/test_builder.py b/test/tools/test_builder.py
new file mode 100644
index 0000000..bef0e9d
--- /dev/null
+++ b/test/tools/test_builder.py
@@ -0,0 +1,247 @@
+"""
+
+Part of the bsie test suite.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import logging
+import unittest
+
+# bsie imports
+from bsie import base
+from bsie.base import errors
+from bsie.utils.bsfs import URI
+
+# objects to test
+from bsie.tools.builder import ExtractorBuilder
+from bsie.tools.builder import PipelineBuilder
+from bsie.tools.builder import ReaderBuilder
+from bsie.tools.builder import _safe_load
+from bsie.tools.builder import _unpack_name
+
+
+## code ##
+
+class TestUtils(unittest.TestCase):
+    def test_safe_load(self):
+        # modules that do not exist
+        with self.assertRaises(errors.LoaderError):
+            _safe_load('dBGHMSAYOoKeKMpywDoKZQycENFPvN', 'foobar')
+        with self.assertRaises(errors.LoaderError):
+            _safe_load('dBGHMSAYOoKeKMpywDoKZQycENFPvN.bar', 'foobar')
+        # existing module with a non-existing submodule
+        with self.assertRaises(errors.LoaderError):
+            _safe_load('os.foo', 'foobar')
+        # existing module without the requested class
+        with self.assertRaises(errors.LoaderError):
+            _safe_load('os.path', 'foo')
+        # existing module and class
+        loaded = _safe_load('collections.abc', 'Container')
+        import collections.abc
+        self.assertEqual(loaded, collections.abc.Container)
+
+    def test_unpack_name(self):
+        # names must be strings
+        with self.assertRaises(TypeError):
+            _unpack_name(123)
+        with self.assertRaises(TypeError):
+            _unpack_name(None)
+        # names must consist of a module and a class part
+        with self.assertRaises(ValueError):
+            _unpack_name('')
+        with self.assertRaises(ValueError):
+            _unpack_name('path')
+        with self.assertRaises(ValueError):
+            _unpack_name('.Path')
+        # qualified names are split at the last dot
+        self.assertEqual(_unpack_name('path.Path'), ('path', 'Path'))
+        self.assertEqual(_unpack_name('path.foo.bar.Path'), ('path.foo.bar', 'Path'))
+
+
+class TestReaderBuilder(unittest.TestCase):
+    def test_build(self):
+        builder = ReaderBuilder({'bsie.reader.path.Path': {}})
+        # a reader that has a kwargs entry
+        reader = builder.build('bsie.reader.path.Path')
+        import bsie.reader.path
+        self.assertIsInstance(reader, bsie.reader.path.Path)
+        # a reader that has no kwargs entry
+        reader = builder.build('bsie.reader.stat.Stat')
+        import bsie.reader.stat
+        self.assertIsInstance(reader, bsie.reader.stat.Stat)
+        # building the same reader again (test cache)
+        self.assertEqual(reader, builder.build('bsie.reader.stat.Stat'))
+        # invalid names
+        with self.assertRaises(TypeError):
+            builder.build(123)
+        with self.assertRaises(TypeError):
+            builder.build(None)
+        with self.assertRaises(ValueError):
+            builder.build('')
+        with self.assertRaises(ValueError):
+            builder.build('Path')
+        with self.assertRaises(errors.BuilderError):
+            builder.build('path.Path')
+        # invalid config
+        builder = ReaderBuilder({'bsie.reader.stat.Stat': dict(foo=123)})
+        with self.assertRaises(errors.BuilderError):
+            builder.build('bsie.reader.stat.Stat')
+        builder = ReaderBuilder({'bsie.reader.stat.Stat': 123})
+        with self.assertRaises(TypeError):
+            builder.build('bsie.reader.stat.Stat')
+        # no instructions at all
+        builder = ReaderBuilder({})
+        reader = builder.build('bsie.reader.stat.Stat')
+        self.assertIsInstance(reader, bsie.reader.stat.Stat)
+
+
+
+class TestExtractorBuilder(unittest.TestCase):
+ # Tests for ExtractorBuilder: iteration yields the indices of the
+ # specifications, build() creates the extractor at a given index.
+ def test_iter(self):
+ # no specifications
+ self.assertListEqual(list(ExtractorBuilder([])), [])
+ # some specifications
+ builder = ExtractorBuilder([
+ {'bsie.extractor.generic.path.Path': {}},
+ {'bsie.extractor.generic.stat.Stat': {}},
+ {'bsie.extractor.generic.path.Path': {}},
+ ])
+ self.assertListEqual(list(builder), [0, 1, 2])
+
+ def test_build(self):
+ # simple and repeated extractors
+ builder = ExtractorBuilder([
+ {'bsie.extractor.generic.path.Path': {}},
+ {'bsie.extractor.generic.stat.Stat': {}},
+ {'bsie.extractor.generic.path.Path': {}},
+ ])
+ ext = [builder.build(0), builder.build(1), builder.build(2)]
+ import bsie.extractor.generic.path
+ import bsie.extractor.generic.stat
+ # built instances must equal directly constructed ones, in specification order
+ self.assertListEqual(ext, [
+ bsie.extractor.generic.path.Path(),
+ bsie.extractor.generic.stat.Stat(),
+ bsie.extractor.generic.path.Path(),
+ ])
+ # out-of-bounds index raises IndexError
+ self.assertRaises(IndexError, builder.build, 3)
+
+ # building with args
+ builder = ExtractorBuilder([
+ {'bsie.extractor.generic.constant.Constant': {
+ 'schema': '''
+ bse:author rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Entity ;
+ rdfs:range xsd:string ;
+ owl:maxCardinality "1"^^xsd:number .
+ bse:rating rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Entity ;
+ rdfs:range xsd:integer ;
+ owl:maxCardinality "1"^^xsd:number .
+ ''',
+ 'tuples': [
+ ('http://bsfs.ai/schema/Entity#author', 'Me, myself, and I'),
+ ('http://bsfs.ai/schema/Entity#rating', 123),
+ ],
+ }}])
+ obj = builder.build(0)
+ import bsie.extractor.generic.constant
+ # the kwargs of the specification must have been passed to the constructor
+ self.assertEqual(obj, bsie.extractor.generic.constant.Constant('''
+ bse:author rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Entity ;
+ rdfs:range xsd:string ;
+ owl:maxCardinality "1"^^xsd:number .
+ bse:rating rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Entity ;
+ rdfs:range xsd:integer ;
+ owl:maxCardinality "1"^^xsd:number .
+ ''', [
+ ('http://bsfs.ai/schema/Entity#author', 'Me, myself, and I'),
+ ('http://bsfs.ai/schema/Entity#rating', 123),
+ ]))
+
+ # building with invalid args
+ self.assertRaises(errors.BuilderError, ExtractorBuilder(
+ [{'bsie.extractor.generic.path.Path': {'foo': 123}}]).build, 0)
+ # non-dict build specification
+ self.assertRaises(TypeError, ExtractorBuilder(
+ [('bsie.extractor.generic.path.Path', {})]).build, 0)
+ # multiple keys per build specification
+ self.assertRaises(TypeError, ExtractorBuilder(
+ [{'bsie.extractor.generic.path.Path': {},
+ 'bsie.extractor.generic.stat.Stat': {}}]).build, 0)
+ # non-dict value for kwargs
+ self.assertRaises(TypeError, ExtractorBuilder(
+ [{'bsie.extractor.generic.path.Path': 123}]).build, 0)
+
+
+
+
+class TestPipelineBuilder(unittest.TestCase):
+    # Covers PipelineBuilder.build for a valid setup and for each failure
+    # mode: an extractor or a reader that fails to load or to build.
+    def test_build(self):
+        prefix = URI('http://example.com/local/file#')
+        c_schema = '''
+ bse:author rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Entity ;
+ rdfs:range xsd:string ;
+ owl:maxCardinality "1"^^xsd:number .
+ '''
+        c_tuples = [('http://bsfs.ai/schema/Entity#author', 'Me, myself, and I')]
+        # prepare builders
+        rbuild = ReaderBuilder({})
+        ebuild = ExtractorBuilder([
+            {'bsie.extractor.generic.path.Path': {}},
+            {'bsie.extractor.generic.stat.Stat': {}},
+            {'bsie.extractor.generic.constant.Constant': dict(
+                schema=c_schema,
+                tuples=c_tuples,
+            )},
+            ])
+        # build pipeline
+        builder = PipelineBuilder(prefix, rbuild, ebuild)
+        pipeline = builder.build()
+        # delayed import
+        import bsie.reader.path
+        import bsie.reader.stat
+        import bsie.extractor.generic.path
+        import bsie.extractor.generic.stat
+        import bsie.extractor.generic.constant
+        # check pipeline
+        self.assertDictEqual(pipeline._ext2rdr, {
+            bsie.extractor.generic.path.Path(): bsie.reader.path.Path(),
+            bsie.extractor.generic.stat.Stat(): bsie.reader.stat.Stat(),
+            bsie.extractor.generic.constant.Constant(c_schema, c_tuples): None,
+            })
+
+        # fail to load extractor
+        ebuild_err = ExtractorBuilder([
+            {'bsie.extractor.generic.foo.Foo': {}},
+            {'bsie.extractor.generic.path.Path': {}},
+            ])
+        with self.assertLogs(logging.getLogger('bsie.tools.builder'), logging.ERROR):
+            pipeline = PipelineBuilder(prefix, rbuild, ebuild_err).build()
+        self.assertDictEqual(pipeline._ext2rdr, {
+            bsie.extractor.generic.path.Path(): bsie.reader.path.Path()})
+
+        # fail to build extractor
+        ebuild_err = ExtractorBuilder([
+            {'bsie.extractor.generic.path.Path': {'foo': 123}},
+            {'bsie.extractor.generic.path.Path': {}},
+            ])
+        with self.assertLogs(logging.getLogger('bsie.tools.builder'), logging.ERROR):
+            pipeline = PipelineBuilder(prefix, rbuild, ebuild_err).build()
+        self.assertDictEqual(pipeline._ext2rdr, {
+            bsie.extractor.generic.path.Path(): bsie.reader.path.Path()})
+
+        # fail to load reader
+        # switch reader of an extractor. CONTENT_READER is a class attribute,
+        # i.e. state shared with all other tests, so it is restored in a
+        # finally-clause even if the build or the checks below fail.
+        old_reader = bsie.extractor.generic.path.Path.CONTENT_READER
+        bsie.extractor.generic.path.Path.CONTENT_READER = 'bsie.reader.foo.Foo'
+        try:
+            with self.assertLogs(logging.getLogger('bsie.tools.builder'), logging.ERROR):
+                # build pipeline with invalid reader reference
+                pipeline = PipelineBuilder(prefix, rbuild, ebuild).build()
+            self.assertDictEqual(pipeline._ext2rdr, {
+                bsie.extractor.generic.stat.Stat(): bsie.reader.stat.Stat(),
+                bsie.extractor.generic.constant.Constant(c_schema, c_tuples): None,
+                })
+        finally:
+            # switch back
+            bsie.extractor.generic.path.Path.CONTENT_READER = old_reader
+
+        # fail to build reader
+        rbuild_err = ReaderBuilder({'bsie.reader.stat.Stat': dict(foo=123)})
+        with self.assertLogs(logging.getLogger('bsie.tools.builder'), logging.ERROR):
+            pipeline = PipelineBuilder(prefix, rbuild_err, ebuild).build()
+        self.assertDictEqual(pipeline._ext2rdr, {
+            bsie.extractor.generic.path.Path(): bsie.reader.path.Path(),
+            bsie.extractor.generic.constant.Constant(c_schema, c_tuples): None,
+            })
+
+
+## main ##
+
+if __name__ == '__main__':
+ unittest.main()
+
+## EOF ##
diff --git a/test/tools/test_pipeline.py b/test/tools/test_pipeline.py
new file mode 100644
index 0000000..9888d2e
--- /dev/null
+++ b/test/tools/test_pipeline.py
@@ -0,0 +1,167 @@
+"""
+
+Part of the bsie test suite.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import logging
+import os
+import unittest
+
+# bsie imports
+from bsie.base import errors
+from bsie.utils import ns
+from bsie.utils.bsfs import URI
+from bsie.utils.node import Node
+import bsie.extractor.generic.constant
+import bsie.extractor.generic.path
+import bsie.extractor.generic.stat
+import bsie.reader.path
+import bsie.reader.stat
+
+# objects to test
+from bsie.tools.pipeline import Pipeline
+
+
+## code ##
+
+class TestPipeline(unittest.TestCase):
+ # Tests for Pipeline: string representations, equality and hashing,
+ # triple extraction, and the handling of reader/extractor errors.
+ def setUp(self):
+ # constant A
+ csA = '''
+ bse:author rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Entity ;
+ rdfs:range xsd:string ;
+ owl:maxCardinality "1"^^xsd:number .
+ '''
+ tupA = [('http://bsfs.ai/schema/Entity#author', 'Me, myself, and I')]
+ # constant B
+ csB = '''
+ bse:rating rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Entity ;
+ rdfs:range xsd:integer ;
+ owl:maxCardinality "1"^^xsd:number .
+ '''
+ tupB = [('http://bsfs.ai/schema/Entity#rating', 123)]
+ # extractors/readers
+ # (the Constant extractors are mapped to None, i.e. they have no reader)
+ self.ext2rdr = {
+ bsie.extractor.generic.path.Path(): bsie.reader.path.Path(),
+ bsie.extractor.generic.stat.Stat(): bsie.reader.stat.Stat(),
+ bsie.extractor.generic.constant.Constant(csA, tupA): None,
+ bsie.extractor.generic.constant.Constant(csB, tupB): None,
+ }
+ self.prefix = URI('http://example.com/local/file#')
+
+ def test_essentials(self):
+ pipeline = Pipeline(self.prefix, self.ext2rdr)
+ self.assertEqual(str(pipeline), 'Pipeline')
+ self.assertEqual(repr(pipeline), 'Pipeline(...)')
+
+ def test_equality(self):
+ pipeline = Pipeline(self.prefix, self.ext2rdr)
+ # a pipeline is equivalent to itself
+ self.assertEqual(pipeline, pipeline)
+ self.assertEqual(hash(pipeline), hash(pipeline))
+ # identical builds are equivalent
+ self.assertEqual(pipeline, Pipeline(self.prefix, self.ext2rdr))
+ self.assertEqual(hash(pipeline), hash(Pipeline(self.prefix, self.ext2rdr)))
+
+ # equivalence respects prefix
+ self.assertNotEqual(pipeline, Pipeline(URI('http://example.com/global/ent#'), self.ext2rdr))
+ self.assertNotEqual(hash(pipeline), hash(Pipeline(URI('http://example.com/global/ent#'), self.ext2rdr)))
+ # equivalence respects extractors/readers
+ # (keep only every other extractor/reader pair)
+ ext2rdr = {ext: rdr for idx, (ext, rdr) in enumerate(self.ext2rdr.items()) if idx % 2 == 0}
+ self.assertNotEqual(pipeline, Pipeline(self.prefix, ext2rdr))
+ self.assertNotEqual(hash(pipeline), hash(Pipeline(self.prefix, ext2rdr)))
+
+ # equivalence respects schema
+ p2 = Pipeline(self.prefix, self.ext2rdr)
+ p2.schema = pipeline.schema.Empty()
+ self.assertNotEqual(pipeline, p2)
+ self.assertNotEqual(hash(pipeline), hash(p2))
+
+ # not equal to other types
+ class Foo(): pass
+ self.assertNotEqual(pipeline, Foo())
+ self.assertNotEqual(hash(pipeline), hash(Foo()))
+ self.assertNotEqual(pipeline, 123)
+ self.assertNotEqual(hash(pipeline), hash(123))
+ self.assertNotEqual(pipeline, None)
+ self.assertNotEqual(hash(pipeline), hash(None))
+
+
+ def test_call(self):
+ # build pipeline
+ pipeline = Pipeline(self.prefix, self.ext2rdr)
+ # build objects for tests
+ # NOTE(review): content_hash is presumably the id that UCID.from_path
+ # produces for testfile.t; it must be updated if that file changes. Confirm.
+ content_hash = 'e3bb4ab54e4a50d75626a1f76814f152f4edc60a82ad724aa2aa922ca5534427'
+ subject = Node(ns.bsfs.Entity, self.prefix + content_hash)
+ testfile = os.path.join(os.path.dirname(__file__), 'testfile.t')
+ p_filename = pipeline.schema.predicate(ns.bse.filename)
+ p_filesize = pipeline.schema.predicate(ns.bse.filesize)
+ p_author = pipeline.schema.predicate(ns.bse.author)
+ p_rating = pipeline.schema.predicate(ns.bse.rating)
+ entity = pipeline.schema.node(ns.bsfs.Entity)
+ # a predicate that is derived from the schema's root predicate, used below as the invalid case
+ p_invalid = pipeline.schema.predicate(ns.bsfs.Predicate).get_child(ns.bse.foo, range=entity)
+
+ # extract given predicates
+ # (the expected filesize of 11 presumably is the size of testfile.t in bytes)
+ self.assertSetEqual(set(pipeline(testfile, {p_filename, p_filesize})), {
+ (subject, p_filename, 'testfile.t'),
+ (subject, p_filesize, 11),
+ })
+ self.assertSetEqual(set(pipeline(testfile, {p_author})), {
+ (subject, p_author, 'Me, myself, and I'),
+ })
+ self.assertSetEqual(set(pipeline(testfile, {p_filename})), {
+ (subject, p_filename, 'testfile.t'),
+ })
+ self.assertSetEqual(set(pipeline(testfile, {p_filesize})), {
+ (subject, p_filesize, 11),
+ })
+ # extract all predicates
+ self.assertSetEqual(set(pipeline(testfile)), {
+ (subject, p_filename, 'testfile.t'),
+ (subject, p_filesize, 11),
+ (subject, p_author, 'Me, myself, and I'),
+ (subject, p_rating, 123),
+ })
+ # invalid predicate
+ self.assertSetEqual(set(pipeline(testfile, {p_invalid})), set())
+ # valid/invalid predicates mixed
+ self.assertSetEqual(set(pipeline(testfile, {p_filename, p_invalid})), {
+ (subject, p_filename, 'testfile.t'),
+ })
+ # invalid path
+ # (the pipeline is a generator, hence the error only surfaces when it is consumed)
+ self.assertRaises(FileNotFoundError, list, pipeline('inexistent_file'))
+ # FIXME: unreadable file (e.g. permissions error)
+
+ def test_call_reader_err(self):
+ # a reader that always fails with a ReaderError
+ class FaultyReader(bsie.reader.path.Path):
+ def __call__(self, path):
+ raise errors.ReaderError('reader error')
+
+ pipeline = Pipeline(self.prefix, {bsie.extractor.generic.path.Path(): FaultyReader()})
+ # the error must be logged and no triples produced
+ with self.assertLogs(logging.getLogger('bsie.tools.pipeline'), logging.ERROR):
+ testfile = os.path.join(os.path.dirname(__file__), 'testfile.t')
+ p_filename = pipeline.schema.predicate(ns.bse.filename)
+ self.assertSetEqual(set(pipeline(testfile, {p_filename})), set())
+
+ def test_call_extractor_err(self):
+ # an extractor that always fails with an ExtractorError
+ class FaultyExtractor(bsie.extractor.generic.path.Path):
+ def extract(self, subject, content, predicates):
+ raise errors.ExtractorError('extractor error')
+
+ pipeline = Pipeline(self.prefix, {FaultyExtractor(): bsie.reader.path.Path()})
+ # the error must be logged and no triples produced
+ with self.assertLogs(logging.getLogger('bsie.tools.pipeline'), logging.ERROR):
+ testfile = os.path.join(os.path.dirname(__file__), 'testfile.t')
+ p_filename = pipeline.schema.predicate(ns.bse.filename)
+ self.assertSetEqual(set(pipeline(testfile, {p_filename})), set())
+
+
+## main ##
+
+if __name__ == '__main__':
+ unittest.main()
+
+## EOF ##
diff --git a/test/tools/testfile.t b/test/tools/testfile.t
new file mode 100644
index 0000000..58bf1b8
--- /dev/null
+++ b/test/tools/testfile.t
@@ -0,0 +1 @@
+hello worl