1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
# standard imports
import abc
import os
import posixpath
import typing
# external imports
import urllib.parse
# bsie imports
from bsie.utils import bsfs, errors, ns
from bsie.utils.node import Node
# exports
__all__: typing.Sequence[str] = (
'DefaultNamingPolicy',
)
## code ##
class NamingPolicy(abc.ABC):
    """Determine node URIs from node hints.

    Abstract base class. Subclasses implement `handle_node`, which decides
    on the uri of a single node. Calling a policy instance on a triple
    iterable wraps that iterable in a `NamingPolicyIterator`.
    """

    def __call__(
            self,
            iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]],
            ):
        """Apply the policy on a triple iterator.

        Returns a `NamingPolicyIterator` that pairs this policy with
        *iterable*. Nothing is consumed from *iterable* here.
        """
        return NamingPolicyIterator(self, iterable)

    # NOTE: abc.abstractmethod is only enforced on classes whose metaclass is
    # ABCMeta, hence the abc.ABC base. Without it the base policy could be
    # instantiated and would silently return None here.
    @abc.abstractmethod
    def handle_node(self, node: Node) -> Node:
        """Apply the policy on a node."""
class NamingPolicyIterator():
    """Walk over triples and let a *policy* name the nodes on the fly.

    The policy is applied while iterating, one triple at a time, right
    before the respective triple is handed to the consumer.
    """

    # source triple iterator.
    _iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]]

    # naming policy
    _policy: NamingPolicy

    def __init__(
            self,
            policy: NamingPolicy,
            iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]],
            ):
        """Pair the naming *policy* with the triple source *iterable*."""
        self._iterable = iterable
        self._policy = policy

    def __iter__(self):
        """Yield (subject, predicate, object) triples after naming their nodes."""
        for subject, predicate, obj in self._iterable:
            # the subject is passed to the policy unconditionally
            self._policy.handle_node(subject)
            # the object is only passed to the policy if it is a node itself
            if isinstance(obj, Node):
                self._policy.handle_node(obj)
            # hand the triple on; the policy's return values are not used here
            yield subject, predicate, obj
class DefaultNamingPolicy(NamingPolicy):
    """Compose URIs as <host/user/node_type#fragment>

    What information is used as fragment depends on the node type.
    Typically, the default is to use the "ucid" hint.
    The fallback in all cases is to generate a random uuid.

    Never changes previously assigned uris. Sets uris in-place.

    """

    def __init__(
            self,
            host: bsfs.URI,
            user: str,
            ):
        """Build the common uri prefix from *host* and *user*."""
        # URIs are always joined with forward slashes. os.path.join would use
        # the platform's separator instead (a backslash on Windows).
        self._prefix = bsfs.Namespace(posixpath.join(host, user))
        # generator for random fallback fragments
        self._uuid = bsfs.uuid.UUID()

    def handle_node(self, node: Node) -> Node:
        """Assign a uri to *node* unless it already has one.

        Dispatches on the node type to one of the `name_*` methods.
        Returns the (in-place modified) *node*.
        Raises `errors.ProgrammingError` for unsupported node types.
        """
        if node.uri is not None:
            # never overwrite a previously assigned uri
            return node
        if node.node_type == ns.bsn.Entity:
            return self.name_entity(node)
        if node.node_type == ns.bsn.Preview:
            return self.name_preview(node)
        if node.node_type == ns.bsn.Tag:
            return self.name_tag(node)
        raise errors.ProgrammingError(f'no naming policy available for {node.node_type}')

    def name_entity(self, node: Node) -> Node:
        """Set a bsn:Entity node's uri fragment to its ucid.

        Falls back to a random uuid if the ucid hint is missing or None.
        """
        fragment = None
        if 'ucid' in node.hints: # content id
            fragment = node.hints['ucid']
        if fragment is None: # random name
            # also covers a ucid hint that is present but None, which is
            # consistent with how name_preview treats its hints
            fragment = self._uuid()
        node.uri = getattr(self._prefix.file(), fragment)
        return node

    def name_preview(self, node: Node) -> Node:
        """Set a bsn:Preview node's uri fragment to its ucid.

        Uses its source fragment as fallback. Appends the size if provided.
        Falls back to a random uuid if neither yields a fragment.
        """
        fragment = None
        if 'ucid' in node.hints: # content id
            fragment = node.hints['ucid']
        if fragment is None and 'source' in node.hints: # source id
            # the source node is passed through the policy first, then the
            # fragment of its uri is re-used
            self.handle_node(node.hints['source'])
            fragment = node.hints['source'].uri.get('fragment', None)
        if fragment is None: # random name
            fragment = self._uuid()
        if 'size' in node.hints: # append size
            fragment += '_s' + str(node.hints['size'])
        node.uri = getattr(self._prefix.preview(), fragment)
        return node

    def name_tag(self, node: Node) -> Node:
        """Set a bsn:Tag node's uri fragment to its url-quoted label.

        Falls back to a random uuid if no label hint is present.
        """
        # NOTE: Must ensure that tags with the same label get the same name.
        if 'label' in node.hints: # tag label
            fragment = urllib.parse.quote(node.hints['label'])
        else: # random name
            fragment = self._uuid()
        # FIXME: match to existing tags in bsfs storage!
        node.uri = getattr(self._prefix.tag(), fragment)
        return node
## EOF ##
|