aboutsummaryrefslogtreecommitdiffstats
path: root/tagit/utils
diff options
context:
space:
mode:
Diffstat (limited to 'tagit/utils')
-rw-r--r--tagit/utils/__init__.py23
-rw-r--r--tagit/utils/bsfs.py26
-rw-r--r--tagit/utils/builder.py82
-rw-r--r--tagit/utils/errors.py60
-rw-r--r--tagit/utils/frame.py84
-rw-r--r--tagit/utils/namespaces.py41
-rw-r--r--tagit/utils/rmatcher.py53
-rw-r--r--tagit/utils/shared.py155
-rw-r--r--tagit/utils/time.py63
9 files changed, 587 insertions, 0 deletions
diff --git a/tagit/utils/__init__.py b/tagit/utils/__init__.py
new file mode 100644
index 0000000..daa9eab
--- /dev/null
+++ b/tagit/utils/__init__.py
@@ -0,0 +1,23 @@
+"""
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import typing
+
+# inner-module imports
+from . import bsfs
+from . import namespaces as ns
+from . import rmatcher
+from . import time as ttime
+from .frame import Frame
+from .shared import * # FIXME: port properly
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'bsfs',
+ )
+
+## EOF ##
diff --git a/tagit/utils/bsfs.py b/tagit/utils/bsfs.py
new file mode 100644
index 0000000..ab8baa5
--- /dev/null
+++ b/tagit/utils/bsfs.py
@@ -0,0 +1,26 @@
+"""BSFS bridge, provides BSFS bindings for tagit.
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import typing
+
+# bsfs imports
+from bsfs import schema, Open
+from bsfs.namespace import Namespace
+from bsfs.query import ast, matcher
+from bsfs.utils import URI, uuid
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Namespace',
+ 'Open',
+ 'URI',
+ 'ast',
+ 'schema',
+ 'uuid'
+ )
+
+## EOF ##
diff --git a/tagit/utils/builder.py b/tagit/utils/builder.py
new file mode 100644
index 0000000..f6c5818
--- /dev/null
+++ b/tagit/utils/builder.py
@@ -0,0 +1,82 @@
+"""
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# standard imports
+from collections import abc
+from functools import partial
+from inspect import isclass
+import typing
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'BuilderBase',
+ 'InvalidFactoryName',
+ )
+
+
+## code ##
+
+class InvalidFactoryName(KeyError): pass
+
+class BuilderBase(abc.Mapping, abc.Hashable):
+ _factories = dict()
+
+ def __getitem__(self, key):
+ return self.get(key)
+
+ def __contains__(self, key):
+ return key in self._factories
+
+ def __iter__(self):
+ return iter(self._factories.keys())
+
+ def __hash__(self):
+ return hash(frozenset(self._factories.items()))
+
+ def __len__(self):
+ return len(self._factories)
+
+ def __eq__(self, other):
+ return type(self) == type(other) and self._factories == other._factories
+
+
+ def get(self, key):
+ if key not in self._factories:
+ raise InvalidFactoryName(key)
+ return self._factories[key]
+
+ @classmethod
+ def keys(self):
+ return self._factories.keys()
+
+ @classmethod
+ def items(self):
+ return self._factories.items()
+
+ @classmethod
+ def describe(cls, key):
+ if key not in cls._factories:
+ raise InvalidFactoryName(key)
+ desc = cls._factories[key].__doc__
+ return desc if desc is not None else ''
+
+ def prepare(self, key, *args, **kwargs):
+ # If building is to be customized, overwrite this function.
+ return partial(self[key], *args, **kwargs)
+
+ def build(self, key, *args, **kwargs):
+ fu = self.prepare(key, *args, **kwargs)
+ return fu()
+
+ def key_from_instance(self, cls):
+ for key, clbk in self._factories.items():
+ if isclass(clbk) and isinstance(cls, clbk):
+ return key
+ if not isclass(clbk) and cls == clbk:
+ return key
+ raise KeyError(type(cls))
+
+## EOF ##
diff --git a/tagit/utils/errors.py b/tagit/utils/errors.py
new file mode 100644
index 0000000..8b5e21a
--- /dev/null
+++ b/tagit/utils/errors.py
@@ -0,0 +1,60 @@
+"""Module-wide errors.
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# exports
+__all__ = (
+ 'EmptyFileError',
+ 'LoaderError',
+ 'NotAFileError',
+ 'ProgrammingError',
+ 'UserError',
+ 'abstract',
+ )
+
+
+## code ##
+
+def abstract():
+ """Marks that a method has to be implemented in a child class."""
+ raise NotImplementedError('abstract method that must be implemented in a subclass')
+
+class ProgrammingError(Exception):
+ """Reached a program state that shouldn't be reachable."""
+ pass
+
+class UserError(ValueError):
+ """Found an illegal value that was specified directly by the user."""
+ pass
+
+class NotAFileError(OSError):
+ """A file-system object is not a regular file."""
+ pass
+
+class EmptyFileError(OSError):
+ """A file is unexpectedly empty."""
+ pass
+
+class LoaderError(Exception):
+ """Failed to load or initialize a critical data structure."""
+ pass
+
+class ParserFrontendError(Exception):
+ """Generic parser frontend error."""
+ pass
+
+class ParserBackendError(Exception):
+ """Generic parser backend error."""
+ pass
+
+class ParserError(Exception):
+ """String parsing failure."""
+ pass
+
+class BackendError(Exception):
+ """Generic backend error."""
+ pass
+
+## EOF ##
diff --git a/tagit/utils/frame.py b/tagit/utils/frame.py
new file mode 100644
index 0000000..c6bdc1e
--- /dev/null
+++ b/tagit/utils/frame.py
@@ -0,0 +1,84 @@
+"""
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# standard imports
+import json
+
+# exports
+__all__ = ('Frame', )
+
+
+## code ##
+
+class Frame(dict):
+ def __init__(self, cursor=None, selection=None, offset=0, **kwargs):
+ super(Frame, self).__init__(**kwargs)
+ selection = selection if selection is not None else []
+ self['cursor'] = cursor
+ self['selection'] = selection
+ self['offset'] = offset
+
+ @property
+ def selection(self):
+ return self['selection']
+
+ @property
+ def cursor(self):
+ return self['cursor']
+
+ @property
+ def offset(self):
+ return self['offset']
+
+ def copy(self):
+ return Frame(**super(Frame, self).copy())
+
+ def serialize(self):
+ return json.dumps({
+ 'cursor': self.cursor.guid if self.cursor is not None else 'None',
+ 'group': self.cursor.group if hasattr(self.cursor, 'group') else 'None',
+ 'selection': [img.guid for img in self.selection],
+ 'offset': self.offset
+ })
+
+ @staticmethod
+ def from_serialized(lib, serialized, ignore_errors=True):
+ d = json.loads(serialized)
+
+ # load cursor
+ cursor = None
+ try:
+ if 'cursor' in d and d['cursor'] is not None and d['cursor'].lower() != 'none':
+ cursor = lib.entity(d['cursor'])
+ except KeyError as err:
+ if not ignore_errors:
+ raise err
+
+ if 'group' in d and d['group'] is not None and d['group'].lower() != 'none':
+ try:
+ # FIXME: late import; breaks module dependency structure
+ from tagit.storage.library.entity import Representative
+ cursor = Representative.Representative(lib, d['group'])
+ except ValueError:
+ # group doesn't exist anymore; ignore
+ pass
+
+ # load selection
+ selection = []
+ for guid in d.get('selection', []):
+ try:
+ selection.append(lib.entity(guid))
+ except KeyError as err:
+ if not ignore_errors:
+ raise err
+
+ return Frame(
+ cursor = cursor,
+ selection = selection,
+ offset = d.get('offset', 0)
+ )
+
+## EOF ##
diff --git a/tagit/utils/namespaces.py b/tagit/utils/namespaces.py
new file mode 100644
index 0000000..a17a927
--- /dev/null
+++ b/tagit/utils/namespaces.py
@@ -0,0 +1,41 @@
+"""Default namespaces used throughout tagit.
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# standard imports
+import typing
+
+# inner-module imports
+from . import bsfs as _bsfs
+
+# generic namespaces
+xsd = _bsfs.Namespace('http://www.w3.org/2001/XMLSchema')()
+
+# core bsfs namespaces
+bsfs = _bsfs.Namespace('https://schema.bsfs.io/core')
+bsie = _bsfs.Namespace('https://schema.bsfs.io/ie')
+
+# auxiliary bsfs namespaces
+bsn = bsie.Node
+bse = bsn.Entity()
+bsg = bsn.Group()
+bsl = bsfs.Literal
+bsp = bsn.Preview()
+bst = bsn.Tag()
+
+# export
+__all__: typing.Sequence[str] = (
+ 'bse',
+ 'bsfs',
+ 'bsg',
+ 'bsie',
+ 'bsl',
+ 'bsn',
+ 'bsp',
+ 'bst',
+ 'xsd',
+ )
+
+## EOF ##
diff --git a/tagit/utils/rmatcher.py b/tagit/utils/rmatcher.py
new file mode 100644
index 0000000..b5bb802
--- /dev/null
+++ b/tagit/utils/rmatcher.py
@@ -0,0 +1,53 @@
+"""
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# exports
+__all__ = (
+ 'by_area',
+ 'by_area_min',
+ )
+
+
+## code ##
+
+def by_area(target, candidates):
+ """Pick the item from *candidates* whose area is most similar to *target*."""
+ target_area = target[0] * target[1]
+ scores = [
+ (key, abs(target_area - res[0] * res[1]))
+ for key, res in candidates
+ ]
+ best_key, best_score = min(scores, key=lambda key_score: key_score[1])
+ return best_key
+
+
+def by_area_min(target, candidates):
+ """Pick the item from *candidates* whose area is at least that of *target*."""
+ # rank the candidates by area difference
+ # a positive score means that the candidate is larger than the target.
+ target_area = target[0] * target[1]
+ scores = [(key, res[0] * res[1] - target_area) for key, res in candidates]
+
+ # identify the two items with
+ # a) the smallest positive score (kmin), or
+ # b) the largest negative score (kmax)
+ kmin, kmax = None, None
+ cmin, cmax = float('inf'), float('-inf')
+ for key, score in scores:
+ if score >= 0 and score < cmin:
+ kmin, cmin = key, score
+ elif score < 0 and score > cmax:
+ kmax, cmax = key, score
+
+ # prefer positive over negative scores
+ if cmin < float('inf'):
+ return kmin
+ if cmax > float('-inf'):
+ return kmax
+ # no viable resolution found
+ raise IndexError('list contains no valid element')
+
+## EOF ##
diff --git a/tagit/utils/shared.py b/tagit/utils/shared.py
new file mode 100644
index 0000000..b5ab421
--- /dev/null
+++ b/tagit/utils/shared.py
@@ -0,0 +1,155 @@
+# FIXME: port properly!
+"""Shared functionality.
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# standard imports
+from collections import namedtuple
+import logging
+import os
+import pkgutil
+import platform
+import re
+import typing
+import warnings
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Resolution',
+ 'Struct',
+ 'clamp',
+ 'fileopen',
+ 'flatten',
+ 'fst',
+ 'import_all',
+ 'is_hex',
+ 'is_list',
+ 'magnitude_fmt',
+ 'truncate_dir',
+ 'get_root',
+ )
+
+
+## code ##
+
+logger = logging.getLogger(__name__)
+
+fst = lambda lst: lst[0]
+
+def is_list(cand):
+ """Return true if *cand* is a list, a set, or a tuple"""
+ return isinstance(cand, list) or isinstance(cand, set) or isinstance(cand, tuple)
+
+def import_all(module, exclude=None, verbose=False):
+ """Recursively import all submodules of *module*.
+ *exclude* is a set of submodule names which will
+ be omitted. With *verbose*, all imports are logged
+ with level info. Returns all imported modules.
+
+ >>> import tagit
+ >>> import_all(tagit, exclude={'tagit.shared.external'})
+
+ """
+ exclude = set([] if exclude is None else exclude)
+ imports = []
+ for importer, name, ispkg in pkgutil.iter_modules(module.__path__, module.__name__ + '.'):
+ if ispkg and all(re.match(exl, name) is None for exl in exclude):
+ if verbose:
+ logger.info(f'importing: {name}')
+ try:
+ module = __import__(name, fromlist='dummy')
+ imports.append(module)
+ imports += import_all(module, exclude, verbose)
+ except Exception as e:
+ logger.error(f'importing: {name}')
+
+ return imports
+
+def clamp(value, hi, lo=0):
+ """Restrain a *value* to the range *lo* to *hi*."""
+ return max(lo, min(hi, value))
+
+Resolution = namedtuple('resolution', ('width', 'height'))
+
+def truncate_dir(path, cutoff=3):
+ """Remove path up to last *cutoff* directories"""
+ if cutoff < 0: raise ValueError('path cutoff must be positive')
+ dirs = os.path.dirname(path).split(os.path.sep)
+ last_dirs = dirs[max(0, len(dirs) - cutoff):]
+ prefix = ''
+ if os.path.isabs(path) and len(last_dirs) == len(dirs):
+ prefix = os.path.sep
+
+ return prefix + os.path.join(*(last_dirs + [os.path.basename(path)]))
+
+def magnitude_fmt(num, suffix='iB', scale=1024):
+ """Human-readable number format.
+
+ adapted from Sridhar Ratnakumar, 2009
+ https://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
+ """
+ for unit in ['','K','M','G','T','P','E','Z']:
+ if abs(num) < scale:
+ return "%3.1f%s%s" % (num, unit, suffix)
+ num /= scale
+ return "%.1f%s%s" % (num, 'Y', suffix)
+
+class Struct(dict):
+ """Dict with item access as members.
+
+ >>> tup = Struct(timestamp=123, series=['1','2','3'])
+ >>> tup.timestamp
+ 123
+ >>> tup['timestamp']
+ 123
+
+ """
+ def __getattr__(self, name):
+ return self[name]
+ def __setattr__(self, name, value):
+ self[name] = value
+
+def flatten(lst):
+ flat = []
+ for itm in lst:
+ flat.extend(list(itm))
+ return flat
+
+def is_hex(string):
+ """Return True if the *string* can be interpreted as a hex value."""
+ try:
+ int(string, 16)
+ return True
+ except ValueError:
+ return False
+ except TypeError:
+ return False
+
+def fileopen(pth):
+ """Open a file in the preferred application as a subprocess.
+ This operation is platform dependent.
+ """
+ try:
+ binopen = {
+ "Linux" : "xdg-open", # Linux
+ "darwin" : "open", # MAX OS X
+ "Windows" : "start", # Windows
+ }.get(platform.system())
+ subprocess.call((binopen, pth))
+ except KeyError:
+ warnings.warn('Unknown platform {}'.format(platform.system()))
+
+
+def get_root(obj):
+ """Traverse the widget tree upwards until the root is found."""
+ while obj.parent is not None and obj.parent != obj.parent.parent:
+ if hasattr(obj, 'root') and obj.root is not None:
+ return obj.root
+
+ obj = obj.parent
+
+ return obj
+
+## EOF ##
diff --git a/tagit/utils/time.py b/tagit/utils/time.py
new file mode 100644
index 0000000..4260ac7
--- /dev/null
+++ b/tagit/utils/time.py
@@ -0,0 +1,63 @@
+"""Time helpers.
+
+* Camera local
+* System local
+* UTC
+
+Timestamp to datetime
+ * Timestamp
+ * in UTC
+ * Timezone
+ * Implicit system local timezone
+ * No known timezone
+ * Known timezone
+
+Datetime to timestamp
+ * always store as local time
+ * optionally with UTC offset
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# standard imports
+from datetime import timezone, datetime, tzinfo, timedelta
+
+# exports
+__all__ = ('timestamp', 'from_timestamp')
+
+
+## code ##
+
+timestamp_loc = lambda dt: dt.replace(tzinfo=timezone.utc).timestamp()
+
+timestamp_utc = lambda dt: dt.timestamp()
+
+from_timestamp_loc = lambda ts: datetime.utcfromtimestamp(ts)
+
+from_timestamp_utc = lambda ts: datetime.fromtimestamp(ts)
+
+now = datetime.now
+
+timestamp_min = timestamp_loc(datetime.min)
+
+timestamp_max = timestamp_loc(datetime.max)
+
+def utcoffset(dt):
+ if dt.tzinfo is None:
+ return local_tzo(dt)
+ elif dt.tzinfo is NoTimeZone:
+ return None
+ else:
+ return dt.tzinfo.utcoffset(dt).total_seconds() / 3600
+
+NoTimeZone = timezone(timedelta(0), 'NoTimeZone')
+
+def local_tzo(dt=None):
+ """Return the offset between the local time and UTC.
+ (i.e. return the x of UTC+x).
+ """
+ dt = datetime.now() if dt is None else dt
+ return (timestamp_loc(dt) - dt.timestamp()) / 3600
+
+## EOF ##