aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bsie/base/errors.py3
-rw-r--r--bsie/utils/__init__.py2
-rw-r--r--bsie/utils/filematcher/__init__.py20
-rw-r--r--bsie/utils/filematcher/matcher.py177
-rw-r--r--bsie/utils/filematcher/parser.py148
-rw-r--r--setup.py2
-rw-r--r--test/utils/filematcher/__init__.py0
-rw-r--r--test/utils/filematcher/empty0
-rw-r--r--test/utils/filematcher/test_ast.py232
-rw-r--r--test/utils/filematcher/test_parser.py146
-rw-r--r--test/utils/filematcher/testimage.jpgbin0 -> 518 bytes
-rw-r--r--test/utils/filematcher/textfile.t4
12 files changed, 733 insertions, 1 deletions
diff --git a/bsie/base/errors.py b/bsie/base/errors.py
index dc3c30e..5fafd5b 100644
--- a/bsie/base/errors.py
+++ b/bsie/base/errors.py
@@ -39,4 +39,7 @@ class ProgrammingError(_BSIEError):
class UnreachableError(ProgrammingError):
    """Raised when execution arrives at a code path that should be logically unreachable."""
class ParserError(_BSIEError):
    """Raised when input cannot be parsed because its syntax or structure is invalid."""
+
## EOF ##
diff --git a/bsie/utils/__init__.py b/bsie/utils/__init__.py
index bd22236..3981dc7 100644
--- a/bsie/utils/__init__.py
+++ b/bsie/utils/__init__.py
@@ -11,9 +11,11 @@ import typing
from . import bsfs
from . import namespaces as ns
from . import node
+from . import filematcher
# exports
__all__: typing.Sequence[str] = (
+ 'filematcher',
'bsfs',
'node',
'ns',
diff --git a/bsie/utils/filematcher/__init__.py b/bsie/utils/filematcher/__init__.py
new file mode 100644
index 0000000..b1c1b45
--- /dev/null
+++ b/bsie/utils/filematcher/__init__.py
@@ -0,0 +1,20 @@
+"""
+
+Part of the bsie module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import typing
+
+# inner-module imports
+from .matcher import Matcher
+from .parser import parse
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Matcher',
+ 'parse',
+ )
+
+## EOF ##
diff --git a/bsie/utils/filematcher/matcher.py b/bsie/utils/filematcher/matcher.py
new file mode 100644
index 0000000..164beeb
--- /dev/null
+++ b/bsie/utils/filematcher/matcher.py
@@ -0,0 +1,177 @@
+"""
+
+Part of the bsie module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2021
+"""
+# imports
+from collections.abc import Callable, Collection, Hashable
+import abc
+import os
+import typing
+import magic
+
+# exports
+__all__: typing.Sequence[str] = []
+
+
+## code ##
+
+# abstract nodes
+
class Matcher(abc.ABC, Hashable, Callable, Collection): # type: ignore [misc] # Invalid base class Callable
    """Matcher node base class.

    A matcher is a callable that takes a path and returns whether the path
    satisfies the matcher's condition. It also behaves like an unordered
    collection of its childs (sub-expressions or terminal values).
    Subclasses have to implement `__call__`.
    """

    # child expressions or terminals
    _childs: typing.Set[typing.Any]

    def __init__(self, *childs: typing.Any):
        """Store *childs* as an unordered set.

        A single list, tuple, or set argument is interpreted as the
        collection of childs instead of as one child.
        """
        if len(childs) == 1 and isinstance(childs[0], (list, tuple, set)):
            self._childs = set(childs[0])
        else:
            self._childs = set(childs)

    def __contains__(self, needle: typing.Any) -> bool:
        return needle in self._childs

    def __iter__(self) -> typing.Iterator[typing.Any]:
        return iter(self._childs)

    def __len__(self) -> int:
        return len(self._childs)

    def __repr__(self) -> str:
        return f'{type(self).__name__}({self._childs})'

    def __hash__(self) -> int:
        # A frozenset hashes independently of iteration order. Hashing a tuple
        # built from the set would depend on the insertion history, so that two
        # equal matchers could end up with different hash values.
        return hash((type(self), frozenset(self._childs)))

    def __eq__(self, other: typing.Any) -> bool:
        # Require the exact same type: the hash is keyed on type(self), hence
        # instances of different classes must never compare equal.
        return type(other) is type(self) \
           and self._childs == other._childs

    @abc.abstractmethod
    def __call__(self, path: str) -> bool: # pylint: disable=arguments-differ
        """Check if *path* satisfies the conditions set by the Matcher instance."""
+
class NOT(Matcher):
    """Negate the verdict of the wrapped matcher."""

    def __init__(self, expr: Matcher):
        super().__init__(expr)

    def __call__(self, path: str) -> bool:
        # fetch the wrapped expression from the child set and flip its result
        wrapped = next(iter(self._childs))
        return not wrapped(path)
+
+# aggregate nodes
+
class Aggregate(Matcher): # pylint: disable=too-few-public-methods # Yeah, it's an interface...
    """Common base class of the aggregation nodes (And, Or)."""
+
class And(Aggregate):
    """Accept a path only if every child matcher accepts it."""
    def __call__(self, path: str) -> bool:
        # all() stops at the first rejecting child and is True for no childs
        return all(child(path) for child in self)
+
class Or(Aggregate):
    """Accept a path if at least one child matcher accepts it."""
    def __call__(self, path: str) -> bool:
        # any() stops at the first accepting child and is False for no childs
        return any(child(path) for child in self)
+
+
+# criteria nodes
+
class Criterion(Matcher):
    """Base class of all criteria. A criterion restricts acceptance to certain values."""
    def accepted(self) -> typing.Set[typing.Any]:
        """Return the set of values accepted by this criterion."""
        values = self._childs
        return values
+
+# criteria w/o value (valueless)
+
class Any(Criterion):
    """Matches every path."""
    def __call__(self, path: str) -> bool:
        # the path is not inspected at all
        return True
+
class Nothing(Criterion):
    """Matches no path at all."""
    def __call__(self, path: str) -> bool:
        # the path is not inspected at all
        return False
+
class Exists(Criterion):
    """Matches paths for which os.path.exists reports True."""
    def __call__(self, path: str) -> bool:
        found = os.path.exists(path)
        return found
+
class IsFile(Criterion):
    """Matches paths that refer to a regular file."""
    def __call__(self, path: str) -> bool:
        regular = os.path.isfile(path)
        return regular
+
class IsDir(Criterion):
    """Matches paths that refer to a directory."""
    def __call__(self, path: str) -> bool:
        directory = os.path.isdir(path)
        return directory
+
class IsLink(Criterion):
    """Matches paths that refer to a symbolic link."""
    def __call__(self, path: str) -> bool:
        link = os.path.islink(path)
        return link
+
class IsAbs(Criterion):
    """Matches absolute paths."""
    def __call__(self, path: str) -> bool:
        absolute = os.path.isabs(path)
        return absolute
+
class IsRel(Criterion):
    """Matches relative paths, i.e. every path that is not absolute."""
    def __call__(self, path: str) -> bool:
        if os.path.isabs(path):
            return False
        return True
+
class IsMount(Criterion):
    """Matches paths that are a mount point."""
    def __call__(self, path: str) -> bool:
        mounted = os.path.ismount(path)
        return mounted
+
class IsEmpty(Criterion):
    """Checks if the path exists and has a size of zero bytes."""
    def __call__(self, path: str) -> bool:
        # Query the size in a single step. Checking for existence first and
        # calling stat afterwards would raise if the file vanishes in between.
        try:
            return os.stat(path).st_size == 0
        except (OSError, ValueError):
            # missing, inaccessible, or not representable as a path
            return False
+
class IsReadable(Criterion):
    """Matches existing paths that os.access reports as readable."""
    def __call__(self, path: str) -> bool:
        if not os.path.exists(path):
            return False
        return os.access(path, os.R_OK)
+
class IsWritable(Criterion):
    """Matches existing paths that os.access reports as writable."""
    def __call__(self, path: str) -> bool:
        if not os.path.exists(path):
            return False
        return os.access(path, os.W_OK)
+
class IsExecutable(Criterion):
    """Matches existing paths that os.access reports as executable."""
    def __call__(self, path: str) -> bool:
        if not os.path.exists(path):
            return False
        return os.access(path, os.X_OK)
+
+# criteria w/ value
+
class Extension(Criterion):
    """Matches paths whose file extension (given without the dot) is accepted."""
    def __call__(self, path: str) -> bool:
        suffix = os.path.splitext(path)[1]
        # splitext keeps the leading dot; drop it before the lookup
        return suffix[1:] in self.accepted()
+
class Mime(Criterion):
    """Filters by mime type.

    The mime type is detected from the file via libmagic. The comparison
    ignores case: both the detected type and the accepted values are
    lower-cased before they are compared. Files that do not exist never match.
    """
    def __call__(self, path: str) -> bool:
        try:
            detected = magic.from_file(path, mime=True).lower()
        except FileNotFoundError:
            return False
        # Normalize the accepted values like the detected type. Otherwise, a
        # value written in upper case (e.g. 'IMAGE/JPEG') could never match.
        accepted = {
            value.lower() if isinstance(value, str) else value
            for value in self.accepted()
            }
        return detected in accepted
+
+## EOF ##
diff --git a/bsie/utils/filematcher/parser.py b/bsie/utils/filematcher/parser.py
new file mode 100644
index 0000000..0654742
--- /dev/null
+++ b/bsie/utils/filematcher/parser.py
@@ -0,0 +1,148 @@
+"""
+
+Part of the bsie module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2021
+"""
+# standard imports
+import typing
+
+# non-standard imports
+import pyparsing
+from pyparsing import printables, alphas8bit, punc8bit, QuotedString, Word, \
+ delimitedList, Or, CaselessKeyword, Group, oneOf, Optional
+
+# bsie imports
+from bsie.base import errors
+
+# inner-module imports
+from . import matcher
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'parse',
+ )
+
+
+## code ##
+
class FileMatcherParser():
    """Parse a file matching query into a tree of Matcher nodes.

    Grammar:

        EXPR := RULESET | RULESET "|" RULESET
        RULESET := RULE | RULE, RULE
        RULE := CRITERION OP VALUE | CRITERION OP {VALUES} | VALUELESS | !VALUELESS
        OP := != | =
        VALUES := VALUE | VALUE, VALUE
        VALUE := [word] | "[quoted string]"
        CRITERION := mime | extension
        VALUELESS := any | nothing | exists | ...

    Rules of the same ruleset are And-aggregated, rulesets are Or-aggregated.
    Criterion names are matched irrespective of their case.
    """

    # criteria matcher nodes w/ arguments
    _CRITERIA: typing.Dict[str, typing.Type[matcher.Matcher]] = {
        'extension': matcher.Extension,
        'mime': matcher.Mime,
        }

    # criteria matcher nodes w/o arguments
    _VALUELESS: typing.Dict[str, typing.Type[matcher.Matcher]] = {
        'any': matcher.Any,
        'nothing': matcher.Nothing,
        'exists': matcher.Exists,
        'isfile': matcher.IsFile,
        'isdir': matcher.IsDir,
        'islink': matcher.IsLink,
        'isabs': matcher.IsAbs,
        'isrel': matcher.IsRel,
        'ismount': matcher.IsMount,
        'empty': matcher.IsEmpty,
        # misspelled keyword of earlier versions, kept so that existing queries remain valid
        'emtpy': matcher.IsEmpty,
        'readable': matcher.IsReadable,
        'writable': matcher.IsWritable,
        'executable': matcher.IsExecutable,
        }

    # pyparsing parser instance.
    _parser: pyparsing.ParserElement

    def __init__(self):
        # build the parser
        # VALUE := [word]
        # unquoted words must not contain the characters that structure the grammar
        alphabet = (printables + alphas8bit + punc8bit).translate(str.maketrans('', '', ',{}|='))
        value = QuotedString(quoteChar='"', escChar='\\') ^ Word(alphabet)
        # CRITERION := mime | extension | ...
        criterion = Or([CaselessKeyword(p) for p in self._CRITERIA]).setResultsName('criterion')
        valueless = Or([CaselessKeyword(p) for p in self._VALUELESS]).setResultsName('criterion')
        # VALUES := VALUE | VALUE, VALUE
        values = delimitedList(value, delim=',').setResultsName('value')
        # OP := '=' | '!='
        eqop = oneOf('= !=').setResultsName('op')
        # RULE := CRITERION OP VALUE | CRITERION OP {VALUES} | VALUELESS
        rule_none = Group(Optional('!').setResultsName('op') + valueless).setResultsName('rule_none')
        rule_one = Group(criterion + eqop + value.setResultsName('value')).setResultsName('rule_one')
        rule_few = Group(criterion + eqop + '{' + values + '}').setResultsName('rule_few')
        # RULESET := RULE | RULE, RULE
        ruleset = Group(delimitedList(rule_none ^ rule_one ^ rule_few, delim=','))
        # EXPR := RULESET | RULESET \| RULESET
        self._parser = delimitedList(ruleset, delim='|')

    def parse(self, query: str) -> matcher.Matcher:
        """Build a file matcher from a rule definition.

        An empty (or whitespace-only) *query* produces a matcher that accepts
        everything. Raises a `ParserError` if *query* violates the grammar.
        """
        # preprocess the query
        query = query.strip()

        # empty query
        if not query:
            return matcher.Any()

        try:
            parsed = self._parser.parseString(query, parseAll=True)
        except pyparsing.ParseException as err:
            # keep the pyparsing exception as cause for debugging
            raise errors.ParserError(f'Cannot parse query {err}') from err

        # convert to Matcher
        rulesets = [self._build_ruleset(exp) for exp in parsed]

        # Or-aggregate rulesets (if needed)
        return matcher.Or(rulesets) if len(rulesets) > 1 else rulesets[0]

    def _build_ruleset(self, ruleset: typing.Any) -> matcher.Matcher:
        """Convert the parsed rules of one ruleset into a single Matcher."""
        tokens = [self._build_rule(rule) for rule in ruleset]
        # And-aggregate rules in one ruleset (if needed)
        return matcher.And(tokens) if len(tokens) > 1 else tokens[0]

    def _build_rule(self, rule: typing.Any) -> matcher.Matcher:
        """Convert one parsed rule into a criterion node, negated if requested."""
        # fetch accepted values
        name = rule.getName()
        if name == 'rule_none':
            accepted = []
        elif name == 'rule_one':
            accepted = [rule.value]
        elif name == 'rule_few':
            accepted = list(rule.value)
        else: # prevented by grammar
            raise errors.UnreachableError('Invalid rule definition')

        # build criterion
        if rule.criterion in self._VALUELESS:
            node = self._VALUELESS[rule.criterion]()
            negate = rule.op == '!'
        elif rule.criterion in self._CRITERIA:
            node = self._CRITERIA[rule.criterion](accepted)
            negate = rule.op == '!='
        else: # prevented by grammar
            raise errors.UnreachableError(f'Invalid condition "{rule.criterion}"')

        return matcher.NOT(node) if negate else node
+
+# build default instance
+file_match_parser = FileMatcherParser()
+
def parse(query: str) -> matcher.Matcher:
    """Build a file matcher from *query* via the module's default parser instance."""
    default_parser = file_match_parser
    return default_parser.parse(query)
+
+## EOF ##
diff --git a/setup.py b/setup.py
index ee9e0fd..8e0efd4 100644
--- a/setup.py
+++ b/setup.py
@@ -14,7 +14,7 @@ setup(
url='https://www.igsor.net/projects/blackstar/bsie/',
download_url='https://pip.igsor.net',
packages=('bsie', ),
- install_requires=('rdflib', 'bsfs'),
+ install_requires=('rdflib', 'bsfs', 'python-magic'),
python_requires=">=3.7",
)
diff --git a/test/utils/filematcher/__init__.py b/test/utils/filematcher/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/utils/filematcher/__init__.py
diff --git a/test/utils/filematcher/empty b/test/utils/filematcher/empty
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/utils/filematcher/empty
diff --git a/test/utils/filematcher/test_ast.py b/test/utils/filematcher/test_ast.py
new file mode 100644
index 0000000..ff4b86d
--- /dev/null
+++ b/test/utils/filematcher/test_ast.py
@@ -0,0 +1,232 @@
+"""
+
+Part of the bsie test suite.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import os
+import stat
+import tempfile
+import unittest
+
+# objects to test
+from bsie.utils.filematcher import matcher
+
+
+## code ##
+
class FakeMatcher(matcher.Matcher):
    """Minimal concrete Matcher; calling it has no effect and returns None."""
    def __call__(self, *args, **kwargs):
        return None
+
class FakeCriterion(matcher.Criterion):
    """Minimal concrete Criterion; calling it has no effect and returns None."""
    def __call__(self, *args, **kwargs):
        return None
+
class FakeAggregate(matcher.Aggregate):
    """Minimal concrete Aggregate; calling it has no effect and returns None."""
    def __call__(self, *args, **kwargs):
        return None
+
class TestMatcher(unittest.TestCase):
    """Test the matcher nodes against fixture files next to this module."""

    def setUp(self):
        # paths
        here = os.path.dirname(__file__)
        self.image = os.path.join(here, 'testimage.jpg')
        self.text = os.path.join(here, 'textfile.t')
        self.empty = os.path.join(here, 'empty')
        self.missing = os.path.join(here, 'missing.jpg')

    def _make_tempfile(self) -> str:
        """Create a temporary file and return its path.

        The file descriptor returned by mkstemp is closed right away, and the
        file is removed after the test, even if an assertion fails.
        """
        handle, path = tempfile.mkstemp(prefix='bsie-test-')
        os.close(handle)
        self.addCleanup(os.unlink, path)
        return path

    def test_matcher_skeleton(self):
        # node: iteration and length
        self.assertSetEqual(set(iter(FakeMatcher(1,2,3))), {1,2,3})
        self.assertSetEqual(set(iter(FakeMatcher([1,2,3]))), {1,2,3})
        self.assertEqual(len(FakeMatcher([1,2,3])), 3)
        self.assertEqual(len(FakeMatcher(1,2,3)), 3)
        self.assertEqual(len(FakeMatcher()), 0)
        self.assertIn(1, FakeMatcher(1,2,3))
        self.assertIn(3, FakeMatcher([1,2,3]))
        self.assertNotIn(0, FakeMatcher(1,2,3))
        self.assertNotIn(4, FakeMatcher([1,2,3]))
        # node: comparison
        self.assertEqual(FakeMatcher([1,2,3]), FakeMatcher([1,2,3]))
        self.assertEqual(FakeMatcher(1,2,3), FakeMatcher(1,2,3))
        self.assertEqual(FakeMatcher(1,2,3), FakeMatcher([1,2,3]))
        self.assertEqual(FakeMatcher(1,2,3), FakeMatcher((1,2,3)))
        self.assertNotEqual(FakeMatcher(1,2,3), FakeMatcher(1,2,4))
        self.assertNotEqual(FakeMatcher(1,2,3), FakeMatcher(1,2,3,4))
        self.assertNotEqual(FakeMatcher(1,2,3), FakeMatcher(1,2))
        self.assertEqual(hash(FakeMatcher([1,2,3])), hash(FakeMatcher([1,2,3])))
        self.assertEqual(hash(FakeMatcher(1,2,3)), hash(FakeMatcher(1,2,3)))
        self.assertEqual(hash(FakeMatcher(1,2,3)), hash(FakeMatcher([1,2,3])))
        self.assertEqual(hash(FakeMatcher(1,2,3)), hash(FakeMatcher((1,2,3))))
        # node: representation
        self.assertEqual(repr(FakeMatcher(1,2,3)), 'FakeMatcher({1, 2, 3})')

        # criterion
        self.assertEqual(repr(FakeCriterion(1,2,3)), 'FakeCriterion({1, 2, 3})')
        self.assertEqual(hash(FakeCriterion(1,2,3)), hash(FakeCriterion(1,2,3)))
        self.assertEqual(FakeCriterion(1,2,3), FakeCriterion([1,2,3]))
        self.assertNotEqual(FakeCriterion(1,2,3), FakeCriterion(1,2))
        self.assertNotEqual(FakeCriterion(1,2,3), FakeMatcher(1,2,3))
        self.assertSetEqual(FakeCriterion(1,2,3).accepted(), {1,2,3})

        # aggregate
        self.assertEqual(repr(FakeAggregate(1,2,3)), 'FakeAggregate({1, 2, 3})')
        self.assertNotEqual(FakeAggregate(1,2,3), FakeMatcher(1,2,3))

    def test_any(self):
        self.assertTrue(matcher.Any()(self.image))
        self.assertTrue(matcher.Any()(self.text))
        self.assertTrue(matcher.Any()(self.missing))
        self.assertTrue(matcher.Any()(self.empty))

    def test_nothing(self):
        self.assertFalse(matcher.Nothing()(self.image))
        self.assertFalse(matcher.Nothing()(self.text))
        self.assertFalse(matcher.Nothing()(self.missing))
        self.assertFalse(matcher.Nothing()(self.empty))

    def test_exists(self):
        self.assertTrue(matcher.Exists()(self.image))
        self.assertTrue(matcher.Exists()(self.text))
        self.assertTrue(matcher.Exists()(self.empty))
        self.assertFalse(matcher.Exists()(self.missing))

    def test_isfile(self):
        self.assertTrue(matcher.IsFile()(self.image))
        self.assertTrue(matcher.IsFile()(self.text))
        self.assertFalse(matcher.IsFile()(self.missing))
        self.assertFalse(matcher.IsFile()(os.path.dirname(self.image)))

    def test_isdir(self):
        self.assertTrue(matcher.IsDir()(os.path.dirname(self.image)))
        self.assertFalse(matcher.IsDir()(self.image))
        self.assertFalse(matcher.IsDir()(self.text))
        self.assertFalse(matcher.IsDir()(self.missing))

    def test_islink(self):
        self.assertFalse(matcher.IsLink()(os.path.dirname(self.image)))
        self.assertFalse(matcher.IsLink()(self.image))
        self.assertFalse(matcher.IsLink()(self.text))
        temp = self._make_tempfile()
        templink = temp + '-link'
        os.symlink(temp, templink)
        # cleanups run in reverse order: the link is removed before its target
        self.addCleanup(os.unlink, templink)
        self.assertTrue(matcher.IsLink()(templink))

    def test_isabs(self):
        self.assertTrue(matcher.IsAbs()(os.path.abspath(self.image)))
        self.assertTrue(matcher.IsAbs()(os.path.abspath(self.text)))
        self.assertFalse(matcher.IsAbs()(os.path.relpath(self.text, os.path.dirname(self.text))))

    def test_isrel(self):
        self.assertFalse(matcher.IsRel()(os.path.abspath(self.image)))
        self.assertFalse(matcher.IsRel()(os.path.abspath(self.text)))
        self.assertTrue(matcher.IsRel()(os.path.relpath(self.text, os.path.dirname(self.text))))
        self.assertTrue(matcher.IsRel()(os.path.basename(self.text)))

    def test_ismount(self):
        self.assertFalse(matcher.IsMount()(self.image))
        self.assertFalse(matcher.IsMount()(self.text))
        self.assertFalse(matcher.IsMount()(self.missing))
        # there's no reasonable way to test a positive case

    def test_isempty(self):
        self.assertTrue(matcher.IsEmpty()(self.empty))
        self.assertFalse(matcher.IsEmpty()(self.image))
        self.assertFalse(matcher.IsEmpty()(self.text))
        self.assertFalse(matcher.IsEmpty()(self.missing))

    def test_isreadable(self):
        self.assertTrue(matcher.IsReadable()(self.empty))
        self.assertTrue(matcher.IsReadable()(self.image))
        self.assertFalse(matcher.IsReadable()(self.missing))
        temp = self._make_tempfile()
        # revoke all permissions
        os.chmod(temp, 0)
        self.assertFalse(matcher.IsReadable()(temp))

    def test_iswritable(self):
        self.assertTrue(matcher.IsWritable()(self.empty))
        self.assertTrue(matcher.IsWritable()(self.image))
        self.assertFalse(matcher.IsWritable()(self.missing))
        temp = self._make_tempfile()
        # revoke all permissions
        os.chmod(temp, 0)
        self.assertFalse(matcher.IsWritable()(temp))

    def test_isexecutable(self):
        self.assertFalse(matcher.IsExecutable()(self.empty))
        self.assertFalse(matcher.IsExecutable()(self.image))
        self.assertFalse(matcher.IsExecutable()(self.missing))
        temp = self._make_tempfile()
        # grant execution to the owner only
        os.chmod(temp, stat.S_IEXEC)
        self.assertTrue(matcher.IsExecutable()(temp))

    def test_extension(self):
        self.assertTrue(matcher.Extension('jpg')(self.image))
        self.assertTrue(matcher.Extension('jpg', 'png')(self.image))
        self.assertTrue(matcher.Extension('jpg', 't')(self.text))
        self.assertTrue(matcher.Extension('jpg', 'png', 't')(self.missing))
        self.assertTrue(matcher.Extension('')(self.empty))

        self.assertFalse(matcher.Extension()(self.image))
        self.assertFalse(matcher.Extension('jpeg')(self.image))
        self.assertFalse(matcher.Extension('.t')(self.text))
        self.assertFalse(matcher.Extension('png', 't')(self.missing))
        self.assertFalse(matcher.Extension('tiff')(self.empty))

    def test_mime(self):
        self.assertTrue(matcher.Mime('image/jpeg')(self.image))
        self.assertTrue(matcher.Mime('image/tiff', 'image/jpeg')(self.image))
        self.assertTrue(matcher.Mime('text/plain', 'image/jpeg')(self.text))
        self.assertTrue(matcher.Mime('inode/x-empty')(self.empty))

        self.assertFalse(matcher.Mime()(self.image))
        self.assertFalse(matcher.Mime('image')(self.image))
        self.assertFalse(matcher.Mime('image/tiff', 'image/png')(self.image))
        self.assertFalse(matcher.Mime('')(self.text))
        self.assertFalse(matcher.Mime('text')(self.text))
        self.assertFalse(matcher.Mime('tiff')(self.empty))
        self.assertFalse(matcher.Mime()(self.empty))
        self.assertFalse(matcher.Mime('')(self.empty))
        self.assertFalse(matcher.Mime()(self.missing))
        self.assertFalse(matcher.Mime('')(self.missing))
        self.assertFalse(matcher.Mime('inode/x-empty')(self.missing))

    def test_not(self):
        self.assertFalse(matcher.NOT(matcher.Mime('image/jpeg'))(self.image))
        self.assertTrue(matcher.NOT(matcher.Mime('text/plain'))(self.image))

    def test_and(self):
        self.assertTrue(matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('jpg'))(self.image))
        self.assertTrue(matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('jpg', 'tiff'))(self.image))
        self.assertTrue(matcher.And(matcher.Mime('text/plain'), matcher.Extension('t', 'tiff'))(self.text))

        self.assertFalse(matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('tiff'))(self.image))
        self.assertFalse(matcher.And(matcher.Mime('text/plain'), matcher.Extension('jpg'))(self.image))
        self.assertFalse(matcher.And(matcher.Mime('inode/x-empty'), matcher.Extension('jpg'))(self.missing))
        self.assertFalse(matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('jpg', 't'))(self.text))

    def test_or(self):
        self.assertTrue(matcher.Or(matcher.Mime('image/jpeg'))(self.image))
        self.assertFalse(matcher.Or(matcher.Mime('text/plain'))(self.image))

        self.assertTrue(matcher.Or(matcher.Mime('image/jpeg'), matcher.Extension('jpg'))(self.image))
        self.assertTrue(matcher.Or(matcher.Mime('image/jpeg'), matcher.Extension('t'))(self.image))
        self.assertTrue(matcher.Or(matcher.Mime('text/plain'), matcher.Extension('jpg', 'tiff'))(self.image))
        self.assertTrue(matcher.Or(matcher.Mime('text/plain'), matcher.Extension('tiff'))(self.text))
        self.assertTrue(matcher.Or(matcher.Mime('image/jpeg'), matcher.Extension('jpg'))(self.missing))

        self.assertFalse(matcher.Or(matcher.Mime('text/plain'), matcher.Extension('tiff'))(self.image))
        self.assertFalse(matcher.Or(matcher.Mime('inode/x-empty'), matcher.Extension('jpg', 'tiff'))(self.text))
+
+
+## main ##
+
+if __name__ == '__main__':
+ unittest.main()
+
+## EOF ##
diff --git a/test/utils/filematcher/test_parser.py b/test/utils/filematcher/test_parser.py
new file mode 100644
index 0000000..a81d2ed
--- /dev/null
+++ b/test/utils/filematcher/test_parser.py
@@ -0,0 +1,146 @@
+"""
+
+Part of the bsie test suite.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import unittest
+
+# inner-module imports
+from bsie.base import errors
+from bsie.utils.filematcher import matcher
+
+# objects to test
+from bsie.utils.filematcher import parse
+
+
+## code ##
+
class TestFileMatcherParser(unittest.TestCase):
    """Check which queries the parser accepts and which matcher trees it builds."""

    def _accepts(self, *cases):
        """Assert that every (query, expected matcher) pair parses as expected."""
        for query, expected in cases:
            self.assertEqual(parse(query), expected)

    def _rejects(self, *queries):
        """Assert that parsing any of the queries raises a ParserError."""
        for query in queries:
            self.assertRaises(errors.ParserError, parse, query)

    def test_empty(self):
        # no criterion
        self._accepts(('', matcher.Any()))

    def test_ruleone(self):
        # single criterion, single value
        self._accepts(
            ('mime=text', matcher.Mime('text')),
            ('MIME=text', matcher.Mime('text')),
            ('MiMe=text', matcher.Mime('text')),
            ('MIME=TEXT', matcher.Mime('TEXT')),
            ('mime={text}', matcher.Mime('text')),
            ('mime=image/jpeg', matcher.Mime('image/jpeg')),
            ('mime="image/jpeg"', matcher.Mime('image/jpeg')),
            ('extension=pdf', matcher.Extension('pdf')),
            ('extension={pdf}', matcher.Extension('pdf')),
            ('extension="pdf"', matcher.Extension('pdf')),
            ('extension="foo,bar"', matcher.Extension('foo,bar')),
            ('extension="f{oo|ba}r"', matcher.Extension('f{oo|ba}r')),
            ('extension=""', matcher.Extension('')),
            ('extension="foo', matcher.Extension('"foo')),
            )
        self._rejects(
            'extension=foo=bar',
            'extension=',
            'extension={}',
            'extension={foo',
            )

        # valueless
        self._accepts(
            ('any', matcher.Any()),
            ('nothing', matcher.Nothing()),
            ('exists', matcher.Exists()),
            ('any, nothing', matcher.And(matcher.Any(), matcher.Nothing())),
            ('any, nothing, exists',
                matcher.And(matcher.Any(), matcher.Nothing(), matcher.Exists())),
            ('any, extension=jpg', matcher.And(matcher.Any(), matcher.Extension('jpg'))),
            )
        self._rejects(
            'mime',
            'extension',
            'exists=True',
            'exists=foo',
            )
        self._accepts(
            ('!any', matcher.NOT(matcher.Any())),
            ('!any, nothing', matcher.And(matcher.NOT(matcher.Any()), matcher.Nothing())),
            ('!any, extension=jpg',
                matcher.And(matcher.NOT(matcher.Any()), matcher.Extension('jpg'))),
            )
        self._rejects(
            '!mime',
            '!extension',
            )

    def test_rulefew(self):
        # single criterion, multiple values
        self._accepts(
            ('extension={jpg, jpeg}', matcher.Extension('jpg', 'jpeg')),
            ('mime={image/jpeg, image/png}', matcher.Mime('image/jpeg', 'image/png')),
            )
        self._rejects(
            'mime=image/png, image/jpeg',
            'extension=jpg, jpeg',
            )

    def test_rulesets_ruleone(self):
        # multiple criteria, single value
        self._accepts(
            ('mime=text, extension=t',
                matcher.And(matcher.Mime('text'), matcher.Extension('t'))),
            ('mime=text/plain, extension=t',
                matcher.And(matcher.Mime('text/plain'), matcher.Extension('t'))),
            )
        self._rejects(
            'mime=text/plain extension=t',
            'mime={image/jpeg, extension=jpg',
            )

    def test_rulesets_rulefew(self):
        # multiple criteria, multiple values
        self._accepts(
            ('mime=image/jpeg, extension={jpg, jpeg}',
                matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('jpg', 'jpeg'))),
            ('mime={image/jpeg, image/tiff}, extension={jpg, jpeg}',
                matcher.And(matcher.Mime('image/jpeg', 'image/tiff'), matcher.Extension('jpg', 'jpeg'))),
            ('mime={image/jpeg, image/tiff}, extension=jpg',
                matcher.And(matcher.Mime('image/jpeg', 'image/tiff'), matcher.Extension('jpg'))),
            )
        self._rejects(
            'mime={image/jpeg, image/tiff, extension=jpg',
            'mime=image/jpeg, image/tiff, extension=jpg',
            'mime=image/jpeg, extension=jpg, ',
            )

    def test_not(self):
        # negated criteria, standalone and within aggregates
        not_jpg = matcher.NOT(matcher.Extension('jpg'))
        not_jpeg_mime = matcher.NOT(matcher.Mime('image/jpeg'))
        self._accepts(
            ('extension!=jpg', not_jpg),
            ('extension!={jpg, jpeg}', matcher.NOT(matcher.Extension('jpg', 'jpeg'))),
            ('extension!=jpg, mime=image/jpeg',
                matcher.And(not_jpg, matcher.Mime('image/jpeg'))),
            ('extension!=jpg, mime!=image/jpeg',
                matcher.And(not_jpg, not_jpeg_mime)),
            ('extension!=jpg | mime=image/jpeg',
                matcher.Or(not_jpg, matcher.Mime('image/jpeg'))),
            ('extension!=jpg | mime!=image/jpeg',
                matcher.Or(not_jpg, not_jpeg_mime)),
            )

    def test_expr(self):
        # multiple rulesets
        self._accepts(
            ('mime=image/jpeg | extension=jpg',
                matcher.Or(matcher.Mime('image/jpeg'), matcher.Extension('jpg'))),
            ('mime=image/jpeg | extension={jpg, jpeg}',
                matcher.Or(matcher.Mime('image/jpeg'), matcher.Extension('jpg', 'jpeg'))),
            ('mime={image/jpeg, image/png} | extension={jpg, jpeg}',
                matcher.Or(matcher.Mime('image/jpeg', 'image/png'), matcher.Extension('jpg', 'jpeg'))),
            ('mime=image/jpeg , extension=jpg | extension=jpg',
                matcher.Or(
                    matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('jpg')),
                    matcher.Extension('jpg'))),
            ('mime={jpeg, text}, extension={jpg,t} | extension={png,txt}, mime={png, tiff}',
                matcher.Or(
                    matcher.And(matcher.Mime('jpeg', 'text'), matcher.Extension('jpg', 't')),
                    matcher.And(matcher.Extension('png', 'txt'), matcher.Mime('png', 'tiff')))),
            ('mime=text | extension=jpg | extension=png | mime=png',
                matcher.Or(
                    matcher.Mime('text'),
                    matcher.Extension('jpg'),
                    matcher.Extension('png'),
                    matcher.Mime('png'))),
            )
        self._rejects(
            'mime=text |',
            '| mime=text',
            'extension=png | mime=text, ',
            )

    def test_invalid(self):
        # Invalid parses
        self._rejects(
            "extension=", # Empty value
            "mime=foo,bar", # Escaping
            "mime='foo,bar", # Quoting
            "mime=\"foo,bar", # Quoting
            )

        # Invalid input
        for bogus in (None, 123, [123,321]):
            self.assertRaises(AttributeError, parse, bogus)
+
+
+## main ##
+
+if __name__ == '__main__':
+ unittest.main()
+
+## EOF ##
diff --git a/test/utils/filematcher/testimage.jpg b/test/utils/filematcher/testimage.jpg
new file mode 100644
index 0000000..ea7af63
--- /dev/null
+++ b/test/utils/filematcher/testimage.jpg
Binary files differ
diff --git a/test/utils/filematcher/textfile.t b/test/utils/filematcher/textfile.t
new file mode 100644
index 0000000..c389011
--- /dev/null
+++ b/test/utils/filematcher/textfile.t
@@ -0,0 +1,4 @@
+Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
+Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
+Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.
+Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.