aboutsummaryrefslogtreecommitdiffstats
path: root/tagit/widgets/loader.py
blob: 9c0ffafb4dc31df42f3bc1a773b0c0d4a4a058da (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
"""

Part of the tagit module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
import time
import typing

# kivy imports
from kivy.cache import Cache
from kivy.clock import Clock
from kivy.compat import queue
from kivy.loader import _Worker, LoaderThreadPool, ProxyImage, LoaderBase

# exports: the module-level `Loader` instance is the only public name
__all__: typing.Sequence[str] = (
    'Loader',
    )


## code ##

class _ThreadPool(object):
    """Pool of threads consuming tasks from a queue.
    Identical to kivy.loader._ThreadPool except for the queue type."""
    def __init__(self, num_threads):
        super(_ThreadPool, self).__init__()
        self.running = True
        self.tasks = queue.LifoQueue() # mb: replace Queue with LifoQueue
        for _ in range(num_threads):
            _Worker(self, self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def stop(self):
        self.running = False
        self.tasks.join()


class TagitImageLoader(LoaderThreadPool):
    """Threaded Loader that prioritises recentness.

    This is useful if a user skips through browser pages because then the preview loading
    finishes only after the user has already switched to the next page. Instead of waiting
    until all images up to the target page were loaded, prioritisation makes more recent
    images load first.

    Mostly copied from kivy.loader.Loader.
    """

    def start(self):
        """Start the loader and create the LIFO-backed worker pool."""
        LoaderBase.start(self) # mb: skip LoaderThreadPool.start
        self.pool = _ThreadPool(self._num_workers)
        Clock.schedule_interval(self.run, 0)

    def image(self, filename, load_callback=None, post_callback=None,
              **kwargs):
        """Request *filename* and return a ProxyImage for it.

        If the image is already in the 'kv.loader' cache, a ProxyImage marked
        as loaded is returned right away. Otherwise a placeholder ProxyImage
        showing `loading_image` is returned and registered as a client, and,
        on the first request for this file, a load request is queued.

        :param filename: Name (path or URL) of the image to load.
        :param load_callback: Optional callable used instead of the built-in
            loaders; it is called with the filename.
        :param post_callback: Optional callable applied to the loaded data.
        :param kwargs: Passed to ProxyImage and to the built-in loaders.
            A truthy `nocache` entry skips the cache marker.
        :return: The ProxyImage for the requested file.
        """
        data = Cache.get('kv.loader', filename)
        if data not in (None, False):
            # found image, if data is not here, need to reload.
            return ProxyImage(data,
                              loading_image=self.loading_image,
                              loaded=True, **kwargs)

        client = ProxyImage(self.loading_image,
                            loading_image=self.loading_image, **kwargs)
        self._client.append((filename, client))

        if data is None:
            # if data is None, this is really the first time
            self._q_load.appendleft({
                'filename': filename,
                'load_callback': load_callback,
                'post_callback': post_callback,
                'request_time': Clock.get_time(), # mb: also pass time of original request
                'kwargs': kwargs})
            if not kwargs.get('nocache', False):
                # False marks the file as "queued, not loaded yet"
                Cache.append('kv.loader', filename, False)
            self._start_wanted = True
            self._trigger_update()
        # else: data is False, i.e. the file is already queued for loading

        return client

    def _clear(self):
        """Drop all pending load requests along with their cache markers and clients.

        Does nothing if the pool has not been created yet.
        """
        if self.pool is None:
            return

        # names of all files whose load request was dropped
        tbr = set()

        # clear loader queue
        while self._q_load:
            request = self._q_load.pop()
            tbr.add(request['filename'])

        # clear task queue
        # A worker may grab the last task between an empty() check and a
        # blocking get(), which would stall the caller. Hence drain the queue
        # with the non-blocking variant.
        while True:
            try:
                _func, args, _kargs = self.pool.tasks.get_nowait()
            except queue.Empty:
                break
            if args and 'filename' in args[0]:
                tbr.add(args[0]['filename'])
            self.pool.tasks.task_done()

        # remove spurious entries from cache
        for key in tbr:
            # Remove directly from Cache since _clear is run from the main thread.
            # If it were run from another thread, a `(key, None, 0)` item could
            # be pushed to _q_done instead, which _update discards.
            Cache.remove('kv.loader', key)

        # remove spurious clients
        # Rebuild the list in place; removing items while iterating over the
        # same list would skip the element following each removed one.
        self._client[:] = [
            entry for entry in self._client if entry[0] not in tbr]

    def clear(self):
        """Empty the queue without loading the images."""
        # execute in main thread
        # (scheduling _clear as a pool task would bring no real benefit)
        self._clear()

    def _load(self, kwargs):
        """Load one image and hand the result over to `_update`.

        Runs as a task of the pool (see `start` and `_clear`).

        :param kwargs: Request dict as built by `image`, with the keys
            'filename', 'load_callback', 'post_callback', 'request_time'
            and 'kwargs'.
        """
        # throttle: wait while enough finished items are pending for upload
        while len(self._q_done) >= (
                self.max_upload_per_frame * self._num_workers):
            time.sleep(0.1)

        self._wait_for_resume()

        filename = kwargs['filename']
        load_callback = kwargs['load_callback']
        post_callback = kwargs['post_callback']
        try:
            proto = filename.split(':', 1)[0]
        except (AttributeError, TypeError):
            # if blank (or non-string) filename then return
            return
        if load_callback is not None:
            data = load_callback(filename)
        elif proto in ('http', 'https', 'ftp', 'smb'):
            data = self._load_urllib(filename, kwargs['kwargs'])
        else:
            data = self._load_local(filename, kwargs['kwargs'])

        if post_callback:
            data = post_callback(data)

        # mb: also pass request_time
        self._q_done.appendleft((filename, data, kwargs['request_time']))
        self._trigger_update()

    def _update(self, *largs):
        """Publish finished images to the cache and to their waiting clients.

        Processes at most `max_upload_per_frame` finished items per call.
        Items without data are discarded together with their cache marker
        and clients.
        """
        # want to start it ?
        if self._start_wanted:
            if not self._running:
                self.start()
            self._start_wanted = False

        # in pause mode, don't unqueue anything.
        if self._paused:
            self._trigger_update()
            return

        for _ in range(self.max_upload_per_frame):
            try:
                filename, data, timestamp = self._q_done.pop()
            except IndexError:
                return

            # create the image
            image = data  # ProxyImage(data)

            if image is None: # mb: discard items
                # remove cache and client entries
                Cache.remove('kv.loader', filename)
                # Rebuild in place; removing while iterating over the same
                # list would skip the element following each removed one.
                self._client[:] = [
                    entry for entry in self._client if entry[0] != filename]
                continue

            if not image.nocache:
                Cache.append('kv.loader', filename, image)
                # mb: fix cache times
                # (use the time of the original request instead of now)
                Cache._objects['kv.loader'][filename]['lastaccess'] = timestamp
                Cache._objects['kv.loader'][filename]['timestamp'] = timestamp

            # update client
            # (iterate over a copy since matching entries are removed)
            for c_filename, client in self._client[:]:
                if filename != c_filename:
                    continue
                # got one client to update
                client.image = image
                client.loaded = True
                client.dispatch('on_load')
                self._client.remove((c_filename, client))

        self._trigger_update()

# Module-level loader instance; this is the name exported via __all__.
Loader = TagitImageLoader()

## EOF ##