aboutsummaryrefslogtreecommitdiffstats
path: root/test/triple_store/sparql/test_utils.py
diff options
context:
space:
mode:
authorMatthias Baumgartner <dev@igsor.net>2023-03-05 19:25:29 +0100
committerMatthias Baumgartner <dev@igsor.net>2023-03-05 19:25:29 +0100
commit48b6081d0092e9c5a1b0ad79bdde2e51649bf61a (patch)
tree634198c34aae3c0306ce30ac7452abd7b53a14e8 /test/triple_store/sparql/test_utils.py
parent91437ba89d35bf482f3d9671bb99ef2fc69f5985 (diff)
parente4845c627e97a6d125bf33d9e7a4a8d373d7fc4a (diff)
downloadbsfs-0.23.03.tar.gz
bsfs-0.23.03.tar.bz2
bsfs-0.23.03.zip
Merge branch 'develop'v0.23.03
Diffstat (limited to 'test/triple_store/sparql/test_utils.py')
-rw-r--r--test/triple_store/sparql/test_utils.py152
1 files changed, 152 insertions, 0 deletions
diff --git a/test/triple_store/sparql/test_utils.py b/test/triple_store/sparql/test_utils.py
new file mode 100644
index 0000000..44a1299
--- /dev/null
+++ b/test/triple_store/sparql/test_utils.py
@@ -0,0 +1,152 @@
+
+# standard imports
+import operator
+import re
+import unittest
+
+# external imports
+import rdflib
+
+# bsie imports
+from bsfs.namespace import ns
+
+# objects to test
+from bsfs.triple_store.sparql.utils import GenHopName, Query
+
+
+## code ##
+
+ns.bse = ns.bsfs.Entity()
+
+class TestGenHopName(unittest.TestCase):
+ def test_next(self):
+ # baseline
+ self.assertEqual(next(GenHopName(prefix='?foo', start=123)), '?foo123')
+ # respects prefix
+ self.assertEqual(next(GenHopName(prefix='?bar', start=123)), '?bar123')
+ # respects start
+ self.assertEqual(next(GenHopName(prefix='?foo', start=321)), '?foo321')
+ # counts up
+ cnt = GenHopName(prefix='?foo', start=998)
+ self.assertEqual(next(cnt), '?foo998')
+ self.assertEqual(next(cnt), '?foo999')
+ self.assertEqual(next(cnt), '?foo1000')
+ self.assertEqual(next(cnt), '?foo1001')
+
+ def test_essentials(self):
+ # can get the prefix
+ self.assertEqual(GenHopName(prefix='?foo', start=123).prefix, '?foo')
+ # can get the counter
+ self.assertEqual(GenHopName(prefix='?foo', start=123).curr, 122)
+
+
+class TestQuery(unittest.TestCase):
+ def setUp(self):
+ self.root_type = 'https://schema.bsfs.io/core/Entity'
+ self.root_head = '?root'
+ self.select = (('?head', 'name'), )
+ self.where = f'?root <{ns.bse.tag}> ?head'
+
+ def test_essentials(self):
+ # can access members
+ q = Query(self.root_type, self.root_head, self.select, self.where)
+ self.assertEqual(q.root_type, self.root_type)
+ self.assertEqual(q.root_head, self.root_head)
+ self.assertEqual(q.select, self.select)
+ self.assertEqual(q.where, self.where)
+ # comparison
+ self.assertEqual(q, Query(self.root_type, self.root_head, self.select, self.where))
+ self.assertEqual(hash(q), hash(Query(self.root_type, self.root_head, self.select, self.where)))
+ # comparison respects root_type
+ self.assertNotEqual(q, Query('https://schema.bsfs.io/core/Tag', self.root_head, self.select, self.where))
+ self.assertNotEqual(hash(q), hash(Query('https://schema.bsfs.io/core/Tag', self.root_head, self.select, self.where)))
+ # comparison respects root_head
+ self.assertNotEqual(q, Query(self.root_type, '?foo', self.select, self.where))
+ self.assertNotEqual(hash(q), hash(Query(self.root_type, '?foo', self.select, self.where)))
+ # comparison respects select
+ self.assertNotEqual(q, Query(self.root_type, self.root_head, (('?head', 'foo'), ), self.where))
+ self.assertNotEqual(hash(q), hash(Query(self.root_type, self.root_head, (('?head', 'foo'), ), self.where)))
+ # comparison respects where
+ self.assertNotEqual(q, Query(self.root_type, self.root_head, self.select, '?root bse:filename ?head'))
+ self.assertNotEqual(hash(q), hash(Query(self.root_type, self.root_head, self.select, '?root bse:filename ?head')))
+ # string conversion
+ self.assertEqual(str(q), q.query)
+ self.assertEqual(repr(q), "Query(https://schema.bsfs.io/core/Entity, ?root, (('?head', 'name'),), ?root <https://schema.bsfs.io/core/Entity#tag> ?head)")
+
+ def test_add(self):
+ q = Query(self.root_type, self.root_head, self.select, self.where)
+ # can only add a query
+ self.assertRaises(TypeError, operator.add, q, 1234)
+ self.assertRaises(TypeError, operator.add, q, 'foobar')
+ # root type and head must match
+ self.assertRaises(ValueError, operator.add, q, Query('https://schema.bsfs.io/core/Node/Tag', self.root_head))
+ self.assertRaises(ValueError, operator.add, q, Query(self.root_type, '?foobar'))
+ # select and were are combined
+ combo = q + Query(self.root_type, self.root_head, (('?foo', 'bar'), ), f'?root <{ns.bse.filename}> ?foo')
+ self.assertEqual(combo.select, (('?head', 'name'), ('?foo', 'bar')))
+ self.assertEqual(combo.where, f'?root <{ns.bse.tag}> ?head . ?root <{ns.bse.filename}> ?foo')
+ # select can be empty
+ combo = q + Query(self.root_type, self.root_head, None, f'?root <{ns.bse.filename}> ?foo')
+ self.assertEqual(combo.select, (('?head', 'name'), ))
+ combo = Query(self.root_type, self.root_head, None, f'?root <{ns.bse.filename}> ?foo') + q
+ self.assertEqual(combo.select, (('?head', 'name'), ))
+ combo = Query(self.root_type, self.root_head, None, self.where) + Query(self.root_type, self.root_head, None, f'?root <{ns.bse.filename}> ?foo')
+ self.assertEqual(combo.select, tuple())
+ # where can be empty
+ combo = q + Query(self.root_type, self.root_head, (('?foo', 'bar'), ))
+ self.assertEqual(combo.where, self.where)
+ combo = Query(self.root_type, self.root_head, (('?foo', 'bar'), )) + q
+ self.assertEqual(combo.where, self.where)
+ combo = Query(self.root_type, self.root_head, self.select) + Query(self.root_type, self.root_head, (('?foo', 'bar'), ))
+ self.assertEqual(combo.where, '')
+
+ def test_names(self):
+ self.assertEqual(Query(self.root_type, self.root_head, (('?head', 'name'), ), self.where).names,
+ ('name', ))
+ self.assertEqual(Query(self.root_type, self.root_head, (('?head', 'name'), ('?foo', 'bar')), self.where).names,
+ ('name', 'bar'))
+
+ def test_query(self):
+ def normalize(value):
+ value = value.strip()
+ value = value.lower()
+ value = value.replace(r'\n', ' ')
+ value, _ = re.subn('\s\s+', ' ', value)
+ return value
+ # query composes a valid query
+ q = Query(self.root_type, self.root_head, self.select, self.where)
+ self.assertEqual(normalize(q.query), normalize(f'select distinct ?root (?head as ?name) where {{ ?root <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <https://schema.bsfs.io/core/Entity> . ?root <{ns.bse.tag}> ?head }} order by str(?root)'))
+ # select and where are optional
+ q = Query(self.root_type, self.root_head)
+ self.assertEqual(normalize(q.query), normalize(f'select distinct ?root where {{ ?root <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <https://schema.bsfs.io/core/Entity> . }} order by str(?root)'))
+ # select and where need not to correspond
+ q = Query(self.root_type, self.root_head, (('?head', 'name'), ))
+ self.assertEqual(normalize(q.query), normalize(f'select distinct ?root (?head as ?name) where {{ ?root <{ns.rdf.type}>/<{ns.rdfs.subClassOf}>* <https://schema.bsfs.io/core/Entity> . }} order by str(?root)'))
+ # query is used for string representation
+ self.assertEqual(str(q), q.query)
+
+ def test_call(self):
+ graph = rdflib.Graph()
+ # schema
+ graph.add((rdflib.URIRef('https://schema.bsfs.io/core/Document'), rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef('https://schema.bsfs.io/core/Entity')))
+ # nodes
+ graph.add((rdflib.URIRef('http://example.com/entity#1234'), rdflib.URIRef(ns.rdf.type), rdflib.URIRef('https://schema.bsfs.io/core/Entity')))
+ graph.add((rdflib.URIRef('http://example.com/doc#1234'), rdflib.URIRef(ns.rdf.type), rdflib.URIRef('https://schema.bsfs.io/core/Document')))
+ # links
+ graph.add((rdflib.URIRef('http://example.com/entity#1234'), rdflib.URIRef(ns.bse.tag), rdflib.Literal('tag#1234', datatype=rdflib.XSD.string)))
+ graph.add((rdflib.URIRef('http://example.com/doc#1234'), rdflib.URIRef(ns.bse.tag), rdflib.Literal('tag#1234', datatype=rdflib.XSD.string)))
+ # run query on a given graph
+ query = Query(self.root_type, self.root_head, self.select, self.where)
+ self.assertSetEqual(set(query(graph)), {
+ (rdflib.URIRef('http://example.com/entity#1234'), rdflib.Literal('tag#1234', datatype=rdflib.XSD.string)),
+ (rdflib.URIRef('http://example.com/doc#1234'), rdflib.Literal('tag#1234', datatype=rdflib.XSD.string)),
+ })
+ # query actually considers the passed graph
+ self.assertSetEqual(set(query(rdflib.Graph())), set())
+
+## main ##
+
+if __name__ == '__main__':
+ unittest.main()
+
+## EOF ##