aboutsummaryrefslogtreecommitdiffstats
path: root/bsie/lib/naming_policy.py
blob: ffef7d9c468683cf89fc7cab045bc2cf3b7c2b93 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130

# standard imports
import abc
import os
import posixpath
import typing
import urllib.parse

# bsie imports
from bsie.utils import bsfs, errors, ns
from bsie.utils.node import Node

# exports: the public names of this module (honoured by `from ... import *`).
__all__: typing.Sequence[str] = (
    'DefaultNamingPolicy',
    )


## code ##

class NamingPolicy(abc.ABC):
    """Determine node uri's from node hints.

    This is an abstract base class: subclasses must implement `handle_node`,
    and the class itself cannot be instantiated. A policy instance is callable
    on an iterable of (node, predicate, value) triples.
    """

    def __call__(
            self,
            iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]],
            ) -> 'NamingPolicyIterator':
        """Apply the policy on a triple iterator.

        Returns a `NamingPolicyIterator` that wraps *iterable* together
        with this policy. Nothing is consumed from *iterable* by this call.
        """
        return NamingPolicyIterator(self, iterable)

    @abc.abstractmethod
    def handle_node(self, node: Node) -> Node:
        """Apply the policy on a node and return the node."""


class NamingPolicyIterator():
    """Wrap a triple iterable and run a naming *policy* over its nodes lazily.

    While iterating, the subject of every triple is passed to the policy,
    followed by the triple's value if that value is a Node as well.
    The triple is yielded afterwards.
    """

    # policy whose handle_node is invoked on every node encountered.
    _policy: NamingPolicy

    # (node, predicate, value) triples to be processed.
    _iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]]

    def __init__(
            self,
            policy: NamingPolicy,
            iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]],
            ):
        self._policy, self._iterable = policy, iterable

    def __iter__(self):
        for subject, predicate, obj in self._iterable:
            # the subject is always handed to the policy first ...
            self._policy.handle_node(subject)
            # ... then the value, provided it is a node itself.
            self._handle_if_node(obj)
            yield subject, predicate, obj

    def _handle_if_node(self, candidate: typing.Any) -> None:
        """Pass *candidate* to the policy if, and only if, it is a Node."""
        if not isinstance(candidate, Node):
            return
        self._policy.handle_node(candidate)


class DefaultNamingPolicy(NamingPolicy):
    """Compose URIs as <host/user/node_type#fragment>

    What information is used as fragment depends on the node type.
    Typically, the default is to use the "ucid" hint.
    The fallback in all cases is to generate a random uuid.

    Never changes previously assigned uris. Sets uris in-place.

    """

    def __init__(
            self,
            host: bsfs.URI,
            user: str,
            ):
        """Create a policy that names nodes below <host/user>."""
        # URIs are '/'-separated on every platform, hence posixpath rather than
        # os.path (which would join with a backslash on Windows).
        self._prefix = bsfs.Namespace(posixpath.join(host, user))
        # callable that produces the random fallback fragments.
        self._uuid = bsfs.uuid.UUID()

    def handle_node(self, node: Node) -> Node:
        """Assign a uri to *node* unless it already has one, and return *node*.

        Dispatches on the node type to `name_entity`, `name_preview`, or
        `name_tag`. Raises `errors.ProgrammingError` for any other node type.
        """
        # never touch a previously assigned uri.
        if node.uri is not None:
            return node
        if node.node_type == ns.bsn.Entity:
            return self.name_entity(node)
        if node.node_type == ns.bsn.Preview:
            return self.name_preview(node)
        if node.node_type == ns.bsn.Tag:
            return self.name_tag(node)
        raise errors.ProgrammingError(f'no naming policy available for {node.node_type}')

    # NOTE(review): all name_* methods resolve the fragment via getattr on a
    # namespace object. This presumably relies on the namespace's dynamic
    # attribute lookup; a fragment that equals a regular attribute name of that
    # object would not reach it -- confirm against bsfs.Namespace.

    def name_entity(self, node: Node) -> Node:
        """Set a bsn:Entity node's uri fragment to its ucid.

        Falls back to a random uuid if the node has no "ucid" hint.
        Sets the uri in-place and returns *node*.
        """
        if 'ucid' in node.hints: # content id
            fragment = node.hints['ucid']
        else: # random name
            fragment = self._uuid()
        node.uri = getattr(self._prefix.file(), fragment)
        return node

    def name_preview(self, node: Node) -> Node:
        """Set a bsn:Preview node's uri fragment to its ucid.

        Uses its source fragment as fallback. Appends the size if provided.
        If neither yields a fragment, a random uuid is used.
        Sets the uri in-place and returns *node*.
        """
        fragment = None
        if 'ucid' in node.hints: # content id
            fragment = node.hints['ucid']
        if fragment is None and 'source' in node.hints: # source id
            # the source node needs a uri before its fragment can be reused.
            self.handle_node(node.hints['source'])
            fragment = node.hints['source'].uri.get('fragment', None)
        if fragment is None: # random name
            fragment = self._uuid()
        if 'size' in node.hints: # append size
            fragment += '_s' + str(node.hints['size'])
        node.uri = getattr(self._prefix.preview(), fragment)
        return node

    def name_tag(self, node: Node) -> Node:
        """Set a bsn:Tag node's uri fragment to its percent-encoded label.

        Falls back to a random uuid if the node has no "label" hint.
        Sets the uri in-place and returns *node*.
        """
        # NOTE: Must produce the same name for all tags that share a label.
        if 'label' in node.hints: # tag label
            fragment = urllib.parse.quote(node.hints['label'])
        else: # random name
            fragment = self._uuid()
        # FIXME: match to existing tags in bsfs storage!
        node.uri = getattr(self._prefix.tag(), fragment)
        return node

## EOF ##