"""
Part of the bsfs test suite.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
import unittest

# bsfs imports
from bsfs import schema
from bsfs.graph.nodes import Nodes
from bsfs.namespace import ns
from bsfs.query import ast
from bsfs.triple_store import SparqlStore
from bsfs.utils import URI, errors

# objects to test
from bsfs.graph.graph import Graph


## code ##

# NOTE(review): every schema literal below declares each prefix together with
# its IRI; a prefix directive without an IRI is not valid Turtle/SPARQL syntax.
# The rdfs/xsd IRIs are the W3C namespaces. The bsfs/bse/bsm IRIs are assumed
# to be the ones behind bsfs.namespace.ns -- TODO confirm against that module.

class TestGraph(unittest.TestCase):
    """Tests for Graph: string forms, equality, node access, migrate and get."""

    def setUp(self):
        """Create the user URI and a SparqlStore backend with a minimal schema."""
        self.user = URI('http://example.com/me')
        self.backend = SparqlStore.Open()
        self.backend.schema = schema.from_string('''
            prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            prefix bsfs: <http://bsfs.ai/schema/>
            bsfs:Entity rdfs:subClassOf bsfs:Node .
            ''')

    def test_str(self):
        """str() and repr() name the graph class, the backend class and the user."""
        self.assertEqual(str(Graph(self.backend, self.user)),
            'Graph(SparqlStore(uri=None), http://example.com/me)')
        self.assertEqual(repr(Graph(self.backend, self.user)),
            'Graph(backend=SparqlStore(uri=None), user=http://example.com/me)')
        # str respects backend
        class Foo(SparqlStore): pass
        self.assertEqual(str(Graph(Foo.Open(), self.user)),
            'Graph(Foo(uri=None), http://example.com/me)')
        self.assertEqual(repr(Graph(Foo.Open(), self.user)),
            'Graph(backend=Foo(uri=None), user=http://example.com/me)')
        # str respects user
        self.assertEqual(str(Graph(self.backend, URI('http://example.com/you'))),
            'Graph(SparqlStore(uri=None), http://example.com/you)')
        self.assertEqual(repr(Graph(self.backend, URI('http://example.com/you'))),
            'Graph(backend=SparqlStore(uri=None), user=http://example.com/you)')
        # str respects type
        class Bar(Graph): pass
        self.assertEqual(str(Bar(self.backend, self.user)),
            'Bar(SparqlStore(uri=None), http://example.com/me)')
        self.assertEqual(repr(Bar(self.backend, self.user)),
            'Bar(backend=SparqlStore(uri=None), user=http://example.com/me)')

    def test_equality(self):
        """Equality and hash depend on the backend and on the user."""
        graph = Graph(self.backend, self.user)
        # instance is equal to itself
        self.assertEqual(graph, graph)
        self.assertEqual(hash(graph), hash(graph))
        # instance is equal to a clone
        self.assertEqual(graph, Graph(self.backend, self.user))
        self.assertEqual(hash(graph), hash(Graph(self.backend, self.user)))
        # equality respects backend
        self.assertNotEqual(graph, Graph(SparqlStore.Open(), self.user))
        self.assertNotEqual(hash(graph), hash(Graph(SparqlStore.Open(), self.user)))
        # equality respects user
        self.assertNotEqual(graph, Graph(self.backend, URI('http://example.com/you')))
        self.assertNotEqual(hash(graph), hash(Graph(self.backend, URI('http://example.com/you'))))

    def test_essentials(self):
        """The graph's schema equals the backend's schema and cannot be assigned."""
        graph = Graph(self.backend, self.user)
        # schema
        self.assertEqual(graph.schema, self.backend.schema)
        self.assertRaises(AttributeError, setattr, graph, 'schema', None)

    def test_node(self):
        """node() yields a Nodes instance for one guid; unknown types raise KeyError."""
        graph = Graph(self.backend, self.user)
        guid = URI('http://example.com/me/entity#1234')
        # returns a Nodes instance
        self.assertEqual(
            graph.node(ns.bsfs.Entity, guid),
            Nodes(self.backend, self.user, graph.schema.node(ns.bsfs.Entity), {guid}))
        # node_type must be in the schema
        self.assertRaises(KeyError, graph.node, ns.bsfs.Invalid, guid)

    def test_nodes(self):
        """nodes() yields a Nodes instance for several guids; unknown types raise KeyError."""
        graph = Graph(self.backend, self.user)
        guids = {URI('http://example.com/me/entity#1234'), URI('http://example.com/me/entity#4321')}
        # returns a Nodes instance
        self.assertEqual(
            graph.nodes(ns.bsfs.Entity, guids),
            Nodes(self.backend, self.user, graph.schema.node(ns.bsfs.Entity), guids))
        # node_type must be in the schema
        self.assertRaises(KeyError, graph.nodes, ns.bsfs.Invalid, guids)

    def test_migrate(self):
        """migrate() rejects non-schemas and inconsistent schemas, and applies valid ones."""
        # setup
        graph = Graph(self.backend, self.user)

        # argument must be a schema
        class Foo(): pass
        self.assertRaises(TypeError, graph.migrate, 'hello world')
        self.assertRaises(TypeError, graph.migrate, 1234)
        self.assertRaises(TypeError, graph.migrate, Foo())

        # cannot append inconsistent schema
        self.assertRaises(errors.ConsistencyError, graph.migrate, schema.Schema({}, {
            schema.Node(ns.bsfs.Entity,
                schema.Node(ns.bsfs.Intermediate,
                    schema.Node(ns.bsfs.Node, None)))}), append=True)

        # cannot migrate to inconsistent schema
        self.assertRaises(errors.ConsistencyError, graph.migrate, schema.Schema({}, {
            schema.Node(ns.bsfs.Entity,
                schema.Node(ns.bsfs.Intermediate,
                    schema.Node(ns.bsfs.Node, None)))}), append=False)

        # can migrate to compatible schema
        target_1 = schema.from_string('''
            prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            prefix xsd: <http://www.w3.org/2001/XMLSchema#>
            prefix bsfs: <http://bsfs.ai/schema/>
            prefix bse: <http://bsfs.ai/schema/Entity#>

            bsfs:Entity rdfs:subClassOf bsfs:Node .
            xsd:string rdfs:subClassOf bsfs:Literal .
            xsd:integer rdfs:subClassOf bsfs:Literal .

            bse:filename rdfs:subClassOf bsfs:Predicate ;
                rdfs:domain bsfs:Entity ;
                rdfs:range xsd:string ;
                bsfs:unique "false"^^xsd:boolean .

            bse:filesize rdfs:subClassOf bsfs:Predicate ;
                rdfs:domain bsfs:Entity ;
                rdfs:range xsd:integer;
                bsfs:unique "false"^^xsd:boolean .

            ''')
        graph.migrate(target_1)
        # new schema is applied
        self.assertLess(target_1, graph.schema)
        # graph appends its predicates
        self.assertEqual(graph.schema, target_1 + schema.from_string('''
            prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            prefix xsd: <http://www.w3.org/2001/XMLSchema#>
            prefix bsfs: <http://bsfs.ai/schema/>
            prefix bsm: <http://bsfs.ai/schema/Meta#>

            xsd:integer rdfs:subClassOf bsfs:Literal .

            bsm:t_created rdfs:subClassOf bsfs:Predicate ;
                rdfs:domain bsfs:Node ;
                rdfs:range xsd:integer ;
                bsfs:unique "true"^^xsd:boolean .

            '''))

        # can overwrite the current schema
        target_2 = schema.from_string('''
            prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            prefix xsd: <http://www.w3.org/2001/XMLSchema#>
            prefix bsfs: <http://bsfs.ai/schema/>
            prefix bse: <http://bsfs.ai/schema/Entity#>

            bsfs:Entity rdfs:subClassOf bsfs:Node .
            xsd:string rdfs:subClassOf bsfs:Literal .
            xsd:integer rdfs:subClassOf bsfs:Literal .

            bse:filename rdfs:subClassOf bsfs:Predicate ;
                rdfs:domain bsfs:Entity ;
                rdfs:range xsd:string ;
                bsfs:unique "false"^^xsd:boolean .

            bse:author rdfs:subClassOf bsfs:Predicate ;
                rdfs:domain bsfs:Entity ;
                rdfs:range xsd:string ;
                bsfs:unique "true"^^xsd:boolean .

            ''')
        graph.migrate(target_2, append=False)
        # migrating with append=False: target_1 is no longer covered by the graph schema
        self.assertFalse(target_1 <= graph.schema)
        # new schema is applied
        self.assertLess(target_2, graph.schema)
        # graph appends its predicates
        self.assertEqual(graph.schema, target_2 + schema.from_string('''
            prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            prefix xsd: <http://www.w3.org/2001/XMLSchema#>
            prefix bsfs: <http://bsfs.ai/schema/>
            prefix bsm: <http://bsfs.ai/schema/Meta#>

            xsd:integer rdfs:subClassOf bsfs:Literal .

            bsm:t_created rdfs:subClassOf bsfs:Predicate ;
                rdfs:domain bsfs:Node ;
                rdfs:range xsd:integer ;
                bsfs:unique "true"^^xsd:boolean .

            '''))

    def test_get(self):
        """get() returns the nodes matching a filter and rejects an inconsistent query."""
        # setup
        graph = Graph(self.backend, self.user)
        graph.migrate(schema.from_string('''
            prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            prefix xsd: <http://www.w3.org/2001/XMLSchema#>
            prefix bsfs: <http://bsfs.ai/schema/>
            prefix bse: <http://bsfs.ai/schema/Entity#>

            bsfs:Entity rdfs:subClassOf bsfs:Node .
            bsfs:Tag rdfs:subClassOf bsfs:Node .
            xsd:string rdfs:subClassOf bsfs:Literal .

            bse:tag rdfs:subClassOf bsfs:Predicate ;
                rdfs:domain bsfs:Entity ;
                rdfs:range bsfs:Tag ;
                bsfs:unique "false"^^xsd:boolean .

            bse:comment rdfs:subClassOf bsfs:Predicate ;
                rdfs:domain bsfs:Node ;
                rdfs:range xsd:string ;
                bsfs:unique "false"^^xsd:boolean .

            '''))
        # add some instances
        ents = graph.nodes(ns.bsfs.Entity, {URI('http://example.com/entity#1234'), URI('http://example.com/entity#4321')})
        tags = graph.nodes(ns.bsfs.Tag, {URI('http://example.com/tag#1234'), URI('http://example.com/tag#4321')})
        # add some node links
        ents.set(ns.bse.tag, tags)
        # add some literals
        graph.node(ns.bsfs.Entity, URI('http://example.com/entity#1234')).set(ns.bse.comment, 'hello world')
        graph.node(ns.bsfs.Entity, URI('http://example.com/entity#1234')).set(ns.bse.comment, 'foo')
        graph.node(ns.bsfs.Entity, URI('http://example.com/entity#1234')).set(ns.bse.comment, 'foobar')
        graph.node(ns.bsfs.Tag, URI('http://example.com/tag#1234')).set(ns.bse.comment, 'foo')
        graph.node(ns.bsfs.Tag, URI('http://example.com/tag#4321')).set(ns.bse.comment, 'bar')

        # get exception for invalid query
        self.assertRaises(errors.ConsistencyError, graph.get, ns.bsfs.Entity,
            ast.filter.Any(ns.bse.tag, ast.filter.Equals('hello world')))

        # query returns nodes
        self.assertEqual(graph.get(ns.bsfs.Entity, ast.filter.Any(ns.bse.tag, ast.filter.Is(tags))), ents)
        self.assertEqual(graph.get(ns.bsfs.Entity, ast.filter.Any(ns.bse.comment, ast.filter.StartsWith('foo'))),
            graph.node(ns.bsfs.Entity, URI('http://example.com/entity#1234')))
        self.assertEqual(graph.get(ns.bsfs.Node, ast.filter.Any(ns.bse.comment, ast.filter.StartsWith('foo'))),
            graph.nodes(ns.bsfs.Node, {URI('http://example.com/entity#1234'), URI('http://example.com/tag#1234')}))
        self.assertEqual(graph.get(ns.bsfs.Entity, ast.filter.Or(
            ast.filter.Any(ns.bse.comment, ast.filter.EndsWith('bar')),
            ast.filter.Any(ns.bse.tag, ast.filter.All(ns.bse.comment, ast.filter.Equals('bar'))))),
            ents)


## main ##

if __name__ == '__main__':
    unittest.main()

## EOF ##