aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/schema/serialize.py
diff options
context:
space:
mode:
Diffstat (limited to 'bsfs/schema/serialize.py')
-rw-r--r--bsfs/schema/serialize.py83
1 files changed, 46 insertions, 37 deletions
diff --git a/bsfs/schema/serialize.py b/bsfs/schema/serialize.py
index a566d65..8b31737 100644
--- a/bsfs/schema/serialize.py
+++ b/bsfs/schema/serialize.py
@@ -35,13 +35,27 @@ def from_string(schema_str: str) -> schema.Schema:
graph.parse(data=schema_str, format='turtle')
# helper functions
+ # FIXME: type annotation
+ def _fetch_value(subject: URI, predicate: rdflib.URIRef, value_factory) -> typing.Optional[typing.Any]:
+ """Fetch the object of a given subject and predicate.
+ Raises a `errors.ConsistencyError` if multiple objects match.
+ """
+ values = list(graph.objects(rdflib.URIRef(subject), predicate))
+ if len(values) == 0:
+ return None
+ if len(values) == 1:
+ return value_factory(values[0])
+ raise errors.ConsistencyError(
+ f'{subject} has multiple values for predicate {str(predicate)}, expected zero or one')
+
def _convert(value):
"""Convert the subject type from rdflib to a bsfs native type."""
if isinstance(value, rdflib.Literal):
return value.value
if isinstance(value, rdflib.URIRef):
return URI(value)
- raise errors.UnreachableError(f'expected Literal or URIRef, found {typename(value)}')
+ # value is neither a node nor a literal, but e.g. a blank node
+ raise errors.BackendError(f'expected Literal or URIRef, found {typename(value)}')
def _fetch_hierarchically(factory, curr):
"""Walk through a rdfs:subClassOf hierarchy, creating symbols along the way."""
@@ -71,30 +85,36 @@ def from_string(schema_str: str) -> schema.Schema:
raise errors.ConsistencyError('inconsistent nodes')
# fetch literals
- literals = set(_fetch_hierarchically(types.Literal, types.ROOT_LITERAL))
+ def _build_literal(uri, parent, **annotations):
+ """Literal factory."""
+ # break out on root feature type
+ if uri == types.ROOT_FEATURE.uri:
+ return types.ROOT_FEATURE
+ # handle feature types
+ if isinstance(parent, types.Feature):
+ # clean annotations
+ annotations.pop(ns.bsfs.dimension, None)
+ annotations.pop(ns.bsfs.dtype, None)
+ annotations.pop(ns.bsfs.distance, None)
+ # get dimension
+ dimension = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dimension), int)
+ # get dtype
+ dtype = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dtype), URI)
+ # get distance
+ distance = _fetch_value(uri, rdflib.URIRef(ns.bsfs.distance), URI)
+ # return feature
+ return parent.child(URI(uri), dtype=dtype, dimension=dimension, distance=distance, **annotations)
+ # handle non-feature types
+ return parent.child(URI(uri), **annotations)
+
+ literals = set(_fetch_hierarchically(_build_literal, types.ROOT_LITERAL))
literals_lut = {lit.uri: lit for lit in literals}
if len(literals_lut) != len(literals):
raise errors.ConsistencyError('inconsistent literals')
# fetch predicates
- # FIXME: type annotation
- def _fetch_value(subject: URI, predicate: rdflib.URIRef, value_factory) -> typing.Optional[typing.Any]:
- """Fetch the object of a given subject and predicate.
- Raises a `errors.ConsistencyError` if multiple objects match.
- """
- values = list(graph.objects(rdflib.URIRef(subject), predicate))
- if len(values) == 0:
- return None
- if len(values) == 1:
- return value_factory(values[0])
- raise errors.ConsistencyError(
- f'{subject} has multiple values for predicate {str(predicate)}, expected zero or one')
-
def _build_predicate(uri, parent, **annotations):
"""Predicate factory."""
- # break out on root feature type
- if uri == types.ROOT_FEATURE.uri:
- return types.ROOT_FEATURE
# clean annotations
annotations.pop(ns.rdfs.domain, None)
annotations.pop(ns.rdfs.range, None)
@@ -113,23 +133,9 @@ def from_string(schema_str: str) -> schema.Schema:
rng = nodes_lut.get(rng, literals_lut.get(rng))
# get unique
unique = _fetch_value(uri, rdflib.URIRef(ns.bsfs.unique), bool)
- # handle feature types
- if isinstance(parent, types.Feature):
- # clean annotations
- annotations.pop(ns.bsfs.dimension, None)
- annotations.pop(ns.bsfs.dtype, None)
- annotations.pop(ns.bsfs.distance, None)
- # get dimension
- dimension = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dimension), int)
- # get dtype
- dtype = _fetch_value(uri, rdflib.URIRef(ns.bsfs.dtype), URI)
- # get distance
- distance = _fetch_value(uri, rdflib.URIRef(ns.bsfs.distance), URI)
- # return feature
- return parent.child(URI(uri), domain=dom, range=rng, unique=unique,
- dtype=dtype, dimension=dimension, distance=distance, **annotations)
- # handle non-feature predicate
+ # build predicate
return parent.child(URI(uri), domain=dom, range=rng, unique=unique, **annotations)
+
predicates = _fetch_hierarchically(_build_predicate, types.ROOT_PREDICATE)
return schema.Schema(predicates, nodes, literals)
@@ -214,9 +220,12 @@ def to_string(schema_inst: schema.Schema, fmt: str = 'turtle') -> str:
def _parse(node: types._Type) -> T_TRIPLE:
"""Emit all properties of a type."""
- if isinstance(node, types._Type): # pylint: disable=protected-access
- # NOTE: all nodes are _Type
- yield from _type(node)
+ # check arg
+ if not isinstance(node, types._Type): # pylint: disable=protected-access
+ raise TypeError(node)
+ # emit _Type essentials
+ yield from _type(node)
+ # emit properties of derived types
if isinstance(node, types.Predicate):
yield from _predicate(node)
if isinstance(node, types.Feature):