aboutsummaryrefslogtreecommitdiffstats
path: root/tagit/utils
diff options
context:
space:
mode:
Diffstat (limited to 'tagit/utils')
-rw-r--r--tagit/utils/__init__.py2
-rw-r--r--tagit/utils/builder.py82
-rw-r--r--tagit/utils/frame.py84
-rw-r--r--tagit/utils/shared.py77
-rw-r--r--tagit/utils/time.py63
5 files changed, 301 insertions, 7 deletions
diff --git a/tagit/utils/__init__.py b/tagit/utils/__init__.py
index d143034..3f09078 100644
--- a/tagit/utils/__init__.py
+++ b/tagit/utils/__init__.py
@@ -9,6 +9,8 @@ import typing
# inner-module imports
from . import bsfs
+from . import time as ttime
+from .frame import Frame
from .shared import * # FIXME: port properly
# exports
diff --git a/tagit/utils/builder.py b/tagit/utils/builder.py
new file mode 100644
index 0000000..f6c5818
--- /dev/null
+++ b/tagit/utils/builder.py
@@ -0,0 +1,82 @@
+"""
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# standard imports
+from collections import abc
+from functools import partial
+from inspect import isclass
+import typing
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'BuilderBase',
+ 'InvalidFactoryName',
+ )
+
+
+## code ##
+
+class InvalidFactoryName(KeyError): pass
+
+class BuilderBase(abc.Mapping, abc.Hashable):
+ _factories = dict()
+
+ def __getitem__(self, key):
+ return self.get(key)
+
+ def __contains__(self, key):
+ return key in self._factories
+
+ def __iter__(self):
+ return iter(self._factories.keys())
+
+ def __hash__(self):
+ return hash(frozenset(self._factories.items()))
+
+ def __len__(self):
+ return len(self._factories)
+
+ def __eq__(self, other):
+ return type(self) == type(other) and self._factories == other._factories
+
+
+ def get(self, key):
+ if key not in self._factories:
+ raise InvalidFactoryName(key)
+ return self._factories[key]
+
+ @classmethod
+ def keys(self):
+ return self._factories.keys()
+
+ @classmethod
+ def items(self):
+ return self._factories.items()
+
+ @classmethod
+ def describe(cls, key):
+ if key not in cls._factories:
+ raise InvalidFactoryName(key)
+ desc = cls._factories[key].__doc__
+ return desc if desc is not None else ''
+
+ def prepare(self, key, *args, **kwargs):
+ # If building is to be customized, overwrite this function.
+ return partial(self[key], *args, **kwargs)
+
+ def build(self, key, *args, **kwargs):
+ fu = self.prepare(key, *args, **kwargs)
+ return fu()
+
+ def key_from_instance(self, cls):
+ for key, clbk in self._factories.items():
+ if isclass(clbk) and isinstance(cls, clbk):
+ return key
+ if not isclass(clbk) and cls == clbk:
+ return key
+ raise KeyError(type(cls))
+
+## EOF ##
diff --git a/tagit/utils/frame.py b/tagit/utils/frame.py
new file mode 100644
index 0000000..c6bdc1e
--- /dev/null
+++ b/tagit/utils/frame.py
@@ -0,0 +1,84 @@
+"""
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# standard imports
+import json
+
+# exports
+__all__ = ('Frame', )
+
+
+## code ##
+
+class Frame(dict):
+ def __init__(self, cursor=None, selection=None, offset=0, **kwargs):
+ super(Frame, self).__init__(**kwargs)
+ selection = selection if selection is not None else []
+ self['cursor'] = cursor
+ self['selection'] = selection
+ self['offset'] = offset
+
+ @property
+ def selection(self):
+ return self['selection']
+
+ @property
+ def cursor(self):
+ return self['cursor']
+
+ @property
+ def offset(self):
+ return self['offset']
+
+ def copy(self):
+ return Frame(**super(Frame, self).copy())
+
+ def serialize(self):
+ return json.dumps({
+ 'cursor': self.cursor.guid if self.cursor is not None else 'None',
+ 'group': self.cursor.group if hasattr(self.cursor, 'group') else 'None',
+ 'selection': [img.guid for img in self.selection],
+ 'offset': self.offset
+ })
+
+ @staticmethod
+ def from_serialized(lib, serialized, ignore_errors=True):
+ d = json.loads(serialized)
+
+ # load cursor
+ cursor = None
+ try:
+ if 'cursor' in d and d['cursor'] is not None and d['cursor'].lower() != 'none':
+ cursor = lib.entity(d['cursor'])
+ except KeyError as err:
+ if not ignore_errors:
+ raise err
+
+ if 'group' in d and d['group'] is not None and d['group'].lower() != 'none':
+ try:
+ # FIXME: late import; breaks module dependency structure
+ from tagit.storage.library.entity import Representative
+ cursor = Representative.Representative(lib, d['group'])
+ except ValueError:
+ # group doesn't exist anymore; ignore
+ pass
+
+ # load selection
+ selection = []
+ for guid in d.get('selection', []):
+ try:
+ selection.append(lib.entity(guid))
+ except KeyError as err:
+ if not ignore_errors:
+ raise err
+
+ return Frame(
+ cursor = cursor,
+ selection = selection,
+ offset = d.get('offset', 0)
+ )
+
+## EOF ##
diff --git a/tagit/utils/shared.py b/tagit/utils/shared.py
index 13ffd2a..82fe672 100644
--- a/tagit/utils/shared.py
+++ b/tagit/utils/shared.py
@@ -6,22 +6,25 @@ A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
+from collections import namedtuple
import logging
+import os
import pkgutil
import re
import typing
# exports
-__all__ = ('import_all', )
-
-
-## code ##
-
-# exports
__all__: typing.Sequence[str] = (
+ 'Resolution',
+ 'Struct',
+ 'clamp',
+ 'flatten',
'fst',
- 'is_list',
'import_all',
+ 'is_hex',
+ 'is_list',
+ 'magnitude_fmt',
+ 'truncate_dir',
)
@@ -60,4 +63,64 @@ def import_all(module, exclude=None, verbose=False):
return imports
+def clamp(value, hi, lo=0):
+ """Restrain a *value* to the range *lo* to *hi*."""
+ return max(lo, min(hi, value))
+
+Resolution = namedtuple('resolution', ('width', 'height'))
+
+def truncate_dir(path, cutoff=3):
+ """Remove path up to last *cutoff* directories"""
+ if cutoff < 0: raise ValueError('path cutoff must be positive')
+ dirs = os.path.dirname(path).split(os.path.sep)
+ last_dirs = dirs[max(0, len(dirs) - cutoff):]
+ prefix = ''
+ if os.path.isabs(path) and len(last_dirs) == len(dirs):
+ prefix = os.path.sep
+
+ return prefix + os.path.join(*(last_dirs + [os.path.basename(path)]))
+
+def magnitude_fmt(num, suffix='iB', scale=1024):
+ """Human-readable number format.
+
+ adapted from Sridhar Ratnakumar, 2009
+ https://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
+ """
+ for unit in ['','K','M','G','T','P','E','Z']:
+ if abs(num) < scale:
+ return "%3.1f%s%s" % (num, unit, suffix)
+ num /= scale
+ return "%.1f%s%s" % (num, 'Y', suffix)
+
+class Struct(dict):
+ """Dict with item access as members.
+
+ >>> tup = Struct(timestamp=123, series=['1','2','3'])
+ >>> tup.timestamp
+ 123
+ >>> tup['timestamp']
+ 123
+
+ """
+ def __getattr__(self, name):
+ return self[name]
+ def __setattr__(self, name, value):
+ self[name] = value
+
+def flatten(lst):
+ flat = []
+ for itm in lst:
+ flat.extend(list(itm))
+ return flat
+
+def is_hex(string):
+ """Return True if the *string* can be interpreted as a hex value."""
+ try:
+ int(string, 16)
+ return True
+ except ValueError:
+ return False
+ except TypeError:
+ return False
+
## EOF ##
diff --git a/tagit/utils/time.py b/tagit/utils/time.py
new file mode 100644
index 0000000..4260ac7
--- /dev/null
+++ b/tagit/utils/time.py
@@ -0,0 +1,63 @@
+"""Time helpers.
+
+* Camera local
+* System local
+* UTC
+
+Timestamp to datetime
+ * Timestamp
+ * in UTC
+ * Timezone
+ * Implicit system local timezone
+ * No known timezone
+ * Known timezone
+
+Datetime to timestamp
+ * always store as local time
+ * optionally with UTC offset
+
+Part of the tagit module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# standard imports
+from datetime import timezone, datetime, tzinfo, timedelta
+
+# exports
+__all__ = ('timestamp', 'from_timestamp')
+
+
+## code ##
+
+timestamp_loc = lambda dt: dt.replace(tzinfo=timezone.utc).timestamp()
+
+timestamp_utc = lambda dt: dt.timestamp()
+
+from_timestamp_loc = lambda ts: datetime.utcfromtimestamp(ts)
+
+from_timestamp_utc = lambda ts: datetime.fromtimestamp(ts)
+
+now = datetime.now
+
+timestamp_min = timestamp_loc(datetime.min)
+
+timestamp_max = timestamp_loc(datetime.max)
+
+def utcoffset(dt):
+ if dt.tzinfo is None:
+ return local_tzo(dt)
+ elif dt.tzinfo is NoTimeZone:
+ return None
+ else:
+ return dt.tzinfo.utcoffset(dt).total_seconds() / 3600
+
+NoTimeZone = timezone(timedelta(0), 'NoTimeZone')
+
+def local_tzo(dt=None):
+ """Return the offset between the local time and UTC.
+ (i.e. return the x of UTC+x).
+ """
+ dt = datetime.now() if dt is None else dt
+ return (timestamp_loc(dt) - dt.timestamp()) / 3600
+
+## EOF ##