diff options
author | Matthias Baumgartner <dev@igsor.net> | 2023-03-05 19:25:29 +0100 |
---|---|---|
committer | Matthias Baumgartner <dev@igsor.net> | 2023-03-05 19:25:29 +0100 |
commit | 48b6081d0092e9c5a1b0ad79bdde2e51649bf61a (patch) | |
tree | 634198c34aae3c0306ce30ac7452abd7b53a14e8 /test/triple_store/sparql/test_utils.py | |
parent | 91437ba89d35bf482f3d9671bb99ef2fc69f5985 (diff) | |
parent | e4845c627e97a6d125bf33d9e7a4a8d373d7fc4a (diff) | |
download | bsfs-0.23.03.tar.gz bsfs-0.23.03.tar.bz2 bsfs-0.23.03.zip |
Merge branch 'develop'v0.23.03
Diffstat (limited to 'test/triple_store/sparql/test_utils.py')
-rw-r--r-- | test/triple_store/sparql/test_utils.py | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/test/triple_store/sparql/test_utils.py b/test/triple_store/sparql/test_utils.py new file mode 100644 index 0000000..44a1299 --- /dev/null +++ b/test/triple_store/sparql/test_utils.py @@ -0,0 +1,152 @@ + +# standard imports +import operator +import re +import unittest + +# external imports +import rdflib + +# bsie imports +from bsfs.namespace import ns + +# objects to test +from bsfs.triple_store.sparql.utils import GenHopName, Query + + +## code ## + +ns.bse = ns.bsfs.Entity() + +class TestGenHopName(unittest.TestCase): + def test_next(self): + # baseline + self.assertEqual(next(GenHopName(prefix='?foo', start=123)), '?foo123') + # respects prefix + self.assertEqual(next(GenHopName(prefix='?bar', start=123)), '?bar123') + # respects start + self.assertEqual(next(GenHopName(prefix='?foo', start=321)), '?foo321') + # counts up + cnt = GenHopName(prefix='?foo', start=998) + self.assertEqual(next(cnt), '?foo998') + self.assertEqual(next(cnt), '?foo999') + self.assertEqual(next(cnt), '?foo1000') + self.assertEqual(next(cnt), '?foo1001') + + def test_essentials(self): + # can get the prefix + self.assertEqual(GenHopName(prefix='?foo', start=123).prefix, '?foo') + # can get the counter + self.assertEqual(GenHopName(prefix='?foo', start=123).curr, 122) + + +class TestQuery(unittest.TestCase): + def setUp(self): + self.root_type = 'https://schema.bsfs.io/core/Entity' + self.root_head = '?root' + self.select = (('?head', 'name'), ) + self.where = f'?root <{ns.bse.tag}> ?head' + + def test_essentials(self): + # can access members + q = Query(self.root_type, self.root_head, self.select, self.where) + self.assertEqual(q.root_type, self.root_type) + self.assertEqual(q.root_head, self.root_head) + self.assertEqual(q.select, self.select) + self.assertEqual(q.where, self.where) + # comparison + self.assertEqual(q, Query(self.root_type, self.root_head, self.select, self.where)) + self.assertEqual(hash(q), hash(Query(self.root_type, self.root_head, self.select, self.where))) + # comparison respects root_type + self.assertNotEqual(q, Query('https://schema.bsfs.io/core/Tag', self.root_head, self.select, self.where)) + self.assertNotEqual(hash(q), hash(Query('https://schema.bsfs.io/core/Tag', self.root_head, self.select, self.where))) + # comparison respects root_head + self.assertNotEqual(q, Query(self.root_type, '?foo', self.select, self.where)) + self.assertNotEqual(hash(q), hash(Query(self.root_type, '?foo', self.select, self.where))) + # comparison respects select + self.assertNotEqual(q, Query(self.root_type, self.root_head, (('?head', 'foo'), ), self.where)) + self.assertNotEqual(hash(q), hash(Query(self.root_type, self.root_head, (('?head', 'foo'), ), self.where))) + # comparison respects where + self.assertNotEqual(q, Query(self.root_type, self.root_head, self.select, '?root bse:filename ?head')) + self.assertNotEqual(hash(q), hash(Query(self.root_type, self.root_head, self.select, '?root bse:filename ?head'))) + # string conversion + self.assertEqual(str(q), q.query) + self.assertEqual(repr(q), "Query(https://schema.bsfs.io/core/Entity, ?root, (('?head', 'name'),), ?root <https://schema.bsfs.io/core/Entity#tag> ?head)") + + def test_add(self): + q = Query(self.root_type, self.root_head, self.select, self.where) + # can only add a query + self.assertRaises(TypeError, operator.add, q, 1234) + self.assertRaises(TypeError, operator.add, q, 'foobar') + # root type and head must match + self.assertRaises(ValueError, operator.add, q, Query('https://schema.bsfs.io/core/Node/Tag', self.root_head)) + self.assertRaises(ValueError, operator.add, q, Query(self.root_type, '?foobar')) + # select and were are combined + combo = q + Query(self.root_type, self.root_head, (('?foo', 'bar'), ), f'?root <{ns.bse.filename}> ?foo') + self.assertEqual(combo.select, (('?head', 'name'), ('?foo', 'bar'))) + self.assertEqual(combo.where, f'?root <{ns.bse.tag}> ?head . ?root <{ns.bse.filename}> ?foo') + # select can be empty + combo = q + Query(self.root_type, self.root_head, None, f'?root <{ns.bse.filename}> ?foo') + self.assertEqual(combo.select, (('?head', 'name'), )) + combo = Query(self.root_type, self.root_head, None, f'?root <{ns.bse.filename}> ?foo') + q + self.assertEqual(combo.select, (('?head', 'name'), )) + combo = Query(self.root_type, self.root_head, None, self.where) + Query(self.root_type, self.root_head, None, f'?root <{ns.bse.filename}> ?foo') + self.assertEqual(combo.select, tuple()) + # where can be empty + combo = q + Query(self.root_type, self.root_head, (('?foo', 'bar'), )) + self.assertEqual(combo.where, self.where) + combo = Query(self.root_type, self.root_head, (('?foo', 'bar'), )) + q + self.assertEqual(combo.where, self.where) + combo = Query(self.root_type, self.root_head, self.select) + Query(self.root_type, self.root_head, (('?foo', 'bar'), )) + self.assertEqual(combo.where, '') + + def test_names(self): + self.assertEqual(Query(self.root_type, self.root_head, (('?head', 'name'), ), self.where).names, + ('name', )) + self.assertEqual(Query(self.root_type, self.root_head, (('?head', 'name'), ('?foo', 'bar')), self.where).names, + ('name', 'bar')) + + def test_query(self): + def normalize(value): + value = value.strip() + value = value.lower() + value = value.replace(r'\n', ' ') + value, _ = re.subn('\s\s+', ' ', value) + return value + # query composes a valid query + q = Query(self.root_type, self.root_head, self.select, self.where) + self.assertEqual(normalize(q.query), normalize(f'select distinct ?root (?head as ?name) where {{ ?root <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <https://schema.bsfs.io/core/Entity> . ?root <{ns.bse.tag}> ?head }} order by str(?root)')) + # select and where are optional + q = Query(self.root_type, self.root_head) + self.assertEqual(normalize(q.query), normalize(f'select distinct ?root where {{ ?root <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <https://schema.bsfs.io/core/Entity> . }} order by str(?root)')) + # select and where need not to correspond + q = Query(self.root_type, self.root_head, (('?head', 'name'), )) + self.assertEqual(normalize(q.query), normalize(f'select distinct ?root (?head as ?name) where {{ ?root <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <https://schema.bsfs.io/core/Entity> . }} order by str(?root)')) + # query is used for string representation + self.assertEqual(str(q), q.query) + + def test_call(self): + graph = rdflib.Graph() + # schema + graph.add((rdflib.URIRef('https://schema.bsfs.io/core/Document'), rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef('https://schema.bsfs.io/core/Entity'))) + # nodes + graph.add((rdflib.URIRef('http://example.com/entity#1234'), rdflib.URIRef(ns.rdf.type), rdflib.URIRef('https://schema.bsfs.io/core/Entity'))) + graph.add((rdflib.URIRef('http://example.com/doc#1234'), rdflib.URIRef(ns.rdf.type), rdflib.URIRef('https://schema.bsfs.io/core/Document'))) + # links + graph.add((rdflib.URIRef('http://example.com/entity#1234'), rdflib.URIRef(ns.bse.tag), rdflib.Literal('tag#1234', datatype=rdflib.XSD.string))) + graph.add((rdflib.URIRef('http://example.com/doc#1234'), rdflib.URIRef(ns.bse.tag), rdflib.Literal('tag#1234', datatype=rdflib.XSD.string))) + # run query on a given graph + query = Query(self.root_type, self.root_head, self.select, self.where) + self.assertSetEqual(set(query(graph)), { + (rdflib.URIRef('http://example.com/entity#1234'), rdflib.Literal('tag#1234', datatype=rdflib.XSD.string)), + (rdflib.URIRef('http://example.com/doc#1234'), rdflib.Literal('tag#1234', datatype=rdflib.XSD.string)), + }) + # query actually considers the passed graph + self.assertSetEqual(set(query(rdflib.Graph())), set()) + +## main ## + +if __name__ == '__main__': + unittest.main() + +## EOF ## |