aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/graph/graph.py
blob: 11fe83599c1aa43227823c0e8edd4064c8349f7e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""

Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
import os
import typing

# bsfs imports
from bsfs.query import ast, validate
from bsfs import schema as bsc
from bsfs.triple_store import TripleStoreBase
from bsfs.utils import URI, typename

# inner-module imports
from . import ac
from . import nodes as _nodes
from . import resolve

# exports
__all__: typing.Sequence[str] = (
    'Graph',
    )


## code ##

class Graph():
    """The Graph class provides a convenient interface to query and access a graph.
    Since it logically builds on the concept of graphs it is easier to
    navigate than raw triple stores. Naturally, it uses a triple store
    as *backend*. It also controls actions via access permissions to a *user*.

    """

    # link to the triple storage backend.
    _backend: TripleStoreBase

    # access controls.
    _ac: ac.AccessControlBase

    # query resolver.
    _resolver: resolve.Filter

    # query validator.
    _validate: validate.Filter

    def __init__(
            self,
            backend: TripleStoreBase,
            access_control: ac.AccessControlBase,
            ):
        # store members
        self._backend = backend
        self._ac = access_control
        # helper classes
        self._resolver = resolve.Filter(self._backend.schema)
        self._validate = validate.Filter(self._backend.schema)
        # ensure Graph schema requirements
        self.migrate(self._backend.schema)

    def __hash__(self) -> int:
        return hash((type(self), self._backend, self._ac))

    def __eq__(self, other) -> bool:
        return isinstance(other, type(self)) \
           and self._backend == other._backend \
           and self._ac == other._ac

    def __repr__(self) -> str:
        return f'{typename(self)}({repr(self._backend)}, {self._ac})'

    def __str__(self) -> str:
        return f'{typename(self)}({str(self._backend)})'

    @property
    def schema(self) -> bsc.Schema:
        """Return the store's local schema."""
        return self._backend.schema

    def migrate(self, schema: bsc.Schema, append: bool = True) -> 'Graph':
        """Migrate the current schema to a new *schema*.

        Appends to the current schema by default; control this via *append*.
        The `Graph` may add additional classes to the schema that are required for its interals.

        """
        # check args
        if not isinstance(schema, bsc.Schema):
            raise TypeError(schema)
        # append to current schema
        if append:
            schema = schema + self._backend.schema
        # add Graph schema requirements
        with open(os.path.join(os.path.dirname(__file__), 'schema.nt'), mode='rt', encoding='UTF-8') as ifile:
            schema = schema + bsc.from_string(ifile.read())
        # migrate schema in backend
        # FIXME: consult access controls!
        self._backend.schema = schema
        # re-initialize members
        self._resolver.schema = self.schema
        self._validate.schema = self.schema
        # return self
        return self

    def nodes(self, node_type: URI, guids: typing.Iterable[URI]) -> _nodes.Nodes:
        """Return nodes *guids* of type *node_type* as a `bsfs.graph.Nodes` instance.

        Note that the *guids* need not to exist (however, the *node_type* has
        to be part of the schema). Inexistent guids will be created (using
        *node_type*) once some data is assigned to them.

        """
        # get node type
        type_ = self.schema.node(node_type)
        # NOTE: Nodes constructor materializes guids.
        return _nodes.Nodes(self._backend, self._ac, type_, guids)

    def node(self, node_type: URI, guid: URI) -> _nodes.Nodes:
        """Return node *guid* of type *node_type* as a `bsfs.graph.Nodes` instance.

        Note that the *guid* need not to exist (however, the *node_type* has
        to be part of the schema). An inexistent guid will be created (using
        *node_type*) once some data is assigned to them.

        """
        return self.nodes(node_type, {guid})

    def empty(self, node_type: URI) -> _nodes.Nodes:
        """Return a `Nodes` instance with type *node_type* but no nodes."""
        return self.nodes(node_type, set())

    def get(
            self,
            node_type: URI,
            query: typing.Optional[ast.filter.FilterExpression],
            ) -> _nodes.Nodes:
        """Return a `Nodes` instance over all nodes of type *node_type* that match the *query*."""
        # return Nodes instance
        type_ = self.schema.node(node_type)
        return _nodes.Nodes(self._backend, self._ac, type_, self.__get(node_type, query))

    def sorted(
            self,
            node_type: URI,
            query: typing.Optional[ast.filter.FilterExpression],
            # FIXME: sort ast
            ) -> typing.Iterator[_nodes.Nodes]:
        """Return a iterator over `Nodes` instances over all nodes of type *node_type* that match the *query*."""
        # FIXME: Order should be a parameter
        # return iterator over Nodes instances
        type_ = self.schema.node(node_type)
        for guid in self.__get(node_type, query):
            yield _nodes.Nodes(self._backend, self._ac, type_, {guid})

    def all(self, node_type: URI) -> _nodes.Nodes:
        """Return all instances of type *node_type*."""
        type_ = self.schema.node(node_type)
        return _nodes.Nodes(self._backend, self._ac, type_, self.__get(node_type, None))

    def __get(
            self,
            node_type: URI,
            query: typing.Optional[ast.filter.FilterExpression],
            ) -> typing.Iterator[URI]:
        """Build and execute a get query."""
        # get node type
        type_ = self.schema.node(node_type)
        # resolve Nodes instances
        query = self._resolver(type_, query)
        # add access controls to query
        query = self._ac.filter_read(type_, query)
        # validate query
        if query is not None:
            self._validate(type_, query)
        # query the backend and return the (non-materialized) result
        return self._backend.get(type_, query)

## EOF ##