aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/schema
diff options
context:
space:
mode:
Diffstat (limited to 'bsfs/schema')
-rw-r--r--bsfs/schema/__init__.py14
-rw-r--r--bsfs/schema/schema.py125
-rw-r--r--bsfs/schema/serialize.py255
-rw-r--r--bsfs/schema/types.py207
4 files changed, 463 insertions, 138 deletions
diff --git a/bsfs/schema/__init__.py b/bsfs/schema/__init__.py
index ad4d456..ca2e0cd 100644
--- a/bsfs/schema/__init__.py
+++ b/bsfs/schema/__init__.py
@@ -1,15 +1,15 @@
-"""
-Part of the BlackStar filesystem (bsfs) module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
# imports
import typing
# inner-module imports
from .schema import Schema
-from .types import Literal, Node, Predicate
+from .serialize import from_string, to_string
+from .types import Literal, Node, Predicate, Vertex, Feature, \
+ ROOT_VERTEX, ROOT_NODE, ROOT_LITERAL, \
+ ROOT_NUMBER, ROOT_TIME, \
+ ROOT_ARRAY, ROOT_FEATURE, \
+ ROOT_PREDICATE
# exports
__all__: typing.Sequence[str] = (
@@ -17,6 +17,8 @@ __all__: typing.Sequence[str] = (
'Node',
'Predicate',
'Schema',
+ 'from_string',
+ 'to_string',
)
## EOF ##
diff --git a/bsfs/schema/schema.py b/bsfs/schema/schema.py
index c5d4571..c104436 100644
--- a/bsfs/schema/schema.py
+++ b/bsfs/schema/schema.py
@@ -1,16 +1,9 @@
-"""
-Part of the BlackStar filesystem (bsfs) module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
# imports
from collections import abc, namedtuple
import typing
-import rdflib
# bsfs imports
-from bsfs.namespace import ns
from bsfs.utils import errors, URI, typename
# inner-module imports
@@ -51,11 +44,13 @@ class Schema():
def __init__(
self,
- predicates: typing.Iterable[types.Predicate],
+ predicates: typing.Optional[typing.Iterable[types.Predicate]] = None,
nodes: typing.Optional[typing.Iterable[types.Node]] = None,
literals: typing.Optional[typing.Iterable[types.Literal]] = None,
):
# materialize arguments
+ if predicates is None:
+ predicates = set()
if nodes is None:
nodes = set()
if literals is None:
@@ -63,24 +58,41 @@ class Schema():
nodes = set(nodes)
literals = set(literals)
predicates = set(predicates)
+
+ # add root types to the schema
+ nodes.add(types.ROOT_NODE)
+ literals.add(types.ROOT_LITERAL)
+ predicates.add(types.ROOT_PREDICATE)
+ # add minimally necessary types to the schema
+ literals.add(types.ROOT_BLOB)
+ literals.add(types.ROOT_NUMBER)
+ literals.add(types.ROOT_TIME)
+ literals.add(types.ROOT_ARRAY)
+ literals.add(types.ROOT_FEATURE)
+
+ # FIXME: ensure that types derive from the right root?
+
# include parents in predicates set
# TODO: review type annotations and ignores for python >= 3.11 (parents is _Type but should be typing.Self)
predicates |= {par for pred in predicates for par in pred.parents()} # type: ignore [misc]
# include predicate domain in nodes set
nodes |= {pred.domain for pred in predicates}
# include predicate range in nodes and literals sets
- prange = {pred.range for pred in predicates if pred.range is not None}
+ prange = {pred.range for pred in predicates}
nodes |= {vert for vert in prange if isinstance(vert, types.Node)}
literals |= {vert for vert in prange if isinstance(vert, types.Literal)}
+ # NOTE: ROOT_PREDICATE has a Vertex as range which is neither in nodes nor literals
+ # FIXME: with the ROOT_VERTEX missing, the schema is not complete anymore!
+
# include parents in nodes and literals sets
- # NOTE: Must be done after predicate domain/range was handled
- # so that their parents are included as well.
+ # NOTE: Must come after predicate domain/range was handled to have their parents as well.
nodes |= {par for node in nodes for par in node.parents()} # type: ignore [misc]
literals |= {par for lit in literals for par in lit.parents()} # type: ignore [misc]
# assign members
self._nodes = {node.uri: node for node in nodes}
self._literals = {lit.uri: lit for lit in literals}
self._predicates = {pred.uri: pred for pred in predicates}
+
# verify unique uris
if len(nodes) != len(self._nodes):
raise errors.ConsistencyError('inconsistent nodes')
@@ -214,6 +226,7 @@ class Schema():
>>> Schema.Union([a, b, c])
"""
+ # FIXME: copy type annotations?
if len(args) == 0:
raise TypeError('Schema.Union requires at least one argument (Schema or Iterable)')
if isinstance(args[0], cls): # args is sequence of Schema instances
@@ -295,92 +308,8 @@ class Schema():
"""Return the Literal matching the *uri*."""
return self._literals[uri]
-
- ## constructors ##
-
-
- @classmethod
- def Empty(cls) -> 'Schema': # pylint: disable=invalid-name # capitalized classmethod
- """Return a minimal Schema."""
- node = types.Node(ns.bsfs.Node, None)
- literal = types.Literal(ns.bsfs.Literal, None)
- predicate = types.Predicate(
- uri=ns.bsfs.Predicate,
- parent=None,
- domain=node,
- range=None,
- unique=False,
- )
- return cls((predicate, ), (node, ), (literal, ))
-
-
- @classmethod
- def from_string(cls, schema: str) -> 'Schema': # pylint: disable=invalid-name # capitalized classmethod
- """Load and return a Schema from a string."""
- # parse string into rdf graph
- graph = rdflib.Graph()
- graph.parse(data=schema, format='turtle')
-
- def _fetch_hierarchically(factory, curr):
- # emit current node
- yield curr
- # walk through childs
- for child in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(curr.uri)):
- # convert to URI
- child = URI(child)
- # check circular dependency
- if child == curr.uri or child in {node.uri for node in curr.parents()}:
- raise errors.ConsistencyError('circular dependency')
- # recurse and emit (sub*)childs
- yield from _fetch_hierarchically(factory, factory(child, curr))
-
- # fetch nodes
- nodes = set(_fetch_hierarchically(types.Node, types.Node(ns.bsfs.Node, None)))
- nodes_lut = {node.uri: node for node in nodes}
- if len(nodes_lut) != len(nodes):
- raise errors.ConsistencyError('inconsistent nodes')
-
- # fetch literals
- literals = set(_fetch_hierarchically(types.Literal, types.Literal(ns.bsfs.Literal, None)))
- literals_lut = {lit.uri: lit for lit in literals}
- if len(literals_lut) != len(literals):
- raise errors.ConsistencyError('inconsistent literals')
-
- # fetch predicates
- def build_predicate(uri, parent):
- uri = rdflib.URIRef(uri)
- # get domain
- domains = set(graph.objects(uri, rdflib.RDFS.domain))
- if len(domains) != 1:
- raise errors.ConsistencyError(f'inconsistent domain: {domains}')
- dom = nodes_lut.get(next(iter(domains)))
- if dom is None:
- raise errors.ConsistencyError('missing domain')
- # get range
- ranges = set(graph.objects(uri, rdflib.RDFS.range))
- if len(ranges) != 1:
- raise errors.ConsistencyError(f'inconsistent range: {ranges}')
- rng = next(iter(ranges))
- rng = nodes_lut.get(rng, literals_lut.get(rng))
- if rng is None:
- raise errors.ConsistencyError('missing range')
- # get unique flag
- uniques = set(graph.objects(uri, rdflib.URIRef(ns.bsfs.unique)))
- if len(uniques) != 1:
- raise errors.ConsistencyError(f'inconsistent unique flags: {uniques}')
- unique = bool(next(iter(uniques)))
- # build Predicate
- return types.Predicate(URI(uri), parent, dom, rng, unique)
-
- root_predicate = types.Predicate(
- uri=ns.bsfs.Predicate,
- parent=None,
- domain=nodes_lut[ns.bsfs.Node],
- range=None, # FIXME: Unclear how to handle this! Can be either a Literal or a Node
- unique=False,
- )
- predicates = _fetch_hierarchically(build_predicate, root_predicate)
- # return Schema
- return cls(predicates, nodes, literals)
+ def predicates_at(self, node: types.Node) -> typing.Iterator[types.Predicate]:
+ """Return predicates that have domain *node* (or superclass thereof)."""
+ return iter(pred for pred in self._predicates.values() if node <= pred.domain)
## EOF ##
diff --git a/bsfs/schema/serialize.py b/bsfs/schema/serialize.py
new file mode 100644
index 0000000..ea8b2f4
--- /dev/null
+++ b/bsfs/schema/serialize.py
@@ -0,0 +1,255 @@
+
+# standard imports
+import itertools
+import typing
+
+# external imports
+import rdflib
+
+# bsfs imports
+from bsfs.namespace import ns
+from bsfs.utils import errors, URI, typename
+
+# inner-module imports
+from . import types
+from . import schema
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'to_string',
+ 'from_string',
+ )
+
+
+## code ##
+
+def from_string(schema_str: str) -> schema.Schema:
+ """Load and return a Schema from a string."""
+ # parse string into rdf graph
+ graph = rdflib.Graph()
+ graph.parse(data=schema_str, format='turtle')
+
+ # helper functions
+ def _fetch_value(
+ subject: URI,
+ predicate: rdflib.URIRef,
+ value_factory: typing.Callable[[typing.Any], typing.Any],
+ ) -> typing.Optional[typing.Any]:
+ """Fetch the object of a given subject and predicate.
+ Raises a `errors.ConsistencyError` if multiple objects match.
+ """
+ values = list(graph.objects(rdflib.URIRef(subject), predicate))
+ if len(values) == 0:
+ return None
+ if len(values) == 1:
+ return value_factory(values[0])
+ raise errors.ConsistencyError(
+ f'{subject} has multiple values for predicate {str(predicate)}, expected zero or one')
+
+ def _convert(value):
+ """Convert the subject type from rdflib to a bsfs native type."""
+ if isinstance(value, rdflib.Literal):
+ return value.value
+ if isinstance(value, rdflib.URIRef):
+ return URI(value)
+ # value is neither a node nor a literal, but e.g. a blank node
+ raise errors.BackendError(f'expected Literal or URIRef, found {typename(value)}')
+
+ def _fetch_hierarchically(factory, curr):
+ """Walk through a rdfs:subClassOf hierarchy, creating symbols along the way."""
+ # emit current node
+ yield curr
+ # walk through childs
+ for child in graph.subjects(rdflib.URIRef(ns.rdfs.subClassOf), rdflib.URIRef(curr.uri)):
+ # fetch annotations
+ annotations = {
+ URI(pred): _convert(value)
+ for pred, value # FIXME: preserve datatype of value?!
+ in graph.predicate_objects(child)
+ if URI(pred) != ns.rdfs.subClassOf
+ }
+ # convert child to URI
+ child = URI(child)
+ # check circular dependency
+ if child == curr.uri or child in {node.uri for node in curr.parents()}:
+ raise errors.ConsistencyError('circular dependency')
+ # recurse and emit (sub*)childs
+ yield from _fetch_hierarchically(factory, factory(child, curr, **annotations))
+
+ # fetch nodes
+ nodes = set(_fetch_hierarchically(types.Node, types.ROOT_NODE))
+ nodes_lut = {node.uri: node for node in nodes}
+ if len(nodes_lut) != len(nodes):
+ raise errors.ConsistencyError('inconsistent nodes')
+
+ # fetch literals
+ def _build_literal(uri, parent, **annotations):
+ """Literal factory."""
+ # break out on root feature type
+ if uri == types.ROOT_FEATURE.uri:
+ return types.ROOT_FEATURE
+ # handle feature types
+ if isinstance(parent, types.Feature):
+ # clean annotations
+ annotations.pop(ns.bsfs.dimension, None)
+ annotations.pop(ns.bsfs.dtype, None)
+ annotations.pop(ns.bsfs.distance, None)
+ # get dimension
+ dimension = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dimension), int)
+ # get dtype
+ dtype = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dtype), URI)
+ # get distance
+ distance = _fetch_value(uri, rdflib.URIRef(ns.bsfs.distance), URI)
+ # return feature
+ return parent.child(URI(uri), dtype=dtype, dimension=dimension, distance=distance, **annotations)
+ # handle non-feature types
+ return parent.child(URI(uri), **annotations)
+
+ literals = set(_fetch_hierarchically(_build_literal, types.ROOT_LITERAL))
+ literals_lut = {lit.uri: lit for lit in literals}
+ if len(literals_lut) != len(literals):
+ raise errors.ConsistencyError('inconsistent literals')
+
+ # fetch predicates
+ def _build_predicate(uri, parent, **annotations):
+ """Predicate factory."""
+ # clean annotations
+ annotations.pop(ns.rdfs.domain, None)
+ annotations.pop(ns.rdfs.range, None)
+ annotations.pop(ns.bsfs.unique, None)
+ # get domain
+ dom = _fetch_value(uri, rdflib.RDFS.domain, URI)
+ if dom is not None and dom not in nodes_lut:
+ raise errors.ConsistencyError(f'predicate {uri} has undefined domain {dom}')
+ if dom is not None:
+ dom = nodes_lut[dom]
+ # get range
+ rng = _fetch_value(uri, rdflib.RDFS.range, URI)
+ if rng is not None and rng not in nodes_lut and rng not in literals_lut:
+ raise errors.ConsistencyError(f'predicate {uri} has undefined range {rng}')
+ if rng is not None:
+ rng = nodes_lut.get(rng, literals_lut.get(rng))
+ # get unique
+ unique = _fetch_value(uri, rdflib.URIRef(ns.bsfs.unique), bool)
+ # build predicate
+ return parent.child(URI(uri), domain=dom, range=rng, unique=unique, **annotations)
+
+ predicates = _fetch_hierarchically(_build_predicate, types.ROOT_PREDICATE)
+
+ return schema.Schema(predicates, nodes, literals)
+
+
+
+def to_string(schema_inst: schema.Schema, fmt: str = 'turtle') -> str:
+ """Serialize a `bsfs.schema.Schema` to a string.
+ See `rdflib.Graph.serialize` for viable formats (default: turtle).
+ """
+
+ # type of emitted triples.
+ T_TRIPLE = typing.Iterator[typing.Tuple[rdflib.URIRef, rdflib.URIRef, rdflib.term.Identifier]]
+
+ def _type(tpe: types._Type) -> T_TRIPLE :
+ """Emit _Type properties (parent, annotations)."""
+ # emit parent
+ if tpe.parent is not None:
+ yield (
+ rdflib.URIRef(tpe.uri),
+ rdflib.URIRef(ns.rdfs.subClassOf),
+ rdflib.URIRef(tpe.parent.uri),
+ )
+ # emit annotations
+ for prop, value in tpe.annotations.items():
+ yield (
+ rdflib.URIRef(tpe.uri),
+ rdflib.URIRef(prop),
+ rdflib.Literal(value), # FIXME: datatype?!
+ )
+
+ def _predicate(pred: types.Predicate) -> T_TRIPLE:
+ """Emit Predicate properties (domain, range, unique)."""
+ # no need to emit anything for the root predicate
+ if pred == types.ROOT_PREDICATE:
+ return
+ # emit domain
+ if pred.domain != getattr(pred.parent, 'domain', None):
+ yield (
+ rdflib.URIRef(pred.uri),
+ rdflib.URIRef(ns.rdfs.domain),
+ rdflib.URIRef(pred.domain.uri),
+ )
+ # emit range
+ if pred.range != getattr(pred.parent, 'range', None):
+ yield (
+ rdflib.URIRef(pred.uri),
+ rdflib.URIRef(ns.rdfs.range),
+ rdflib.URIRef(pred.range.uri),
+ )
+ # emit cardinality
+ if pred.unique != getattr(pred.parent, 'unique', None):
+ yield (
+ rdflib.URIRef(pred.uri),
+ rdflib.URIRef(ns.bsfs.unique),
+ rdflib.Literal(pred.unique, datatype=rdflib.XSD.boolean),
+ )
+
+ def _feature(feat: types.Feature) -> T_TRIPLE:
+ """Emit Feature properties (dimension, dtype, distance)."""
+ # emit size
+ if feat.dimension != getattr(feat.parent, 'dimension', None):
+ yield (
+ rdflib.URIRef(feat.uri),
+ rdflib.URIRef(ns.bsfs.dimension),
+ rdflib.Literal(feat.dimension, datatype=rdflib.XSD.integer),
+ )
+ # emit dtype
+ if feat.dtype != getattr(feat.parent, 'dtype', None):
+ yield (
+ rdflib.URIRef(feat.uri),
+ rdflib.URIRef(ns.bsfs.dtype),
+ rdflib.URIRef(feat.dtype),
+ )
+ # emit distance
+ if feat.distance != getattr(feat.parent, 'distance', None):
+ yield (
+ rdflib.URIRef(feat.uri),
+ rdflib.URIRef(ns.bsfs.distance),
+ rdflib.URIRef(feat.distance),
+ )
+
+ def _parse(node: types._Type) -> T_TRIPLE:
+ """Emit all properties of a type."""
+ # check arg
+ if not isinstance(node, types._Type): # pylint: disable=protected-access
+ raise TypeError(node)
+ # emit _Type essentials
+ yield from _type(node)
+ # emit properties of derived types
+ if isinstance(node, types.Predicate):
+ yield from _predicate(node)
+ if isinstance(node, types.Feature):
+ yield from _feature(node)
+
+ # create graph
+ graph = rdflib.Graph()
+ # add triples to graph
+ nodes = itertools.chain(
+ schema_inst.nodes(),
+ schema_inst.literals(),
+ schema_inst.predicates())
+ for node in nodes:
+ for triple in _parse(node):
+ graph.add(triple)
+ # add known namespaces for readability
+ # FIXME: more generically?
+ graph.bind('bsfs', rdflib.URIRef(ns.bsfs + '/'))
+ graph.bind('bsl', rdflib.URIRef(ns.bsl + '/'))
+ graph.bind('bsn', rdflib.URIRef(ns.bsn + '#'))
+ graph.bind('bse', rdflib.URIRef(ns.bsfs.Entity() + '#'))
+ graph.bind('rdf', rdflib.URIRef(ns.rdf))
+ graph.bind('rdfs', rdflib.URIRef(ns.rdfs))
+ graph.bind('schema', rdflib.URIRef(ns.schema))
+ graph.bind('xsd', rdflib.URIRef(ns.xsd))
+ # serialize to turtle
+ return graph.serialize(format=fmt)
+
+## EOF ##
diff --git a/bsfs/schema/types.py b/bsfs/schema/types.py
index 54a7e99..5834df8 100644
--- a/bsfs/schema/types.py
+++ b/bsfs/schema/types.py
@@ -1,13 +1,9 @@
-"""
-Part of the BlackStar filesystem (bsfs) module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
# imports
import typing
# bsfs imports
+from bsfs.namespace import ns
from bsfs.utils import errors, URI, typename
# exports
@@ -15,6 +11,7 @@ __all__: typing.Sequence[str] = (
'Literal',
'Node',
'Predicate',
+ 'Feature',
)
@@ -99,9 +96,11 @@ class _Type():
self,
uri: URI,
parent: typing.Optional['_Type'] = None,
+ **annotations: typing.Any,
):
- self.uri = uri
+ self.uri = URI(uri)
self.parent = parent
+ self.annotations = annotations
def parents(self) -> typing.Generator['_Type', None, None]:
"""Generate a list of parent nodes."""
@@ -110,9 +109,17 @@ class _Type():
yield curr
curr = curr.parent
- def get_child(self, uri: URI, **kwargs):
+ def child(
+ self,
+ uri: URI,
+ **kwargs,
+ ):
"""Return a child of the current class."""
- return type(self)(uri, self, **kwargs)
+ return type(self)(
+ uri=uri,
+ parent=self,
+ **kwargs
+ )
def __str__(self) -> str:
return f'{typename(self)}({self.uri})'
@@ -138,8 +145,10 @@ class _Type():
def __lt__(self, other: typing.Any) -> bool:
"""Return True iff *self* is a true subclass of *other*."""
- if not type(self) is type(other): # type mismatch # pylint: disable=unidiomatic-typecheck
+ if not isinstance(other, _Type):
return NotImplemented
+ if not isinstance(other, type(self)): # FIXME: necessary?
+ return False
if self.uri == other.uri: # equivalence
return False
if self in other.parents(): # superclass
@@ -151,8 +160,10 @@ class _Type():
def __le__(self, other: typing.Any) -> bool:
"""Return True iff *self* is equivalent or a subclass of *other*."""
- if not type(self) is type(other): # type mismatch # pylint: disable=unidiomatic-typecheck
+ if not isinstance(other, _Type):
return NotImplemented
+ if not isinstance(other, type(self)): # FIXME: necessary?
+ return False
if self.uri == other.uri: # equivalence
return True
if self in other.parents(): # superclass
@@ -164,8 +175,10 @@ class _Type():
def __gt__(self, other: typing.Any) -> bool:
"""Return True iff *self* is a true superclass of *other*."""
- if not type(self) is type(other): # type mismatch # pylint: disable=unidiomatic-typecheck
+ if not isinstance(other, _Type):
return NotImplemented
+ if not isinstance(other, type(self)): # FIXME: necessary?
+ return False
if self.uri == other.uri: # equivalence
return False
if self in other.parents(): # superclass
@@ -177,8 +190,10 @@ class _Type():
def __ge__(self, other: typing.Any) -> bool:
"""Return True iff *self* is eqiuvalent or a superclass of *other*."""
- if not type(self) is type(other): # type mismatch # pylint: disable=unidiomatic-typecheck
+ if not isinstance(other, _Type):
return NotImplemented
+ if not isinstance(other, type(self)): # FIXME: necessary?
+ return False
if self.uri == other.uri: # equivalence
return True
if self in other.parents(): # superclass
@@ -189,32 +204,95 @@ class _Type():
return False
-class _Vertex(_Type):
+class Vertex(_Type):
"""Graph vertex types. Can be a Node or a Literal."""
- def __init__(self, uri: URI, parent: typing.Optional['_Vertex']):
- super().__init__(uri, parent)
+ parent: typing.Optional['Vertex']
+ def __init__(self, uri: URI, parent: typing.Optional['Vertex'], **kwargs):
+ super().__init__(uri, parent, **kwargs)
-class Node(_Vertex):
+class Node(Vertex):
"""Node type."""
- def __init__(self, uri: URI, parent: typing.Optional['Node']):
- super().__init__(uri, parent)
+ parent: typing.Optional['Node']
+ def __init__(self, uri: URI, parent: typing.Optional['Node'], **kwargs):
+ super().__init__(uri, parent, **kwargs)
-class Literal(_Vertex):
+class Literal(Vertex):
"""Literal type."""
- def __init__(self, uri: URI, parent: typing.Optional['Literal']):
- super().__init__(uri, parent)
+ parent: typing.Optional['Literal']
+ def __init__(self, uri: URI, parent: typing.Optional['Literal'], **kwargs):
+ super().__init__(uri, parent, **kwargs)
+
+
+class Feature(Literal):
+ """Feature type."""
+
+ # Number of feature vector dimensions.
+ dimension: int
+
+ # Feature vector datatype.
+ dtype: URI
+
+ # Distance measure to compare feature vectors.
+ distance: URI
+
+ def __init__(
+ self,
+ # Type members
+ uri: URI,
+ parent: typing.Optional[Literal],
+ # Feature members
+ dimension: int,
+ dtype: URI,
+ distance: URI,
+ **kwargs,
+ ):
+ super().__init__(uri, parent, **kwargs)
+ self.dimension = int(dimension)
+ self.dtype = URI(dtype)
+ self.distance = URI(distance)
+
+ def __hash__(self) -> int:
+ return hash((super().__hash__(), self.dimension, self.dtype, self.distance))
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return super().__eq__(other) \
+ and self.dimension == other.dimension \
+ and self.dtype == other.dtype \
+ and self.distance == other.distance
+ def child(
+ self,
+ uri: URI,
+ dimension: typing.Optional[int] = None,
+ dtype: typing.Optional[URI] = None,
+ distance: typing.Optional[URI] = None,
+ **kwargs,
+ ):
+ """Return a child of the current class."""
+ if dimension is None:
+ dimension = self.dimension
+ if dtype is None:
+ dtype = self.dtype
+ if distance is None:
+ distance = self.distance
+ return super().child(
+ uri=uri,
+ dimension=dimension,
+ dtype=dtype,
+ distance=distance,
+ **kwargs,
+ )
class Predicate(_Type):
- """Predicate type."""
+ """Predicate base type."""
# source type.
domain: Node
# destination type.
- range: typing.Optional[typing.Union[Node, Literal]]
+ range: Vertex
# maximum cardinality of type.
unique: bool
@@ -226,22 +304,23 @@ class Predicate(_Type):
parent: typing.Optional['Predicate'],
# Predicate members
domain: Node,
- range: typing.Optional[typing.Union[Node, Literal]], # pylint: disable=redefined-builtin
+ range: Vertex, # pylint: disable=redefined-builtin
unique: bool,
+ **kwargs,
):
# check arguments
if not isinstance(domain, Node):
raise TypeError(domain)
- if range is not None and not isinstance(range, Node) and not isinstance(range, Literal):
+ if range != ROOT_VERTEX and not isinstance(range, (Node, Literal)):
raise TypeError(range)
# initialize
- super().__init__(uri, parent)
+ super().__init__(uri, parent, **kwargs)
self.domain = domain
self.range = range
- self.unique = unique
+ self.unique = bool(unique)
def __hash__(self) -> int:
- return hash((super().__hash__(), self.domain, self.range, self.unique))
+ return hash((super().__hash__(), self.domain, self.unique, self.range))
def __eq__(self, other: typing.Any) -> bool:
return super().__eq__(other) \
@@ -249,11 +328,11 @@ class Predicate(_Type):
and self.range == other.range \
and self.unique == other.unique
- def get_child(
+ def child(
self,
uri: URI,
domain: typing.Optional[Node] = None,
- range: typing.Optional[_Vertex] = None, # pylint: disable=redefined-builtin
+ range: typing.Optional[Vertex] = None, # pylint: disable=redefined-builtin
unique: typing.Optional[bool] = None,
**kwargs,
):
@@ -264,13 +343,73 @@ class Predicate(_Type):
raise errors.ConsistencyError(f'{domain} must be a subclass of {self.domain}')
if range is None:
range = self.range
- if range is None: # inherited range from ns.bsfs.Predicate
- raise ValueError('range must be defined by the parent or argument')
- if self.range is not None and not range <= self.range:
+ # NOTE: The root predicate has a Vertex as range, which is neither a parent of the root
+ # Node nor Literal. Hence, that test is skipped since a child should be allowed to
+ # specialize from Vertex to anything.
+ if self.range != ROOT_VERTEX and not range <= self.range:
raise errors.ConsistencyError(f'{range} must be a subclass of {self.range}')
if unique is None:
unique = self.unique
- return super().get_child(uri, domain=domain, range=range, unique=unique, **kwargs)
+ return super().child(
+ uri=uri,
+ domain=domain,
+ range=range,
+ unique=unique,
+ **kwargs
+ )
+
+
+# essential vertices
+ROOT_VERTEX = Vertex(
+ uri=ns.bsfs.Vertex,
+ parent=None,
+ )
+
+ROOT_NODE = Node(
+ uri=ns.bsfs.Node,
+ parent=None,
+ )
+
+ROOT_LITERAL = Literal(
+ uri=ns.bsfs.Literal,
+ parent=None,
+ )
+
+ROOT_BLOB = Literal(
+ uri=ns.bsl.BinaryBlob,
+ parent=ROOT_LITERAL,
+ )
+
+ROOT_NUMBER = Literal(
+ uri=ns.bsl.Number,
+ parent=ROOT_LITERAL,
+ )
+
+ROOT_TIME = Literal(
+ uri=ns.bsl.Time,
+ parent=ROOT_LITERAL,
+ )
+
+ROOT_ARRAY = Literal(
+ uri=ns.bsl.Array,
+ parent=ROOT_LITERAL,
+ )
+ROOT_FEATURE = Feature(
+ uri=ns.bsl.Array.Feature,
+ parent=ROOT_ARRAY,
+ dimension=1,
+ dtype=ns.bsfs.dtype().f16,
+ distance=ns.bsd.euclidean,
+ )
+
+# essential predicates
+ROOT_PREDICATE = Predicate(
+ uri=ns.bsfs.Predicate,
+ parent=None,
+ domain=ROOT_NODE,
+ range=ROOT_VERTEX,
+ unique=False,
+ )
## EOF ##