diff options
Diffstat (limited to 'test/lib/test_pipeline.py')
-rw-r--r-- | test/lib/test_pipeline.py | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/test/lib/test_pipeline.py b/test/lib/test_pipeline.py new file mode 100644 index 0000000..8fecc74 --- /dev/null +++ b/test/lib/test_pipeline.py @@ -0,0 +1,175 @@ +""" + +Part of the bsie test suite. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# standard imports +import logging +import os +import unittest + +# bsie imports +from bsie.utils import bsfs, errors, node, ns +import bsie.extractor.generic.constant +import bsie.extractor.generic.path +import bsie.extractor.generic.stat +import bsie.reader.path +import bsie.reader.stat + +# objects to test +from bsie.lib.pipeline import Pipeline + + +## code ## + +class TestPipeline(unittest.TestCase): + def setUp(self): + # constant A + csA = ''' + bse:author rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsfs:File ; + rdfs:range xsd:string ; + bsfs:unique "true"^^xsd:boolean . + ''' + tupA = [('http://bsfs.ai/schema/Entity#author', 'Me, myself, and I')] + # constant B + csB = ''' + bse:rating rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsfs:File ; + rdfs:range xsd:integer ; + bsfs:unique "true"^^xsd:boolean . + ''' + tupB = [('http://bsfs.ai/schema/Entity#rating', 123)] + # extractors/readers + self.ext2rdr = { + bsie.extractor.generic.path.Path(): bsie.reader.path.Path(), + bsie.extractor.generic.stat.Stat(): bsie.reader.stat.Stat(), + bsie.extractor.generic.constant.Constant(csA, tupA): None, + bsie.extractor.generic.constant.Constant(csB, tupB): None, + } + self.prefix = bsfs.Namespace('http://example.com/local/') + + def test_essentials(self): + pipeline = Pipeline(self.prefix, self.ext2rdr) + self.assertEqual(str(pipeline), 'Pipeline') + self.assertEqual(repr(pipeline), 'Pipeline(...)') + + def test_equality(self): + pipeline = Pipeline(self.prefix, self.ext2rdr) + # a pipeline is equivalent to itself + self.assertEqual(pipeline, pipeline) + self.assertEqual(hash(pipeline), hash(pipeline)) + # identical builds are equivalent + self.assertEqual(pipeline, Pipeline(self.prefix, self.ext2rdr)) + self.assertEqual(hash(pipeline), hash(Pipeline(self.prefix, self.ext2rdr))) + + # equivalence respects prefix + self.assertNotEqual(pipeline, Pipeline(bsfs.URI('http://example.com/global/ent#'), self.ext2rdr)) + self.assertNotEqual(hash(pipeline), hash(Pipeline(bsfs.URI('http://example.com/global/ent#'), self.ext2rdr))) + # equivalence respects extractors/readers + ext2rdr = {ext: rdr for idx, (ext, rdr) in enumerate(self.ext2rdr.items()) if idx % 2 == 0} + self.assertNotEqual(pipeline, Pipeline(self.prefix, ext2rdr)) + self.assertNotEqual(hash(pipeline), hash(Pipeline(self.prefix, ext2rdr))) + + # equivalence respects schema + p2 = Pipeline(self.prefix, self.ext2rdr) + p2._schema = bsfs.schema.Schema() + self.assertNotEqual(pipeline, p2) + self.assertNotEqual(hash(pipeline), hash(p2)) + + # not equal to other types + class Foo(): pass + self.assertNotEqual(pipeline, Foo()) + self.assertNotEqual(hash(pipeline), hash(Foo())) + self.assertNotEqual(pipeline, 123) + self.assertNotEqual(hash(pipeline), hash(123)) + self.assertNotEqual(pipeline, None) + self.assertNotEqual(hash(pipeline), hash(None)) + + + def test_call(self): + # build pipeline + pipeline = Pipeline(self.prefix, self.ext2rdr) + # build objects for tests + content_hash = 'a948904f2f0f479b8f8197694b30184b0d2ed1c1cd2a1ec0fb85d299a192a447' + subject = node.Node(ns.bsfs.File, (self.prefix + 'file#')[content_hash]) + testfile = os.path.join(os.path.dirname(__file__), 'testfile.t') + p_filename = pipeline.schema.predicate(ns.bse.filename) + p_filesize = pipeline.schema.predicate(ns.bse.filesize) + p_author = pipeline.schema.predicate(ns.bse.author) + p_rating = pipeline.schema.predicate(ns.bse.rating) + entity = pipeline.schema.node(ns.bsfs.File) + p_invalid = pipeline.schema.predicate(ns.bsfs.Predicate).child(ns.bse.foo, range=entity) + + # extract given predicates + self.assertSetEqual(set(pipeline(testfile, {p_filename, p_filesize})), { + (subject, p_filename, 'testfile.t'), + (subject, p_filesize, 12), + }) + self.assertSetEqual(set(pipeline(testfile, {p_author})), { + (subject, p_author, 'Me, myself, and I'), + }) + self.assertSetEqual(set(pipeline(testfile, {p_filename})), { + (subject, p_filename, 'testfile.t'), + }) + self.assertSetEqual(set(pipeline(testfile, {p_filesize})), { + (subject, p_filesize, 12), + }) + # extract all predicates + self.assertSetEqual(set(pipeline(testfile)), { + (subject, p_filename, 'testfile.t'), + (subject, p_filesize, 12), + (subject, p_author, 'Me, myself, and I'), + (subject, p_rating, 123), + }) + # invalid predicate + self.assertSetEqual(set(pipeline(testfile, {p_invalid})), set()) + # valid/invalid predicates mixed + self.assertSetEqual(set(pipeline(testfile, {p_filename, p_invalid})), { + (subject, p_filename, 'testfile.t'), + }) + # invalid path + self.assertRaises(FileNotFoundError, list, pipeline('inexistent_file')) + # FIXME: unreadable file (e.g. permissions error) + + def test_call_reader_err(self): + class FaultyReader(bsie.reader.path.Path): + def __call__(self, path): + raise errors.ReaderError('reader error') + + pipeline = Pipeline(self.prefix, {bsie.extractor.generic.path.Path(): FaultyReader()}) + with self.assertLogs(logging.getLogger('bsie.lib.pipeline'), logging.ERROR): + testfile = os.path.join(os.path.dirname(__file__), 'testfile.t') + p_filename = pipeline.schema.predicate(ns.bse.filename) + self.assertSetEqual(set(pipeline(testfile, {p_filename})), set()) + + def test_call_extractor_err(self): + class FaultyExtractor(bsie.extractor.generic.path.Path): + def extract(self, subject, content, predicates): + raise errors.ExtractorError('extractor error') + + pipeline = Pipeline(self.prefix, {FaultyExtractor(): bsie.reader.path.Path()}) + with self.assertLogs(logging.getLogger('bsie.lib.pipeline'), logging.ERROR): + testfile = os.path.join(os.path.dirname(__file__), 'testfile.t') + p_filename = pipeline.schema.predicate(ns.bse.filename) + self.assertSetEqual(set(pipeline(testfile, {p_filename})), set()) + + def test_predicates(self): + # build pipeline + pipeline = Pipeline(self.prefix, self.ext2rdr) + # + self.assertSetEqual(set(pipeline.principals), { + pipeline.schema.predicate(ns.bse.filename), + pipeline.schema.predicate(ns.bse.filesize), + pipeline.schema.predicate(ns.bse.author), + pipeline.schema.predicate(ns.bse.rating), + }) + + +## main ## + +if __name__ == '__main__': + unittest.main() + +## EOF ## |