diff options
Diffstat (limited to 'tagit/utils')
-rw-r--r-- | tagit/utils/__init__.py | 23 | ||||
-rw-r--r-- | tagit/utils/bsfs.py | 26 | ||||
-rw-r--r-- | tagit/utils/builder.py | 82 | ||||
-rw-r--r-- | tagit/utils/errors.py | 60 | ||||
-rw-r--r-- | tagit/utils/frame.py | 84 | ||||
-rw-r--r-- | tagit/utils/namespaces.py | 41 | ||||
-rw-r--r-- | tagit/utils/rmatcher.py | 53 | ||||
-rw-r--r-- | tagit/utils/shared.py | 155 | ||||
-rw-r--r-- | tagit/utils/time.py | 63 |
9 files changed, 587 insertions, 0 deletions
diff --git a/tagit/utils/__init__.py b/tagit/utils/__init__.py new file mode 100644 index 0000000..daa9eab --- /dev/null +++ b/tagit/utils/__init__.py @@ -0,0 +1,23 @@ +""" + +Part of the tagit module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import typing + +# inner-module imports +from . import bsfs +from . import namespaces as ns +from . import rmatcher +from . import time as ttime +from .frame import Frame +from .shared import * # FIXME: port properly + +# exports +__all__: typing.Sequence[str] = ( + 'bsfs', + ) + +## EOF ## diff --git a/tagit/utils/bsfs.py b/tagit/utils/bsfs.py new file mode 100644 index 0000000..ab8baa5 --- /dev/null +++ b/tagit/utils/bsfs.py @@ -0,0 +1,26 @@ +"""BSFS bridge, provides BSFS bindings for tagit. + +Part of the tagit module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import typing + +# bsfs imports +from bsfs import schema, Open +from bsfs.namespace import Namespace +from bsfs.query import ast, matcher +from bsfs.utils import URI, uuid + +# exports +__all__: typing.Sequence[str] = ( + 'Namespace', + 'Open', + 'URI', + 'ast', + 'schema', + 'uuid' + ) + +## EOF ## diff --git a/tagit/utils/builder.py b/tagit/utils/builder.py new file mode 100644 index 0000000..f6c5818 --- /dev/null +++ b/tagit/utils/builder.py @@ -0,0 +1,82 @@ +""" + +Part of the tagit module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# standard imports +from collections import abc +from functools import partial +from inspect import isclass +import typing + +# exports +__all__: typing.Sequence[str] = ( + 'BuilderBase', + 'InvalidFactoryName', + ) + + +## code ## + +class InvalidFactoryName(KeyError): pass + +class BuilderBase(abc.Mapping, abc.Hashable): + _factories = dict() + + def __getitem__(self, key): + return self.get(key) + + def __contains__(self, key): + return key in self._factories + + def __iter__(self): + return iter(self._factories.keys()) + + def __hash__(self): + return hash(frozenset(self._factories.items())) + + def __len__(self): + return len(self._factories) + + def __eq__(self, other): + return type(self) == type(other) and self._factories == other._factories + + + def get(self, key): + if key not in self._factories: + raise InvalidFactoryName(key) + return self._factories[key] + + @classmethod + def keys(self): + return self._factories.keys() + + @classmethod + def items(self): + return self._factories.items() + + @classmethod + def describe(cls, key): + if key not in cls._factories: + raise InvalidFactoryName(key) + desc = cls._factories[key].__doc__ + return desc if desc is not None else '' + + def prepare(self, key, *args, **kwargs): + # If building is to be customized, overwrite this function. + return partial(self[key], *args, **kwargs) + + def build(self, key, *args, **kwargs): + fu = self.prepare(key, *args, **kwargs) + return fu() + + def key_from_instance(self, cls): + for key, clbk in self._factories.items(): + if isclass(clbk) and isinstance(cls, clbk): + return key + if not isclass(clbk) and cls == clbk: + return key + raise KeyError(type(cls)) + +## EOF ## diff --git a/tagit/utils/errors.py b/tagit/utils/errors.py new file mode 100644 index 0000000..8b5e21a --- /dev/null +++ b/tagit/utils/errors.py @@ -0,0 +1,60 @@ +"""Module-wide errors. + +Part of the tagit module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# exports +__all__ = ( + 'EmptyFileError', + 'LoaderError', + 'NotAFileError', + 'ProgrammingError', + 'UserError', + 'abstract', + ) + + +## code ## + +def abstract(): + """Marks that a method has to be implemented in a child class.""" + raise NotImplementedError('abstract method that must be implemented in a subclass') + +class ProgrammingError(Exception): + """Reached a program state that shouldn't be reachable.""" + pass + +class UserError(ValueError): + """Found an illegal value that was specified directly by the user.""" + pass + +class NotAFileError(OSError): + """A file-system object is not a regular file.""" + pass + +class EmptyFileError(OSError): + """A file is unexpectedly empty.""" + pass + +class LoaderError(Exception): + """Failed to load or initialize a critical data structure.""" + pass + +class ParserFrontendError(Exception): + """Generic parser frontend error.""" + pass + +class ParserBackendError(Exception): + """Generic parser backend error.""" + pass + +class ParserError(Exception): + """String parsing failure.""" + pass + +class BackendError(Exception): + """Generic backend error.""" + pass + +## EOF ## diff --git a/tagit/utils/frame.py b/tagit/utils/frame.py new file mode 100644 index 0000000..c6bdc1e --- /dev/null +++ b/tagit/utils/frame.py @@ -0,0 +1,84 @@ +""" + +Part of the tagit module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# standard imports +import json + +# exports +__all__ = ('Frame', ) + + +## code ## + +class Frame(dict): + def __init__(self, cursor=None, selection=None, offset=0, **kwargs): + super(Frame, self).__init__(**kwargs) + selection = selection if selection is not None else [] + self['cursor'] = cursor + self['selection'] = selection + self['offset'] = offset + + @property + def selection(self): + return self['selection'] + + @property + def cursor(self): + return self['cursor'] + + @property + def offset(self): + return self['offset'] + + def copy(self): + return Frame(**super(Frame, self).copy()) + + def serialize(self): + return json.dumps({ + 'cursor': self.cursor.guid if self.cursor is not None else 'None', + 'group': self.cursor.group if hasattr(self.cursor, 'group') else 'None', + 'selection': [img.guid for img in self.selection], + 'offset': self.offset + }) + + @staticmethod + def from_serialized(lib, serialized, ignore_errors=True): + d = json.loads(serialized) + + # load cursor + cursor = None + try: + if 'cursor' in d and d['cursor'] is not None and d['cursor'].lower() != 'none': + cursor = lib.entity(d['cursor']) + except KeyError as err: + if not ignore_errors: + raise err + + if 'group' in d and d['group'] is not None and d['group'].lower() != 'none': + try: + # FIXME: late import; breaks module dependency structure + from tagit.storage.library.entity import Representative + cursor = Representative.Representative(lib, d['group']) + except ValueError: + # group doesn't exist anymore; ignore + pass + + # load selection + selection = [] + for guid in d.get('selection', []): + try: + selection.append(lib.entity(guid)) + except KeyError as err: + if not ignore_errors: + raise err + + return Frame( + cursor = cursor, + selection = selection, + offset = d.get('offset', 0) + ) + +## EOF ## diff --git a/tagit/utils/namespaces.py b/tagit/utils/namespaces.py new file mode 100644 index 0000000..a17a927 --- /dev/null +++ b/tagit/utils/namespaces.py @@ -0,0 +1,41 @@ +"""Default namespaces used throughout tagit. + +Part of the tagit module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# standard imports +import typing + +# inner-module imports +from . import bsfs as _bsfs + +# generic namespaces +xsd = _bsfs.Namespace('http://www.w3.org/2001/XMLSchema')() + +# core bsfs namespaces +bsfs = _bsfs.Namespace('https://schema.bsfs.io/core') +bsie = _bsfs.Namespace('https://schema.bsfs.io/ie') + +# auxiliary bsfs namespaces +bsn = bsie.Node +bse = bsn.Entity() +bsg = bsn.Group() +bsl = bsfs.Literal +bsp = bsn.Preview() +bst = bsn.Tag() + +# export +__all__: typing.Sequence[str] = ( + 'bse', + 'bsfs', + 'bsg', + 'bsie', + 'bsl', + 'bsn', + 'bsp', + 'bst', + 'xsd', + ) + +## EOF ## diff --git a/tagit/utils/rmatcher.py b/tagit/utils/rmatcher.py new file mode 100644 index 0000000..b5bb802 --- /dev/null +++ b/tagit/utils/rmatcher.py @@ -0,0 +1,53 @@ +""" + +Part of the tagit module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# exports +__all__ = ( + 'by_area', + 'by_area_min', + ) + + +## code ## + +def by_area(target, candidates): + """Pick the item from *candidates* whose area is most similar to *target*.""" + target_area = target[0] * target[1] + scores = [ + (key, abs(target_area - res[0] * res[1])) + for key, res in candidates + ] + best_key, best_score = min(scores, key=lambda key_score: key_score[1]) + return best_key + + +def by_area_min(target, candidates): + """Pick the item from *candidates* whose area is at least that of *target*.""" + # rank the candidates by area difference + # a positive score means that the candidate is larger than the target. + target_area = target[0] * target[1] + scores = [(key, res[0] * res[1] - target_area) for key, res in candidates] + + # identify the two items with + # a) the smallest positive score (kmin), or + # b) the largest negative score (kmax) + kmin, kmax = None, None + cmin, cmax = float('inf'), float('-inf') + for key, score in scores: + if score >= 0 and score < cmin: + kmin, cmin = key, score + elif score < 0 and score > cmax: + kmax, cmax = key, score + + # prefer positive over negative scores + if cmin < float('inf'): + return kmin + if cmax > float('-inf'): + return kmax + # no viable resolution found + raise IndexError('list contains no valid element') + +## EOF ## diff --git a/tagit/utils/shared.py b/tagit/utils/shared.py new file mode 100644 index 0000000..b5ab421 --- /dev/null +++ b/tagit/utils/shared.py @@ -0,0 +1,155 @@ +# FIXME: port properly! +"""Shared functionality. + +Part of the tagit module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# standard imports +from collections import namedtuple +import logging +import os +import pkgutil +import platform +import re +import typing +import warnings + +# exports +__all__: typing.Sequence[str] = ( + 'Resolution', + 'Struct', + 'clamp', + 'fileopen', + 'flatten', + 'fst', + 'import_all', + 'is_hex', + 'is_list', + 'magnitude_fmt', + 'truncate_dir', + 'get_root', + ) + + +## code ## + +logger = logging.getLogger(__name__) + +fst = lambda lst: lst[0] + +def is_list(cand): + """Return true if *cand* is a list, a set, or a tuple""" + return isinstance(cand, list) or isinstance(cand, set) or isinstance(cand, tuple) + +def import_all(module, exclude=None, verbose=False): + """Recursively import all submodules of *module*. + *exclude* is a set of submodule names which will + be omitted. With *verbose*, all imports are logged + with level info. Returns all imported modules. + + >>> import tagit + >>> import_all(tagit, exclude={'tagit.shared.external'}) + + """ + exclude = set([] if exclude is None else exclude) + imports = [] + for importer, name, ispkg in pkgutil.iter_modules(module.__path__, module.__name__ + '.'): + if ispkg and all(re.match(exl, name) is None for exl in exclude): + if verbose: + logger.info(f'importing: {name}') + try: + module = __import__(name, fromlist='dummy') + imports.append(module) + imports += import_all(module, exclude, verbose) + except Exception as e: + logger.error(f'importing: {name}') + + return imports + +def clamp(value, hi, lo=0): + """Restrain a *value* to the range *lo* to *hi*.""" + return max(lo, min(hi, value)) + +Resolution = namedtuple('resolution', ('width', 'height')) + +def truncate_dir(path, cutoff=3): + """Remove path up to last *cutoff* directories""" + if cutoff < 0: raise ValueError('path cutoff must be positive') + dirs = os.path.dirname(path).split(os.path.sep) + last_dirs = dirs[max(0, len(dirs) - cutoff):] + prefix = '' + if os.path.isabs(path) and len(last_dirs) == len(dirs): + prefix = os.path.sep + + return prefix + os.path.join(*(last_dirs + [os.path.basename(path)])) + +def magnitude_fmt(num, suffix='iB', scale=1024): + """Human-readable number format. + + adapted from Sridhar Ratnakumar, 2009 + https://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size + """ + for unit in ['','K','M','G','T','P','E','Z']: + if abs(num) < scale: + return "%3.1f%s%s" % (num, unit, suffix) + num /= scale + return "%.1f%s%s" % (num, 'Y', suffix) + +class Struct(dict): + """Dict with item access as members. + + >>> tup = Struct(timestamp=123, series=['1','2','3']) + >>> tup.timestamp + 123 + >>> tup['timestamp'] + 123 + + """ + def __getattr__(self, name): + return self[name] + def __setattr__(self, name, value): + self[name] = value + +def flatten(lst): + flat = [] + for itm in lst: + flat.extend(list(itm)) + return flat + +def is_hex(string): + """Return True if the *string* can be interpreted as a hex value.""" + try: + int(string, 16) + return True + except ValueError: + return False + except TypeError: + return False + +def fileopen(pth): + """Open a file in the preferred application as a subprocess. + This operation is platform dependent. + """ + try: + binopen = { + "Linux" : "xdg-open", # Linux + "darwin" : "open", # MAX OS X + "Windows" : "start", # Windows + }.get(platform.system()) + subprocess.call((binopen, pth)) + except KeyError: + warnings.warn('Unknown platform {}'.format(platform.system())) + + +def get_root(obj): + """Traverse the widget tree upwards until the root is found.""" + while obj.parent is not None and obj.parent != obj.parent.parent: + if hasattr(obj, 'root') and obj.root is not None: + return obj.root + + obj = obj.parent + + return obj + +## EOF ## diff --git a/tagit/utils/time.py b/tagit/utils/time.py new file mode 100644 index 0000000..4260ac7 --- /dev/null +++ b/tagit/utils/time.py @@ -0,0 +1,63 @@ +"""Time helpers. + +* Camera local +* System local +* UTC + +Timestamp to datetime + * Timestamp + * in UTC + * Timezone + * Implicit system local timezone + * No known timezone + * Known timezone + +Datetime to timestamp + * always store as local time + * optionally with UTC offset + +Part of the tagit module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# standard imports +from datetime import timezone, datetime, tzinfo, timedelta + +# exports +__all__ = ('timestamp', 'from_timestamp') + + +## code ## + +timestamp_loc = lambda dt: dt.replace(tzinfo=timezone.utc).timestamp() + +timestamp_utc = lambda dt: dt.timestamp() + +from_timestamp_loc = lambda ts: datetime.utcfromtimestamp(ts) + +from_timestamp_utc = lambda ts: datetime.fromtimestamp(ts) + +now = datetime.now + +timestamp_min = timestamp_loc(datetime.min) + +timestamp_max = timestamp_loc(datetime.max) + +def utcoffset(dt): + if dt.tzinfo is None: + return local_tzo(dt) + elif dt.tzinfo is NoTimeZone: + return None + else: + return dt.tzinfo.utcoffset(dt).total_seconds() / 3600 + +NoTimeZone = timezone(timedelta(0), 'NoTimeZone') + +def local_tzo(dt=None): + """Return the offset between the local time and UTC. + (i.e. return the x of UTC+x). + """ + dt = datetime.now() if dt is None else dt + return (timestamp_loc(dt) - dt.timestamp()) / 3600 + +## EOF ## |