"""
Part of the tagit module.

A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
import time
import typing

# kivy imports
from kivy.cache import Cache
from kivy.clock import Clock
from kivy.compat import queue
from kivy.loader import _Worker, LoaderThreadPool, ProxyImage, LoaderBase

# exports
__all__: typing.Sequence[str] = (
    'Loader',
    )


## code ##

class _ThreadPool(object):
    """Pool of threads consuming tasks from a queue.

    Identical to kivy.loader._ThreadPool except for the queue type:
    tasks are kept in a LifoQueue so that the most recently added task
    is handed to a worker first.
    """

    def __init__(self, num_threads):
        super(_ThreadPool, self).__init__()
        self.running = True
        self.tasks = queue.LifoQueue() # mb: replace Queue with LifoQueue
        for _ in range(num_threads):
            _Worker(self, self.tasks)

    def add_task(self, func, *args, **kargs):
        """Enqueue the call ``func(*args, **kargs)`` as a task."""
        self.tasks.put((func, args, kargs))

    def stop(self):
        """Flag the pool as stopped and wait until all tasks are done."""
        self.running = False
        self.tasks.join()


class TagitImageLoader(LoaderThreadPool):
    """Threaded Loader that prioritises recentness.

    This is useful if a user skips through browser pages because then the
    preview loading finishes only after the user has already switched to the
    next page. Instead of waiting until all images up to the target page were
    loaded, prioritisation makes more recent images load first.

    Mostly copied from kivy.loader.Loader.
    """

    def start(self):
        """Start the loader with a LIFO thread pool."""
        LoaderBase.start(self) # mb: skip LoaderThreadPool.start
        self.pool = _ThreadPool(self._num_workers)
        Clock.schedule_interval(self.run, 0)

    def image(self, filename, load_callback=None, post_callback=None,
              **kwargs):
        """Return a ProxyImage for *filename*, loading it in the background.

        If the image is already present in the 'kv.loader' cache, a loaded
        ProxyImage is returned right away. Otherwise a ProxyImage showing
        the loading image is returned and registered as client; the load
        request is queued unless one is already pending for *filename*
        (marked by a ``False`` cache entry).
        """
        data = Cache.get('kv.loader', filename)
        if data not in (None, False):
            # found image, if data is not here, need to reload.
            return ProxyImage(data,
                              loading_image=self.loading_image,
                              loaded=True, **kwargs)

        client = ProxyImage(self.loading_image,
                            loading_image=self.loading_image, **kwargs)
        self._client.append((filename, client))

        if data is None:
            # if data is None, this is really the first time
            self._q_load.appendleft({
                'filename': filename,
                'load_callback': load_callback,
                'post_callback': post_callback,
                'request_time': Clock.get_time(), # mb: also pass time of original request
                'kwargs': kwargs})
            if not kwargs.get('nocache', False):
                Cache.append('kv.loader', filename, False)
            self._start_wanted = True
            self._trigger_update()
        else:
            # already queued for loading
            pass

        return client

    def _clear(self):
        """Drop all pending load requests and their bookkeeping.

        Empties the loader queue and the thread pool's task queue, then
        removes the cache entries and the waiting clients of every file
        whose request was dropped. Does nothing if the pool has not been
        created yet.
        """
        if self.pool is None:
            return

        tbr = set() # filenames to be removed

        # clear loader queue
        while len(self._q_load):
            kargs = self._q_load.pop()
            tbr.add(kargs['filename'])

        # clear task queue
        # Use get_nowait instead of empty()/get(): a worker thread may grab
        # the last task between the two calls, in which case a blocking get
        # would stall the caller indefinitely.
        tasks = self.pool.tasks
        while True:
            try:
                _func, args, _kargs = tasks.get_nowait()
            except queue.Empty:
                break
            if len(args) and 'filename' in args[0]:
                tbr.add(args[0]['filename'])
            tasks.task_done()

        # remove spurious entries from cache
        for key in tbr:
            # remove directly from Cache if _clear is run from the main thread
            Cache.remove('kv.loader', key)
            # otherwise go via _q_done
            #self._q_done.appendleft(key, None, 0))

        # remove spurious clients
        # Filter in place; calling list.remove while iterating the same
        # list skips entries that directly follow a removed one.
        self._client[:] = [
            (name, client)
            for name, client in self._client
            if name not in tbr
            ]

    def clear(self):
        """Empty the queue without loading the images."""
        # execute in main thread
        self._clear()
        # schedule as event (no real benefit)
        #if self.pool is not None:
        #    self.pool.add_task(self._clear)

    def _load(self, kwargs):
        """Load a single image; executed as task by the thread pool.

        *kwargs* is the request dict queued by `image`. The result is pushed
        to the done-queue together with the time of the original request.
        """
        # throttle while too many finished images await their upload
        while len(self._q_done) >= (
                self.max_upload_per_frame * self._num_workers):
            time.sleep(0.1)

        self._wait_for_resume()

        filename = kwargs['filename']
        load_callback = kwargs['load_callback']
        post_callback = kwargs['post_callback']
        try:
            proto = filename.split(':', 1)[0]
        except (AttributeError, TypeError):
            # filename is not a usable string (e.g. None), nothing to load
            return

        if load_callback is not None:
            data = load_callback(filename)
        elif proto in ('http', 'https', 'ftp', 'smb'):
            data = self._load_urllib(filename, kwargs['kwargs'])
        else:
            data = self._load_local(filename, kwargs['kwargs'])

        if post_callback:
            data = post_callback(data)

        # mb: also pass request_time
        self._q_done.appendleft((filename, data, kwargs['request_time']))
        self._trigger_update()

    def _update(self, *largs):
        """Deliver finished images to the cache and to waiting clients.

        Handles at most `max_upload_per_frame` finished images per call and
        re-triggers itself afterwards. Results without data are discarded,
        together with their cache entry and clients.
        """
        # want to start it ?
        if self._start_wanted:
            if not self._running:
                self.start()
            self._start_wanted = False

        # in pause mode, don't unqueue anything.
        if self._paused:
            self._trigger_update()
            return

        for _ in range(self.max_upload_per_frame):
            try:
                filename, data, timestamp = self._q_done.pop()
            except IndexError:
                return

            # create the image
            image = data  # ProxyImage(data)

            if image is None: # mb: discard items
                # remove cache and client entries
                Cache.remove('kv.loader', filename)
                # Filter in place; calling list.remove while iterating the
                # same list skips entries that follow a removed one.
                self._client[:] = [
                    (name, client)
                    for name, client in self._client
                    if name != filename
                    ]
                continue

            if not image.nocache:
                Cache.append('kv.loader', filename, image)
                # mb: fix cache times
                Cache._objects['kv.loader'][filename]['lastaccess'] = timestamp
                Cache._objects['kv.loader'][filename]['timestamp'] = timestamp

            # update client
            for c_filename, client in self._client[:]:
                if filename != c_filename:
                    continue
                # got one client to update
                client.image = image
                client.loaded = True
                client.dispatch('on_load')
                self._client.remove((c_filename, client))

        self._trigger_update()


Loader = TagitImageLoader()

## EOF ##