aboutsummaryrefslogtreecommitdiffstats
path: root/test/query/test_validator.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/query/test_validator.py')
-rw-r--r--test/query/test_validator.py237
1 files changed, 234 insertions, 3 deletions
diff --git a/test/query/test_validator.py b/test/query/test_validator.py
index 0e88ad3..4f8364a 100644
--- a/test/query/test_validator.py
+++ b/test/query/test_validator.py
@@ -8,6 +8,10 @@ Author: Matthias Baumgartner, 2022
import unittest
# bsfs imports
+from bsfs import schema as _schema
+from bsfs.namespace import ns
+from bsfs.query import ast
+from bsfs.utils import errors
# objects to test
from bsfs.query.validator import Filter
@@ -16,10 +20,237 @@ from bsfs.query.validator import Filter
## code ##
class TestFilter(unittest.TestCase):
- def test_parse(self):
- raise NotImplementedError()
+ def setUp(self):
+ self.schema = _schema.Schema.from_string('''
+ prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+ prefix xsd: <http://www.w3.org/2001/XMLSchema#>
+
+ prefix bsfs: <http://bsfs.ai/schema/>
+ prefix bse: <http://bsfs.ai/schema/Entity#>
+
+ bsfs:Entity rdfs:subClassOf bsfs:Node .
+ bsfs:URI rdfs:subClassOf bsfs:Literal .
+
+ bsfs:Tag rdfs:subClassOf bsfs:Node .
+ xsd:string rdfs:subClassOf bsfs:Literal .
+ xsd:integer rdfs:subClassOf bsfs:Literal .
+
+ bse:comment rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Node ;
+ rdfs:range xsd:string ;
+ bsfs:unique "false"^^xsd:boolean .
+
+ bse:filesize rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Entity ;
+ rdfs:range xsd:integer ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ bse:tag rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Entity ;
+ rdfs:range bsfs:Tag ;
+ bsfs:unique "false"^^xsd:boolean .
+
+ bse:label rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Tag ;
+ rdfs:range xsd:string ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ bse:buddy rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsfs:Entity ;
+ rdfs:range bsfs:Node ;
+ bsfs:unique "false"^^xsd:boolean .
+
+ ''')
+ self.validate = Filter(self.schema)
+
+ def test_call(self):
+ # root_type must be a _schema.Node
+ self.assertRaises(TypeError, self.validate, 1234, None)
+ self.assertRaises(TypeError, self.validate, '1234', None)
+ self.assertRaises(TypeError, self.validate, self.schema.literal(ns.bsfs.URI), None)
+ # root_type must exist in the schema
+ self.assertRaises(errors.ConsistencyError, self.validate, self.schema.node(ns.bsfs.Node).get_child(ns.bsfs.Image), None)
+ self.assertRaises(errors.ConsistencyError, self.validate, self.schema.node(ns.bsfs.Entity).get_child(ns.bsfs.Image), None)
+ # valid query returns true
+ self.assertTrue(self.validate(self.schema.node(ns.bsfs.Entity),
+ ast.filter.Any(ast.filter.OneOf(ns.bse.tag, ns.bse.buddy),
+ ast.filter.Or(
+ ast.filter.Is('http://example.com/symbol#1234'),
+ ast.filter.All(ns.bse.comment, ast.filter.StartsWith('foo')),
+ ast.filter.And(
+ ast.filter.Has(ns.bse.comment, ast.filter.Or(
+ ast.filter.GreaterThan(5),
+ ast.filter.LessThan(1),
+ )
+ ),
+ ast.filter.Not(ast.filter.Any(ns.bse.comment,
+ ast.filter.Not(ast.filter.Equals('hello world')))),
+ )))))
+ # invalid paths raise consistency error
+ self.assertRaises(errors.ConsistencyError, self.validate, self.schema.node(ns.bsfs.Entity),
+ ast.filter.Any(ast.filter.OneOf(ns.bse.tag, ns.bse.buddy),
+ ast.filter.Or(
+ ast.filter.All(ns.bse.comment, ast.filter.Equals('hello world')),
+ ast.filter.All(ns.bse.label, ast.filter.Equals('hello world')), # domain mismatch
+ )))
+
+ def test_routing(self):
+ self.assertRaises(errors.BackendError, self.validate._parse_filter_expression, ast.filter.FilterExpression(), self.schema.node(ns.bsfs.Node))
+ self.assertRaises(errors.BackendError, self.validate._parse_predicate_expression, ast.filter.PredicateExpression())
+
+ def test_predicate(self):
+ # predicate must be in the schema
+ self.assertRaises(errors.ConsistencyError, self.validate._predicate, ast.filter.Predicate(ns.bse.invalid))
+ # predicate must have a range
+ self.assertRaises(errors.BackendError, self.validate._predicate, ast.filter.Predicate(ns.bsfs.Predicate))
+ # predicate returns domain and range
+ self.assertEqual(self.validate._predicate(ast.filter.Predicate(ns.bse.tag)),
+ (self.schema.node(ns.bsfs.Entity), self.schema.node(ns.bsfs.Tag)))
+ # reverse is applied
+ self.assertEqual(self.validate._predicate(ast.filter.Predicate(ns.bse.tag, reverse=True)),
+ (self.schema.node(ns.bsfs.Tag), self.schema.node(ns.bsfs.Entity)))
+
+ def test_one_of(self):
+ # domains must both be nodes or literals
+ self.assertRaises(errors.ConsistencyError, self.validate._one_of, ast.filter.OneOf(ns.bse.tag, ast.filter.Predicate(ns.bse.label, reverse=True)))
+ # domains must be related
+ self.assertRaises(errors.ConsistencyError, self.validate._one_of, ast.filter.OneOf(ns.bse.tag, ns.bse.label))
+ # ranges must both be nodes or literals
+ self.assertRaises(errors.ConsistencyError, self.validate._one_of, ast.filter.OneOf(ns.bse.tag, ns.bse.comment))
+ # ranges must be related
+ self.assertRaises(errors.ConsistencyError, self.validate._one_of, ast.filter.OneOf(ns.bse.tag, ast.filter.Predicate(ns.bse.buddy, reverse=True)))
+ # one_of returns most specific domain
+ self.assertEqual(self.validate._one_of(ast.filter.OneOf(ns.bse.comment, ns.bse.label)),
+ (self.schema.node(ns.bsfs.Tag), self.schema.literal(ns.xsd.string)))
+ # one_of returns the most generic range
+ self.assertEqual(self.validate._one_of(ast.filter.OneOf(ns.bse.tag, ns.bse.buddy)),
+ (self.schema.node(ns.bsfs.Entity), self.schema.node(ns.bsfs.Node)))
+
+ def test_branch(self):
+ # type must be a node
+ self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.literal(ns.bsfs.Literal), None)
+ # type must be in the schema
+ self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.node(ns.bsfs.Node).get_child(ns.bsfs.Invalid), None)
+ # predicate is verified
+ self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.node(ns.bsfs.Entity),
+ ast.filter.Any(ns.bsfs.Invalid, ast.filter.Is('http://example.com/entity#1234')))
+ # predicate must match the domain
+ self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.node(ns.bsfs.Node),
+ ast.filter.Any(ns.bse.tag, ast.filter.Is('http://example.com/tag#1234')))
+ self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.node(ns.bsfs.Tag),
+ ast.filter.Any(ns.bse.tag, ast.filter.Is('http://example.com/tag#1234')))
+ # child expression must be valid
+ self.assertRaises(errors.ConsistencyError, self.validate._branch, self.schema.node(ns.bsfs.Entity),
+ ast.filter.Any(ns.bse.tag, ast.filter.Equals('hello world')))
+ # branch accepts valid expressions
+ self.assertIsNone(self.validate._branch(self.schema.node(ns.bsfs.Entity),
+ ast.filter.Any(ns.bse.tag, ast.filter.Is('http://example.com/entity#1234'))))
+ self.assertIsNone(self.validate._branch(self.schema.node(ns.bsfs.Entity),
+ ast.filter.All(ns.bse.tag, ast.filter.Is('http://example.com/entity#1234'))))
+
+ def test_agg(self):
+ # agg evaluates child expressions
+ self.assertRaises(errors.ConsistencyError, self.validate._agg, self.schema.node(ns.bsfs.Entity),
+ ast.filter.And(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Equals('hello world')))
+ self.assertRaises(errors.ConsistencyError, self.validate._agg, self.schema.literal(ns.xsd.string),
+ ast.filter.And(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Equals('hello world')))
+ self.assertRaises(errors.ConsistencyError, self.validate._agg, self.schema.node(ns.bsfs.Entity),
+ ast.filter.Or(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Equals('hello world')))
+ self.assertRaises(errors.ConsistencyError, self.validate._agg, self.schema.literal(ns.xsd.string),
+ ast.filter.Or(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Equals('hello world')))
+ # agg works on nodes
+ self.assertIsNone(self.validate._agg(self.schema.node(ns.bsfs.Entity),
+ ast.filter.And(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Is('http://example.com/entity#4321'))))
+ self.assertIsNone(self.validate._agg(self.schema.node(ns.bsfs.Entity),
+ ast.filter.Or(ast.filter.Is('http://example.com/entity#1234'), ast.filter.Is('http://example.com/entity#4321'))))
+ # agg works on literals
+ self.assertIsNone(self.validate._agg(self.schema.literal(ns.xsd.string),
+ ast.filter.And(ast.filter.Equals('foobar'), ast.filter.Equals('hello world'))))
+ self.assertIsNone(self.validate._agg(self.schema.literal(ns.xsd.string),
+ ast.filter.Or(ast.filter.Equals('foobar'), ast.filter.Equals('hello world'))))
+
+ def test_not(self):
+ # not evaluates child expressions
+ self.assertRaises(errors.ConsistencyError, self.validate._not, self.schema.node(ns.bsfs.Entity),
+ ast.filter.Not(ast.filter.Equals('hello world')))
+ self.assertRaises(errors.ConsistencyError, self.validate._not, self.schema.literal(ns.xsd.string),
+ ast.filter.Not(ast.filter.Is('http://example.com/entity#1234')))
+ # not works on nodes
+ self.assertIsNone(self.validate._not(self.schema.node(ns.bsfs.Entity),
+ ast.filter.Not(ast.filter.Is('http://example.com/entity#1234'))))
+ # not works on literals
+ self.assertIsNone(self.validate._not(self.schema.literal(ns.xsd.string),
+ ast.filter.Not(ast.filter.Equals('hello world'))))
+
+ def test_has(self):
+ # type must be node
+ self.assertRaises(errors.ConsistencyError, self.validate._has, self.schema.literal(ns.bsfs.Literal),
+ ast.filter.Has(ns.bse.tag))
+ # type must be in the schema
+ self.assertRaises(errors.ConsistencyError, self.validate._has, self.schema.node(ns.bsfs.Node).get_child(ns.bsfs.Invalid),
+ ast.filter.Has(ns.bse.tag))
+ # has checks predicate
+ self.assertRaises(errors.ConsistencyError, self.validate._has, self.schema.node(ns.bsfs.Entity),
+ ast.filter.Has(ns.bse.invalid))
+ # predicate must match domain
+ self.assertRaises(errors.ConsistencyError, self.validate._has, self.schema.node(ns.bsfs.Tag),
+ ast.filter.Has(ns.bse.tag))
+ # has checks count expression
+ self.assertRaises(errors.ConsistencyError, self.validate._has, self.schema.node(ns.bsfs.Entity),
+ ast.filter.Has(ns.bse.tag, ast.filter.Is('http://example.com/entity#1234')))
+ # has accepts correct expressions
+ self.assertIsNone(self.validate._has(self.schema.node(ns.bsfs.Entity), ast.filter.Has(ns.bse.tag, ast.filter.GreaterThan(5))))
+
+ def test_is(self):
+ # type must be node
+ self.assertRaises(errors.ConsistencyError, self.validate._is, self.schema.literal(ns.bsfs.Literal),
+ ast.filter.Is('http://example.com/foo'))
+ # type must be in the schema
+ self.assertRaises(errors.ConsistencyError, self.validate._is, self.schema.node(ns.bsfs.Node).get_child(ns.bsfs.Invalid),
+ ast.filter.Is('http://example.com/foo'))
+ # is accepts correct expressions
+ self.assertIsNone(self.validate._is(self.schema.node(ns.bsfs.Entity), ast.filter.Is('http://example.com/entity#1234')))
+
+ def test_value(self):
+ # type must be literal
+ self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.node(ns.bsfs.Node),
+ ast.filter.Equals('hello world'))
+ self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.node(ns.bsfs.Node),
+ ast.filter.Substring('hello world'))
+ self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.node(ns.bsfs.Node),
+ ast.filter.StartsWith('hello world'))
+ self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.node(ns.bsfs.Node),
+ ast.filter.EndsWith('hello world'))
+ # type must be in the schema
+ self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid),
+ ast.filter.Equals('hello world'))
+ self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid),
+ ast.filter.Substring('hello world'))
+ self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid),
+ ast.filter.StartsWith('hello world'))
+ self.assertRaises(errors.ConsistencyError, self.validate._value, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid),
+ ast.filter.EndsWith('hello world'))
+ # value accepts correct expressions
+ self.assertIsNone(self.validate._value(self.schema.literal(ns.xsd.string), ast.filter.Equals('hello world')))
+ self.assertIsNone(self.validate._value(self.schema.literal(ns.xsd.string), ast.filter.Substring('hello world')))
+ self.assertIsNone(self.validate._value(self.schema.literal(ns.xsd.string), ast.filter.StartsWith('hello world')))
+ self.assertIsNone(self.validate._value(self.schema.literal(ns.xsd.string), ast.filter.EndsWith('hello world')))
+
+ def test_bounded(self):
+ # type must be literal
+ self.assertRaises(errors.ConsistencyError, self.validate._bounded, self.schema.node(ns.bsfs.Node),
+ ast.filter.GreaterThan(0))
+ self.assertRaises(errors.ConsistencyError, self.validate._bounded, self.schema.node(ns.bsfs.Node),
+ ast.filter.LessThan(0))
+ # type must be in the schema
+ self.assertRaises(errors.ConsistencyError, self.validate._bounded, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid),
+ ast.filter.GreaterThan(0))
+ self.assertRaises(errors.ConsistencyError, self.validate._bounded, self.schema.literal(ns.bsfs.Literal).get_child(ns.bsfs.Invalid),
+ ast.filter.LessThan(0))
+ # bounded accepts correct expressions
+ self.assertIsNone(self.validate._bounded(self.schema.literal(ns.xsd.integer), ast.filter.LessThan(0)))
+ self.assertIsNone(self.validate._bounded(self.schema.literal(ns.xsd.integer), ast.filter.GreaterThan(0)))
- # FIXME: subtests for individual functions
## main ##