aboutsummaryrefslogtreecommitdiffstats
path: root/bsie/utils/filematcher/matcher.py
diff options
context:
space:
mode:
authorMatthias Baumgartner <dev@igsor.net>2023-03-05 19:22:58 +0100
committerMatthias Baumgartner <dev@igsor.net>2023-03-05 19:22:58 +0100
commita35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d (patch)
treefb220da28bb7248ebf37ce09af5de88f2c1aaad4 /bsie/utils/filematcher/matcher.py
parent7582c280ad5324a2f0427999911c7e7abc14a6ab (diff)
parentaf81318ae9311fd0b0e16949cef3cfaf7996970b (diff)
downloadbsie-main.tar.gz
bsie-main.tar.bz2
bsie-main.zip
Merge branch 'develop'HEADv0.23.03releasemain
Diffstat (limited to 'bsie/utils/filematcher/matcher.py')
-rw-r--r--bsie/utils/filematcher/matcher.py174
1 files changed, 174 insertions, 0 deletions
diff --git a/bsie/utils/filematcher/matcher.py b/bsie/utils/filematcher/matcher.py
new file mode 100644
index 0000000..1fa308e
--- /dev/null
+++ b/bsie/utils/filematcher/matcher.py
@@ -0,0 +1,174 @@
+
+# standard imports
+from collections.abc import Callable, Collection, Hashable
+import abc
+import os
+import typing
+
+# external imports
+import magic
+
+# exports
+__all__: typing.Sequence[str] = []
+
+
+## code ##
+
+# abstract nodes
+
+class Matcher(abc.ABC, Hashable, Callable, Collection): # type: ignore [misc] # Invalid base class Callable
+ """Matcher node base class."""
+
+ # child expressions or terminals
+ _childs: typing.Set[typing.Any]
+
+ def __init__(self, *childs: typing.Any):
+ if len(childs) == 1 and isinstance(childs[0], (list, tuple, set)):
+ self._childs = set(childs[0])
+ else:
+ self._childs = set(childs)
+
+ def __contains__(self, needle: typing.Any) -> bool:
+ return needle in self._childs
+
+ def __iter__(self) -> typing.Iterator[typing.Any]:
+ return iter(self._childs)
+
+ def __len__(self) -> int:
+ return len(self._childs)
+
+ def __repr__(self) -> str:
+ return f'{type(self).__name__}({self._childs})'
+
+ def __hash__(self) -> int:
+ return hash((type(self), tuple(set(self._childs))))
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return isinstance(other, type(self)) \
+ and self._childs == other._childs
+
+ @abc.abstractmethod
+ def __call__(self, path: str) -> bool: # pylint: disable=arguments-differ
+ """Check if *path* satisfies the conditions set by the Matcher instance."""
+
+class NOT(Matcher):
+ """Invert a matcher result."""
+ def __init__(self, expr: Matcher):
+ super().__init__(expr)
+ def __call__(self, path: str) -> bool:
+ return not next(iter(self._childs))(path)
+
+# aggregate nodes
+
+class Aggregate(Matcher): # pylint: disable=too-few-public-methods # Yeah, it's an interface...
+ """Aggregation function base class (And, Or)."""
+
+class And(Aggregate):
+ """Accept only if all conditions are satisfied."""
+ def __call__(self, path: str) -> bool:
+ for itm in self:
+ if not itm(path):
+ return False
+ return True
+
+class Or(Aggregate):
+ """Accept only if at least one condition is satisfied."""
+ def __call__(self, path: str) -> bool:
+ for itm in self:
+ if itm(path):
+ return True
+ return False
+
+
+# criteria nodes
+
+class Criterion(Matcher):
+ """Criterion base class. Limits acceptance to certain values."""
+ def accepted(self) -> typing.Set[typing.Any]:
+ """Return a set of accepted values."""
+ return self._childs
+
+# criteria w/o value (valueless)
+
+class Any(Criterion):
+ """Accepts anything."""
+ def __call__(self, path: str) -> bool:
+ return True
+
+class Nothing(Criterion):
+ """Accepts nothing."""
+ def __call__(self, path: str) -> bool:
+ return False
+
+class Exists(Criterion):
+ """Filters by existence."""
+ def __call__(self, path: str) -> bool:
+ return os.path.exists(path)
+
+class IsFile(Criterion):
+ """Checks if the path is a regular file."""
+ def __call__(self, path: str) -> bool:
+ return os.path.isfile(path)
+
+class IsDir(Criterion):
+ """Checks if the path is a directory."""
+ def __call__(self, path: str) -> bool:
+ return os.path.isdir(path)
+
+class IsLink(Criterion):
+ """Checks if the path is a link."""
+ def __call__(self, path: str) -> bool:
+ return os.path.islink(path)
+
+class IsAbs(Criterion):
+ """Checks if the path is an absolute path."""
+ def __call__(self, path: str) -> bool:
+ return os.path.isabs(path)
+
+class IsRel(Criterion):
+ """Checks if the path is a relative path."""
+ def __call__(self, path: str) -> bool:
+ return not os.path.isabs(path)
+
+class IsMount(Criterion):
+ """Checks if the path is a mount point."""
+ def __call__(self, path: str) -> bool:
+ return os.path.ismount(path)
+
+class IsEmpty(Criterion):
+ """Checks if the path is an empty file."""
+ def __call__(self, path: str) -> bool:
+ return os.path.exists(path) and os.stat(path).st_size == 0
+
+class IsReadable(Criterion):
+ """Checks if the path is readable."""
+ def __call__(self, path: str) -> bool:
+ return os.path.exists(path) and os.access(path, os.R_OK)
+
+class IsWritable(Criterion):
+ """Checks if the path is writable."""
+ def __call__(self, path: str) -> bool:
+ return os.path.exists(path) and os.access(path, os.W_OK)
+
+class IsExecutable(Criterion):
+ """Checks if the path is executable."""
+ def __call__(self, path: str) -> bool:
+ return os.path.exists(path) and os.access(path, os.X_OK)
+
+# criteria w/ value
+
+class Extension(Criterion):
+ """Filters by file extension (without the dot)."""
+ def __call__(self, path: str) -> bool:
+ _, ext = os.path.splitext(path)
+ return ext[1:] in self.accepted()
+
+class Mime(Criterion):
+ """Filters by mime type."""
+ def __call__(self, path: str) -> bool:
+ try:
+ return magic.from_file(path, mime=True).lower() in self.accepted()
+ except FileNotFoundError:
+ return False
+
+## EOF ##