diff options
Diffstat (limited to 'test/query/test_validator.py')
-rw-r--r-- | test/query/test_validator.py | 261 |
1 files changed, 261 insertions, 0 deletions
diff --git a/test/query/test_validator.py b/test/query/test_validator.py new file mode 100644 index 0000000..4f8364a --- /dev/null +++ b/test/query/test_validator.py @@ -0,0 +1,261 @@ +""" + +Part of the tagit test suite. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import unittest + +# bsfs imports +from bsfs import schema as _schema +from bsfs.namespace import ns +from bsfs.query import ast +from bsfs.utils import errors + +# objects to test +from bsfs.query.validator import Filter + + +## code ## + +class TestFilter(unittest.TestCase): + def setUp(self): + self.schema = _schema.Schema.from_string(''' + prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> + prefix xsd: <http://www.w3.org/2001/XMLSchema#> + + prefix bsfs: <http://bsfs.ai/schema/> + prefix bse: <http://bsfs.ai/schema/Entity#> + + bsfs:Entity rdfs:subClassOf bsfs:Node . + bsfs:URI rdfs:subClassOf bsfs:Literal . + + bsfs:Tag rdfs:subClassOf bsfs:Node . + xsd:string rdfs:subClassOf bsfs:Literal . + xsd:integer rdfs:subClassOf bsfs:Literal . + + bse:comment rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsfs:Node ; + rdfs:range xsd:string ; + bsfs:unique "false"^^xsd:boolean . + + bse:filesize rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsfs:Entity ; + rdfs:range xsd:integer ; + bsfs:unique "true"^^xsd:boolean . + + bse:tag rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsfs:Entity ; + rdfs:range bsfs:Tag ; + bsfs:unique "false"^^xsd:boolean . + + bse:label rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsfs:Tag ; + rdfs:range xsd:string ; + bsfs:unique "true"^^xsd:boolean . + + bse:buddy rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsfs:Entity ; + rdfs:range bsfs:Node ; + bsfs:unique "false"^^xsd:boolean . + + ''') + self.validate = Filter(self.schema) + + def test_call(self): + # root_type must be a _schema.Node + self.assertRaises(TypeError, self.validate, 1234, None) + self.assertRaises(TypeError, self.validate, '1234', None) + self.assertRaises(TypeError, self.validate, self.schema.literal(ns.bsfs.URI), None) + # root_type must exist in the schema + self.assertRaises(errors.ConsistencyError, self.validate, self.schema.node(ns.bsfs.Node).get_child(ns.bsfs.Image), None) + self.assertRaises(errors.ConsistencyError, self.validate, self.schema.node(ns.bsfs.Entity).get_child(ns.bsfs.Image), None) + # valid query returns true + self.assertTrue(self.validate(self.schema.node(ns.bsfs.Entity), + ast.filter.Any(ast.filter.OneOf(ns.bse.tag, ns.bse.buddy), + ast.filter.Or( + ast.filter.Is('http://example.com/symbol#1234'), + ast.filter.All(ns.bse.comment, ast.filter.StartsWith('foo')), + ast.filter.And( + ast.filter.Has(ns.bse.comment, ast.filter.Or( + ast.filter.GreaterThan(5), + ast.filter.LessThan(1), + ) + ), + ast.filter.Not(ast.filter.Any(ns.bse.comment, + ast.filter.Not(ast.filter.Equals('hello world')))), + ))))) + # invalid paths raise consistency error + self.assertRaises(errors.ConsistencyError, self.validate, self.schema.node(ns.bsfs.Entity), + ast.filter.Any(ast.filter.OneOf(ns.bse.tag, ns.bse.buddy), + ast.filter.Or( + ast.filter.All(ns.bse.comment, ast.filter.Equals('hello world')), + ast.filter.All(ns.bse.label, ast.filter.Equals('hello world')), # domain mismatch + ))) + + def test_routing(self): + self.assertRaises(errors.BackendError, self.validate._parse_filter_expression, ast.filter.FilterExpression(), self.schema.node(ns.bsfs.Node)) + self.assertRaises(errors.BackendError, self.validate._parse_predicate_expression, ast.filter.PredicateExpression()) + + def test_predicate(self): + # predicate must be in the schema + self.assertRaises(errors.ConsistencyError, self.validate._predicate, ast.filter.Predicate(ns.bse.invalid)) + # predicate must have a range + self.assertRaises(errors.BackendError, self.validate._predicate, ast.filter.Predicate(ns.bsfs.Predicate)) + # predicate returns domain and range + self.assertEqual(self.validate._predicate(ast.filter.Predicate(ns.bse.tag)), + (self.schema.node(ns.bsfs.Entity), self.schema.node(ns.bsfs.Tag))) + # reverse is applied + self.assertEqual(self.validate._predicate(ast.filter.Predicate(ns.bse.tag, reverse=True)), + (self.schema.node(ns.bsfs.Tag), self.schema.node(ns.bsfs.Entity))) + + def test_one_of(self): + # domains must both be nodes or literals + self.assertRaises(errors.ConsistencyError, self.validate._one_of, ast.filter.OneOf(ns.bse.tag, ast.filter.Predicate(ns.bse.label, reverse=True))) + # domains must be related + self.assertRaises(errors.ConsistencyError, self.validate._one_of, ast.filter.OneOf(ns.bse.tag, ns.bse.label)) + # ranges must both be nodes or literals + self.assertRaises(errors.ConsistencyError, self.validate._one_of, ast.filter.OneOf(ns.bse.tag, ns.bse.comment)) + # ranges must be related + self.assertRaises(errors.ConsistencyError, self.validate._one_of, ast.filter.OneOf(ns.bse.tag, ast.filter.Predicate(ns.bse.buddy, reverse=True))) + # one_of returns most specific domain + self.assertEqual(self.validate._one_of(ast.filter.OneOf(ns.bse.comment, ns.bse.label)), + (self.schema.node(ns.bsfs.Tag), self.schema.literal(ns.xsd.string))) + # one_of returns the most generic range + self.assertEqual(self.validate._one_of(ast.filter.OneOf(ns.bse.tag, ns.bse.buddy)), + (self.schema.node(ns.bsfs.Entity), self.schema.node(ns.bsfs.Node))) + + def test_branch(self): + # type must be a node + self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.literal(ns.bsfs.Literal), None) + # type must be in the schema + self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.node(ns.bsfs.Node).get_child(ns.bsfs.Invalid), None) + # predicate is verified + self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.node(ns.bsfs.Entity), + ast.filter.Any(ns.bsfs.Invalid, ast.filter.Is('http://example.com/entity#1234'))) + # predicate must match the domain + self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.node(ns.bsfs.Node), + ast.filter.Any(ns.bse.tag, ast.filter.Is('http://example.com/tag#1234'))) + self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.node(ns.bsfs.Tag), + ast.filter.Any(ns.bse.tag, ast.filter.Is('http://example.com/tag#1234'))) + # child expression must be valid + self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.node(ns.bsfs.Entity), + ast.filter.Any(ns.bse.tag, ast.filter.Equals('hello world'))) + # branch accepts valid expressions + self.assertIsNone(self.validate._branch(self.schema.node(ns.bsfs.Entity), + ast.filter.Any(ns.bse.tag, ast.filter.Is('http://example.com/entity#1234')))) + self.assertIsNone(self.validate._branch(self.schema.node(ns.bsfs.Entity), + ast.filter.All(ns.bse.tag, ast.filter.Is('http://example.com/entity#1234')))) + + def test_agg(self): + # agg evaluates child expressions + self.assertRaises(errors.ConsistencyError, self.validate._agg, self.schema.node(ns.bsfs.Entity), + ast.filter.And(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Equals('hello world'))) + self.assertRaises(errors.ConsistencyError, self.validate._agg, self.schema.literal(ns.xsd.string), + ast.filter.And(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Equals('hello world'))) + self.assertRaises(errors.ConsistencyError, self.validate._agg, self.schema.node(ns.bsfs.Entity), + ast.filter.Or(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Equals('hello world'))) + self.assertRaises(errors.ConsistencyError, self.validate._agg, self.schema.literal(ns.xsd.string), + ast.filter.Or(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Equals('hello world'))) + # agg works on nodes + self.assertIsNone(self.validate._agg(self.schema.node(ns.bsfs.Entity), + ast.filter.And(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Is('http://example.com/entity#4321')))) + self.assertIsNone(self.validate._agg(self.schema.node(ns.bsfs.Entity), + ast.filter.Or(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Is('http://example.com/entity#4321')))) + # agg works on literals + self.assertIsNone(self.validate._agg(self.schema.literal(ns.xsd.string), + ast.filter.And(ast.filter.Equals('foobar'), ast.filter.Equals('hello world')))) + self.assertIsNone(self.validate._agg(self.schema.literal(ns.xsd.string), + ast.filter.Or(ast.filter.Equals('foobar'), ast.filter.Equals('hello world')))) + + def test_not(self): + # not evaluates child expressions + self.assertRaises(errors.ConsistencyError, self.validate._not, self.schema.node(ns.bsfs.Entity), + ast.filter.Not(ast.filter.Equals('hello world'))) + self.assertRaises(errors.ConsistencyError, self.validate._not, self.schema.literal(ns.xsd.string), + ast.filter.Not(ast.filter.Is('http://example.com/entity#1234'))) + # not works on nodes + self.assertIsNone(self.validate._not(self.schema.node(ns.bsfs.Entity), + ast.filter.Not(ast.filter.Is('http://example.com/entity#1234')))) + # not works on literals + self.assertIsNone(self.validate._not(self.schema.literal(ns.xsd.string), + ast.filter.Not(ast.filter.Equals('hello world')))) + + def test_has(self): + # type must be node + self.assertRaises(errors.ConsistencyError, self.validate._has, self.schema.literal(ns.bsfs.Literal), + ast.filter.Has(ns.bse.tag)) + # type must be in the schema + self.assertRaises(errors.ConsistencyError, self.validate._has, self.schema.node(ns.bsfs.Node).get_child(ns.bsfs.Invalid), + ast.filter.Has(ns.bse.tag)) + # has checks predicate + self.assertRaises(errors.ConsistencyError, self.validate._has, self.schema.node(ns.bsfs.Entity), + ast.filter.Has(ns.bse.invalid)) + # predicate must match domain + self.assertRaises(errors.ConsistencyError, self.validate._has, self.schema.node(ns.bsfs.Tag), + ast.filter.Has(ns.bse.tag)) + # has checks count expression + self.assertRaises(errors.ConsistencyError, self.validate._has, self.schema.node(ns.bsfs.Entity), + ast.filter.Has(ns.bse.tag, ast.filter.Is('http://example.com/entity#1234'))) + # has accepts correct expressions + self.assertIsNone(self.validate._has(self.schema.node(ns.bsfs.Entity), ast.filter.Has(ns.bse.tag, ast.filter.GreaterThan(5)))) + + def test_is(self): + # type must be node + self.assertRaises(errors.ConsistencyError, self.validate._is, self.schema.literal(ns.bsfs.Literal), + ast.filter.Is('http://example.com/foo')) + # type must be in the schema + self.assertRaises(errors.ConsistencyError, self.validate._is, self.schema.node(ns.bsfs.Node).get_child(ns.bsfs.Invalid), + ast.filter.Is('http://example.com/foo')) + # is accepts correct expressions + self.assertIsNone(self.validate._is(self.schema.node(ns.bsfs.Entity), ast.filter.Is('http://example.com/entity#1234'))) + + def test_value(self): + # type must be literal + self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.node(ns.bsfs.Node), + ast.filter.Equals('hello world')) + self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.node(ns.bsfs.Node), + ast.filter.Substring('hello world')) + self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.node(ns.bsfs.Node), + ast.filter.StartsWith('hello world')) + self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.node(ns.bsfs.Node), + ast.filter.EndsWith('hello world')) + # type must be in the schema + self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid), + ast.filter.Equals('hello world')) + self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid), + ast.filter.Substring('hello world')) + self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid), + ast.filter.StartsWith('hello world')) + self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid), + ast.filter.EndsWith('hello world')) + # value accepts correct expressions + self.assertIsNone(self.validate._value(self.schema.literal(ns.xsd.string), ast.filter.Equals('hello world'))) + self.assertIsNone(self.validate._value(self.schema.literal(ns.xsd.string), ast.filter.Substring('hello world'))) + self.assertIsNone(self.validate._value(self.schema.literal(ns.xsd.string), ast.filter.StartsWith('hello world'))) + self.assertIsNone(self.validate._value(self.schema.literal(ns.xsd.string), ast.filter.EndsWith('hello world'))) + + def test_bounded(self): + # type must be literal + self.assertRaises(errors.ConsistencyError, self.validate._bounded, self.schema.node(ns.bsfs.Node), + ast.filter.GreaterThan(0)) + self.assertRaises(errors.ConsistencyError, self.validate._bounded, self.schema.node(ns.bsfs.Node), + ast.filter.LessThan(0)) + # type must be in the schema + self.assertRaises(errors.ConsistencyError, self.validate._bounded, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid), + ast.filter.GreaterThan(0)) + self.assertRaises(errors.ConsistencyError, self.validate._bounded, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid), + ast.filter.LessThan(0)) + # bounded accepts correct expressions + self.assertIsNone(self.validate._bounded(self.schema.literal(ns.xsd.integer), ast.filter.LessThan(0))) + self.assertIsNone(self.validate._bounded(self.schema.literal(ns.xsd.integer), ast.filter.GreaterThan(0))) + + + +## main ## + +if __name__ == '__main__': + unittest.main() + +## EOF ## |