diff options
author | Matthias Baumgartner <dev@igsor.net> | 2022-12-23 16:25:51 +0100 |
---|---|---|
committer | Matthias Baumgartner <dev@igsor.net> | 2022-12-23 16:25:51 +0100 |
commit | ed2074ae88f2db6cb6b38716b43b35e29eb2e16c (patch) | |
tree | f84a28414c4e22e3f3c25ca430e19ff42a1ec2d9 | |
parent | 057e09d6537bf5c39815661a75819081e3e5fda7 (diff) | |
download | bsie-ed2074ae88f2db6cb6b38716b43b35e29eb2e16c.tar.gz bsie-ed2074ae88f2db6cb6b38716b43b35e29eb2e16c.tar.bz2 bsie-ed2074ae88f2db6cb6b38716b43b35e29eb2e16c.zip |
filematcher: check file properties, formulate them as a string
-rw-r--r-- | bsie/base/errors.py | 3 | ||||
-rw-r--r-- | bsie/utils/__init__.py | 2 | ||||
-rw-r--r-- | bsie/utils/filematcher/__init__.py | 20 | ||||
-rw-r--r-- | bsie/utils/filematcher/matcher.py | 177 | ||||
-rw-r--r-- | bsie/utils/filematcher/parser.py | 148 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | test/utils/filematcher/__init__.py | 0 | ||||
-rw-r--r-- | test/utils/filematcher/empty | 0 | ||||
-rw-r--r-- | test/utils/filematcher/test_ast.py | 232 | ||||
-rw-r--r-- | test/utils/filematcher/test_parser.py | 146 | ||||
-rw-r--r-- | test/utils/filematcher/testimage.jpg | bin | 0 -> 518 bytes | |||
-rw-r--r-- | test/utils/filematcher/textfile.t | 4 |
12 files changed, 733 insertions, 1 deletions
diff --git a/bsie/base/errors.py b/bsie/base/errors.py index dc3c30e..5fafd5b 100644 --- a/bsie/base/errors.py +++ b/bsie/base/errors.py @@ -39,4 +39,7 @@ class ProgrammingError(_BSIEError): class UnreachableError(ProgrammingError): """Bravo, you've reached a point in code that should logically not be reachable.""" +class ParserError(_BSIEError): + """Failed to parse due to invalid syntax or structures.""" + ## EOF ## diff --git a/bsie/utils/__init__.py b/bsie/utils/__init__.py index bd22236..3981dc7 100644 --- a/bsie/utils/__init__.py +++ b/bsie/utils/__init__.py @@ -11,9 +11,11 @@ import typing from . import bsfs from . import namespaces as ns from . import node +from . import filematcher # exports __all__: typing.Sequence[str] = ( + 'filematcher', 'bsfs', 'node', 'ns', diff --git a/bsie/utils/filematcher/__init__.py b/bsie/utils/filematcher/__init__.py new file mode 100644 index 0000000..b1c1b45 --- /dev/null +++ b/bsie/utils/filematcher/__init__.py @@ -0,0 +1,20 @@ +""" + +Part of the bsie module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2022 +""" +# imports +import typing + +# inner-module imports +from .matcher import Matcher +from .parser import parse + +# exports +__all__: typing.Sequence[str] = ( + 'Matcher', + 'parse', + ) + +## EOF ## diff --git a/bsie/utils/filematcher/matcher.py b/bsie/utils/filematcher/matcher.py new file mode 100644 index 0000000..164beeb --- /dev/null +++ b/bsie/utils/filematcher/matcher.py @@ -0,0 +1,177 @@ +""" + +Part of the bsie module. +A copy of the license is provided with the project. 
+Author: Matthias Baumgartner, 2021 +""" +# imports +from collections.abc import Callable, Collection, Hashable +import abc +import os +import typing +import magic + +# exports +__all__: typing.Sequence[str] = [] + + +## code ## + +# abstract nodes + +class Matcher(abc.ABC, Hashable, Callable, Collection): # type: ignore [misc] # Invalid base class Callable + """Matcher node base class.""" + + # child expressions or terminals + _childs: typing.Set[typing.Any] + + def __init__(self, *childs: typing.Any): + if len(childs) == 1 and isinstance(childs[0], (list, tuple, set)): + self._childs = set(childs[0]) + else: + self._childs = set(childs) + + def __contains__(self, needle: typing.Any) -> bool: + return needle in self._childs + + def __iter__(self) -> typing.Iterator[typing.Any]: + return iter(self._childs) + + def __len__(self) -> int: + return len(self._childs) + + def __repr__(self) -> str: + return f'{type(self).__name__}({self._childs})' + + def __hash__(self) -> int: + return hash((type(self), tuple(set(self._childs)))) + + def __eq__(self, other: typing.Any) -> bool: + return isinstance(other, type(self)) \ + and self._childs == other._childs + + @abc.abstractmethod + def __call__(self, path: str) -> bool: # pylint: disable=arguments-differ + """Check if *path* satisfies the conditions set by the Matcher instance.""" + +class NOT(Matcher): + """Invert a matcher result.""" + def __init__(self, expr: Matcher): + super().__init__(expr) + def __call__(self, path: str) -> bool: + return not next(iter(self._childs))(path) + +# aggregate nodes + +class Aggregate(Matcher): # pylint: disable=too-few-public-methods # Yeah, it's an interface... 
+ """Aggregation function base class (And, Or).""" + +class And(Aggregate): + """Accept only if all conditions are satisfied.""" + def __call__(self, path: str) -> bool: + for itm in self: + if not itm(path): + return False + return True + +class Or(Aggregate): + """Accept only if at least one condition is satisfied.""" + def __call__(self, path: str) -> bool: + for itm in self: + if itm(path): + return True + return False + + +# criteria nodes + +class Criterion(Matcher): + """Criterion base class. Limits acceptance to certain values.""" + def accepted(self) -> typing.Set[typing.Any]: + """Return a set of accepted values.""" + return self._childs + +# criteria w/o value (valueless) + +class Any(Criterion): + """Accepts anything.""" + def __call__(self, path: str) -> bool: + return True + +class Nothing(Criterion): + """Accepts nothing.""" + def __call__(self, path: str) -> bool: + return False + +class Exists(Criterion): + """Filters by existence.""" + def __call__(self, path: str) -> bool: + return os.path.exists(path) + +class IsFile(Criterion): + """Checks if the path is a regular file.""" + def __call__(self, path: str) -> bool: + return os.path.isfile(path) + +class IsDir(Criterion): + """Checks if the path is a directory.""" + def __call__(self, path: str) -> bool: + return os.path.isdir(path) + +class IsLink(Criterion): + """Checks if the path is a link.""" + def __call__(self, path: str) -> bool: + return os.path.islink(path) + +class IsAbs(Criterion): + """Checks if the path is an absolute path.""" + def __call__(self, path: str) -> bool: + return os.path.isabs(path) + +class IsRel(Criterion): + """Checks if the path is a relative path.""" + def __call__(self, path: str) -> bool: + return not os.path.isabs(path) + +class IsMount(Criterion): + """Checks if the path is a mount point.""" + def __call__(self, path: str) -> bool: + return os.path.ismount(path) + +class IsEmpty(Criterion): + """Checks if the path is an empty file.""" + def __call__(self, path: 
str) -> bool: + return os.path.exists(path) and os.stat(path).st_size == 0 + +class IsReadable(Criterion): + """Checks if the path is readable.""" + def __call__(self, path: str) -> bool: + return os.path.exists(path) and os.access(path, os.R_OK) + +class IsWritable(Criterion): + """Checks if the path is writable.""" + def __call__(self, path: str) -> bool: + return os.path.exists(path) and os.access(path, os.W_OK) + +class IsExecutable(Criterion): + """Checks if the path is executable.""" + def __call__(self, path: str) -> bool: + return os.path.exists(path) and os.access(path, os.X_OK) + +# criteria w/ value + +class Extension(Criterion): + """Filters by file extension (without the dot).""" + def __call__(self, path: str) -> bool: + _, ext = os.path.splitext(path) + return ext[1:] in self.accepted() + +class Mime(Criterion): + """Filters by mime type.""" + def __call__(self, path: str) -> bool: + try: + return magic.from_file(path, mime=True).lower() in self.accepted() + except FileNotFoundError: + return False + +## EOF ## diff --git a/bsie/utils/filematcher/parser.py b/bsie/utils/filematcher/parser.py new file mode 100644 index 0000000..0654742 --- /dev/null +++ b/bsie/utils/filematcher/parser.py @@ -0,0 +1,148 @@ +""" + +Part of the bsie module. +A copy of the license is provided with the project. +Author: Matthias Baumgartner, 2021 +""" +# standard imports +import typing + +# non-standard imports +import pyparsing +from pyparsing import printables, alphas8bit, punc8bit, QuotedString, Word, \ + delimitedList, Or, CaselessKeyword, Group, oneOf, Optional + +# bsie imports +from bsie.base import errors + +# inner-module imports +from . 
import matcher + +# exports +__all__: typing.Sequence[str] = ( + 'parse', + ) + + +## code ## + +class FileMatcherParser(): + """ + EXPR := RULES | RULES "|" RULES + RULESET := RULE | RULE, RULE + RULE := CRITERION OP VALUE | CRITERION OP {VALUES} | VALUELESS + OP := != | = + VALUES := VALUE | VALUE, VALUE + VALUE := [word] + CRITERION := mime | extension | ... + """ + + # criteria matcher nodes w/ arguments + _CRITERIA: typing.Dict[str, typing.Type[matcher.Matcher]] = { + 'extension': matcher.Extension, + 'mime': matcher.Mime, + } + + # criteria matcher nodes w/o arguments + _VALUELESS: typing.Dict[str, typing.Type[matcher.Matcher]] = { + 'any': matcher.Any, + 'nothing': matcher.Nothing, + 'exists': matcher.Exists, + 'isfile': matcher.IsFile, + 'isdir': matcher.IsDir, + 'islink': matcher.IsLink, + 'isabs': matcher.IsAbs, + 'isrel': matcher.IsRel, + 'ismount': matcher.IsMount, + 'emtpy': matcher.IsEmpty, + 'readable': matcher.IsReadable, + 'writable': matcher.IsWritable, + 'executable': matcher.IsExecutable, + } + + # pyparsing parser instance. + _parser: pyparsing.ParseExpression + + def __init__(self): + # build the parser + # VALUE := [word] + alphabet = (printables + alphas8bit + punc8bit).translate(str.maketrans('', '', ',{}|=')) + value = QuotedString(quoteChar='"', escChar='\\') ^ Word(alphabet) + # CRITERION := mime | extension | ... 
+ criterion = Or([CaselessKeyword(p) for p in self._CRITERIA]).setResultsName('criterion') + valueless = Or([CaselessKeyword(p) for p in self._VALUELESS]).setResultsName('criterion') + # VALUES := VALUE | VALUE, VALUE + values = delimitedList(value, delim=',').setResultsName('value') + # OP := '=' | '!=' + eqop = oneOf('= !=').setResultsName('op') + # RULE := CRITERION OP VALUE | CRITERION OP {VALUES} | VALUELESS + rule_none = Group(Optional('!').setResultsName('op') + valueless).setResultsName('rule_none') + rule_one = Group(criterion + eqop + value.setResultsName('value')).setResultsName('rule_one') + rule_few = Group(criterion + eqop + '{' + values + '}').setResultsName('rule_few') + # RULESET := RULE | RULE, RULE + ruleset = Group(delimitedList(rule_none ^ rule_one ^ rule_few, delim=',')) + # EXPR := RULESET | RULESET \| RULESET + self._parser = delimitedList(ruleset, delim='|') + + def parse(self, query: str) -> matcher.Matcher: # pylint: disable=too-many-branches + """Build a file matcher from a rule definition.""" + # preprocess the query + query = query.strip() + + # empty query + if len(query) == 0: + return matcher.Any() + + try: + parsed = self._parser.parseString(query, parseAll=True) + except pyparsing.ParseException as err: + raise errors.ParserError(f'Cannot parse query {err}') + + # convert to Matcher + rules = [] + for exp in parsed: + tokens = [] + for rule in exp: + # fetch accepted values + if rule.getName() == 'rule_none': + accepted = [] + elif rule.getName() == 'rule_one': + accepted = [rule.value] + elif rule.getName() == 'rule_few': + accepted = list(rule.value) + else: # prevented by grammar + raise errors.UnreachableError('Invalid rule definition') + + # build criterion + if rule.criterion in self._VALUELESS: + cls = self._VALUELESS[rule.criterion] + if rule.op == '!': + tokens.append(matcher.NOT(cls())) + else: + tokens.append(cls()) + elif rule.criterion in self._CRITERIA: + cls = self._CRITERIA[rule.criterion] + if rule.op == '!=': + 
tokens.append(matcher.NOT(cls(accepted))) + else: + tokens.append(cls(accepted)) + else: # prevented by grammar + raise errors.UnreachableError(f'Invalid condition "{rule.criterion}"') + + # And-aggregate rules in one ruleset (if needed) + tokens = matcher.And(tokens) if len(tokens) > 1 else tokens[0] + rules.append(tokens) + + # Or-aggregate rulesets + expr = matcher.Or(rules) if len(rules) > 1 else rules[0] + + return expr + +# build default instance +file_match_parser = FileMatcherParser() + +def parse(query: str) -> matcher.Matcher: + """Shortcut for FileMatcherParser()(query).""" + return file_match_parser.parse(query) + +## EOF ## @@ -14,7 +14,7 @@ setup( url='https://www.igsor.net/projects/blackstar/bsie/', download_url='https://pip.igsor.net', packages=('bsie', ), - install_requires=('rdflib', 'bsfs'), + install_requires=('rdflib', 'bsfs', 'python-magic'), python_requires=">=3.7", ) diff --git a/test/utils/filematcher/__init__.py b/test/utils/filematcher/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/utils/filematcher/__init__.py diff --git a/test/utils/filematcher/empty b/test/utils/filematcher/empty new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/utils/filematcher/empty diff --git a/test/utils/filematcher/test_ast.py b/test/utils/filematcher/test_ast.py new file mode 100644 index 0000000..ff4b86d --- /dev/null +++ b/test/utils/filematcher/test_ast.py @@ -0,0 +1,232 @@ +""" + +Part of the bsie test suite. +A copy of the license is provided with the project. 
+Author: Matthias Baumgartner, 2022 +""" +# imports +import os +import stat +import tempfile +import unittest + +# objects to test +from bsie.utils.filematcher import matcher + + +## code ## + +class FakeMatcher(matcher.Matcher): + def __call__(self, *args, **kwargs): + pass + +class FakeCriterion(matcher.Criterion): + def __call__(self, *args, **kwargs): + pass + +class FakeAggregate(matcher.Aggregate): + def __call__(self, *args, **kwargs): + pass + +class TestMatcher(unittest.TestCase): + def setUp(self): + # paths + self.image = os.path.join(os.path.dirname(__file__), 'testimage.jpg') + self.text= os.path.join(os.path.dirname(__file__), 'textfile.t') + self.empty = os.path.join(os.path.dirname(__file__), 'empty') + self.missing = os.path.join(os.path.dirname(__file__), 'missing.jpg') + + def test_matcher_skeleton(self): + # node: iteration and length + self.assertSetEqual(set(iter(FakeMatcher(1,2,3))), {1,2,3}) + self.assertSetEqual(set(iter(FakeMatcher([1,2,3]))), {1,2,3}) + self.assertEqual(len(FakeMatcher([1,2,3])), 3) + self.assertEqual(len(FakeMatcher(1,2,3)), 3) + self.assertEqual(len(FakeMatcher()), 0) + self.assertIn(1, FakeMatcher(1,2,3)) + self.assertIn(3, FakeMatcher([1,2,3])) + self.assertNotIn(0, FakeMatcher(1,2,3)) + self.assertNotIn(4, FakeMatcher([1,2,3])) + # node: comparison + self.assertEqual(FakeMatcher([1,2,3]), FakeMatcher([1,2,3])) + self.assertEqual(FakeMatcher(1,2,3), FakeMatcher(1,2,3)) + self.assertEqual(FakeMatcher(1,2,3), FakeMatcher([1,2,3])) + self.assertEqual(FakeMatcher(1,2,3), FakeMatcher((1,2,3))) + self.assertNotEqual(FakeMatcher(1,2,3), FakeMatcher(1,2,4)) + self.assertNotEqual(FakeMatcher(1,2,3), FakeMatcher(1,2,3,4)) + self.assertNotEqual(FakeMatcher(1,2,3), FakeMatcher(1,2)) + self.assertEqual(hash(FakeMatcher([1,2,3])), hash(FakeMatcher([1,2,3]))) + self.assertEqual(hash(FakeMatcher(1,2,3)), hash(FakeMatcher(1,2,3))) + self.assertEqual(hash(FakeMatcher(1,2,3)), hash(FakeMatcher([1,2,3]))) + 
self.assertEqual(hash(FakeMatcher(1,2,3)), hash(FakeMatcher((1,2,3)))) + # node: representation + self.assertEqual(repr(FakeMatcher(1,2,3)), 'FakeMatcher({1, 2, 3})') + + # criterion + self.assertEqual(repr(FakeCriterion(1,2,3)), 'FakeCriterion({1, 2, 3})') + self.assertEqual(hash(FakeCriterion(1,2,3)), hash(FakeCriterion(1,2,3))) + self.assertEqual(FakeCriterion(1,2,3), FakeCriterion([1,2,3])) + self.assertNotEqual(FakeCriterion(1,2,3), FakeCriterion(1,2)) + self.assertNotEqual(FakeCriterion(1,2,3), FakeMatcher(1,2,3)) + self.assertSetEqual(FakeCriterion(1,2,3).accepted(), {1,2,3}) + + # aggregate + self.assertEqual(repr(FakeAggregate(1,2,3)), 'FakeAggregate({1, 2, 3})') + self.assertNotEqual(FakeAggregate(1,2,3), FakeMatcher(1,2,3)) + + def test_any(self): + self.assertTrue(matcher.Any()(self.image)) + self.assertTrue(matcher.Any()(self.text)) + self.assertTrue(matcher.Any()(self.missing)) + self.assertTrue(matcher.Any()(self.empty)) + + def test_nothing(self): + self.assertFalse(matcher.Nothing()(self.image)) + self.assertFalse(matcher.Nothing()(self.text)) + self.assertFalse(matcher.Nothing()(self.missing)) + self.assertFalse(matcher.Nothing()(self.empty)) + + def test_exists(self): + self.assertTrue(matcher.Exists()(self.image)) + self.assertTrue(matcher.Exists()(self.text)) + self.assertTrue(matcher.Exists()(self.empty)) + self.assertFalse(matcher.Exists()(self.missing)) + + def test_isfile(self): + self.assertTrue(matcher.IsFile()(self.image)) + self.assertTrue(matcher.IsFile()(self.text)) + self.assertFalse(matcher.IsFile()(self.missing)) + self.assertFalse(matcher.IsFile()(os.path.dirname(self.image))) + + def test_isdir(self): + self.assertTrue(matcher.IsDir()(os.path.dirname(self.image))) + self.assertFalse(matcher.IsDir()(self.image)) + self.assertFalse(matcher.IsDir()(self.text)) + self.assertFalse(matcher.IsDir()(self.missing)) + + def test_islink(self): + self.assertFalse(matcher.IsLink()(os.path.dirname(self.image))) + 
self.assertFalse(matcher.IsLink()(self.image)) + self.assertFalse(matcher.IsLink()(self.text)) + _, temp = tempfile.mkstemp(prefix='bsie-test-') + templink = temp + '-link' + os.symlink(temp, templink) + self.assertTrue(matcher.IsLink()(templink)) + os.unlink(templink) + os.unlink(temp) + + def test_isabs(self): + self.assertTrue(matcher.IsAbs()(os.path.abspath(self.image))) + self.assertTrue(matcher.IsAbs()(os.path.abspath(self.text))) + self.assertFalse(matcher.IsAbs()(os.path.relpath(self.text, os.path.dirname(self.text)))) + + def test_isrel(self): + self.assertFalse(matcher.IsRel()(os.path.abspath(self.image))) + self.assertFalse(matcher.IsRel()(os.path.abspath(self.text))) + self.assertTrue(matcher.IsRel()(os.path.relpath(self.text, os.path.dirname(self.text)))) + self.assertTrue(matcher.IsRel()(os.path.basename(self.text))) + + def test_ismount(self): + self.assertFalse(matcher.IsMount()(self.image)) + self.assertFalse(matcher.IsMount()(self.text)) + self.assertFalse(matcher.IsMount()(self.missing)) + # there's no reasonable way to test a positive case + + def test_isempty(self): + self.assertTrue(matcher.IsEmpty()(self.empty)) + self.assertFalse(matcher.IsEmpty()(self.image)) + self.assertFalse(matcher.IsEmpty()(self.text)) + self.assertFalse(matcher.IsEmpty()(self.missing)) + + def test_isreadable(self): + self.assertTrue(matcher.IsReadable()(self.empty)) + self.assertTrue(matcher.IsReadable()(self.image)) + self.assertFalse(matcher.IsReadable()(self.missing)) + _, temp = tempfile.mkstemp(prefix='bsie-test-') + os.chmod(temp, 0) + self.assertFalse(matcher.IsReadable()(temp)) + os.unlink(temp) + + def test_iswritable(self): + self.assertTrue(matcher.IsWritable()(self.empty)) + self.assertTrue(matcher.IsWritable()(self.image)) + self.assertFalse(matcher.IsWritable()(self.missing)) + _, temp = tempfile.mkstemp(prefix='bsie-test-') + os.chmod(temp, 0) + self.assertFalse(matcher.IsWritable()(temp)) + os.unlink(temp) + + def test_isexecutable(self): + 
self.assertFalse(matcher.IsExecutable()(self.empty)) + self.assertFalse(matcher.IsExecutable()(self.image)) + self.assertFalse(matcher.IsExecutable()(self.missing)) + _, temp = tempfile.mkstemp(prefix='bsie-test-') + os.chmod(temp, stat.S_IEXEC) + self.assertTrue(matcher.IsExecutable()(temp)) + os.unlink(temp) + + def test_extension(self): + self.assertTrue(matcher.Extension('jpg')(self.image)) + self.assertTrue(matcher.Extension('jpg', 'png')(self.image)) + self.assertTrue(matcher.Extension('jpg', 't')(self.text)) + self.assertTrue(matcher.Extension('jpg', 'png', 't')(self.missing)) + self.assertTrue(matcher.Extension('')(self.empty)) + + self.assertFalse(matcher.Extension()(self.image)) + self.assertFalse(matcher.Extension('jpeg')(self.image)) + self.assertFalse(matcher.Extension('.t')(self.text)) + self.assertFalse(matcher.Extension('png', 't')(self.missing)) + self.assertFalse(matcher.Extension('tiff')(self.empty)) + + def test_mime(self): + self.assertTrue(matcher.Mime('image/jpeg')(self.image)) + self.assertTrue(matcher.Mime('image/tiff', 'image/jpeg')(self.image)) + self.assertTrue(matcher.Mime('text/plain', 'image/jpeg')(self.text)) + self.assertTrue(matcher.Mime('inode/x-empty')(self.empty)) + + self.assertFalse(matcher.Mime()(self.image)) + self.assertFalse(matcher.Mime('image')(self.image)) + self.assertFalse(matcher.Mime('image/tiff', 'image/png')(self.image)) + self.assertFalse(matcher.Mime('')(self.text)) + self.assertFalse(matcher.Mime('text')(self.text)) + self.assertFalse(matcher.Mime('tiff')(self.empty)) + self.assertFalse(matcher.Mime()(self.empty)) + self.assertFalse(matcher.Mime('')(self.empty)) + self.assertFalse(matcher.Mime()(self.missing)) + self.assertFalse(matcher.Mime('')(self.missing)) + self.assertFalse(matcher.Mime('inode/x-empty')(self.missing)) + + def test_not(self): + self.assertFalse(matcher.NOT(matcher.Mime('image/jpeg'))(self.image)) + self.assertTrue(matcher.NOT(matcher.Mime('text/plain'))(self.image)) + + def test_and(self): 
+ self.assertTrue(matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('jpg'))(self.image)) + self.assertTrue(matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('jpg', 'tiff'))(self.image)) + self.assertTrue(matcher.And(matcher.Mime('text/plain'), matcher.Extension('t', 'tiff'))(self.text)) + + self.assertFalse(matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('tiff'))(self.image)) + self.assertFalse(matcher.And(matcher.Mime('text/plain'), matcher.Extension('jpg'))(self.image)) + self.assertFalse(matcher.And(matcher.Mime('inode/x-empty'), matcher.Extension('jpg'))(self.missing)) + self.assertFalse(matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('jpg', 't'))(self.text)) + + def test_or(self): + self.assertTrue(matcher.Or(matcher.Mime('image/jpeg'))(self.image)) + self.assertFalse(matcher.Or(matcher.Mime('text/plain'))(self.image)) + + self.assertTrue(matcher.Or(matcher.Mime('image/jpeg'), matcher.Extension('jpg'))(self.image)) + self.assertTrue(matcher.Or(matcher.Mime('image/jpeg'), matcher.Extension('t'))(self.image)) + self.assertTrue(matcher.Or(matcher.Mime('text/plain'), matcher.Extension('jpg', 'tiff'))(self.image)) + self.assertTrue(matcher.Or(matcher.Mime('text/plain'), matcher.Extension('tiff'))(self.text)) + self.assertTrue(matcher.Or(matcher.Mime('image/jpeg'), matcher.Extension('jpg'))(self.missing)) + + self.assertFalse(matcher.Or(matcher.Mime('text/plain'), matcher.Extension('tiff'))(self.image)) + self.assertFalse(matcher.Or(matcher.Mime('inode/x-empty'), matcher.Extension('jpg', 'tiff'))(self.text)) + + +## main ## + +if __name__ == '__main__': + unittest.main() + +## EOF ## diff --git a/test/utils/filematcher/test_parser.py b/test/utils/filematcher/test_parser.py new file mode 100644 index 0000000..a81d2ed --- /dev/null +++ b/test/utils/filematcher/test_parser.py @@ -0,0 +1,146 @@ +""" + +Part of the bsie test suite. +A copy of the license is provided with the project. 
+Author: Matthias Baumgartner, 2022 +""" +# imports +import unittest + +# inner-module imports +from bsie.base import errors +from bsie.utils.filematcher import matcher + +# objects to test +from bsie.utils.filematcher import parse + + +## code ## + +class TestFileMatcherParser(unittest.TestCase): + def test_empty(self): + # no criterion + self.assertEqual(parse(''), matcher.Any()) + + def test_ruleone(self): + # single criterion, single value + self.assertEqual(parse('mime=text'), matcher.Mime('text')) + self.assertEqual(parse('MIME=text'), matcher.Mime('text')) + self.assertEqual(parse('MiMe=text'), matcher.Mime('text')) + self.assertEqual(parse('MIME=TEXT'), matcher.Mime('TEXT')) + self.assertEqual(parse('mime={text}'), matcher.Mime('text')) + self.assertEqual(parse('mime=image/jpeg'), matcher.Mime('image/jpeg')) + self.assertEqual(parse('mime="image/jpeg"'), matcher.Mime('image/jpeg')) + self.assertEqual(parse('extension=pdf'), matcher.Extension('pdf')) + self.assertEqual(parse('extension={pdf}'), matcher.Extension('pdf')) + self.assertEqual(parse('extension="pdf"'), matcher.Extension('pdf')) + self.assertEqual(parse('extension="foo,bar"'), matcher.Extension('foo,bar')) + self.assertEqual(parse('extension="f{oo|ba}r"'), matcher.Extension('f{oo|ba}r')) + self.assertEqual(parse('extension=""'), matcher.Extension('')) + self.assertEqual(parse('extension="foo'), matcher.Extension('"foo')) + self.assertRaises(errors.ParserError, parse, 'extension=foo=bar') + self.assertRaises(errors.ParserError, parse, 'extension=') + self.assertRaises(errors.ParserError, parse, 'extension={}') + self.assertRaises(errors.ParserError, parse, 'extension={foo') + + # valueless + self.assertEqual(parse('any'), matcher.Any()) + self.assertEqual(parse('nothing'), matcher.Nothing()) + self.assertEqual(parse('exists'), matcher.Exists()) + self.assertEqual(parse('any, nothing'), matcher.And(matcher.Any(), matcher.Nothing())) + self.assertEqual(parse('any, nothing, exists'), + 
matcher.And(matcher.Any(), matcher.Nothing(), matcher.Exists())) + self.assertEqual(parse('any, extension=jpg'), matcher.And(matcher.Any(), matcher.Extension('jpg'))) + self.assertRaises(errors.ParserError, parse, 'mime') + self.assertRaises(errors.ParserError, parse, 'extension') + self.assertRaises(errors.ParserError, parse, 'exists=True') + self.assertRaises(errors.ParserError, parse, 'exists=foo') + self.assertEqual(parse('!any'), matcher.NOT(matcher.Any())) + self.assertEqual(parse('!any, nothing'), matcher.And(matcher.NOT(matcher.Any()), matcher.Nothing())) + self.assertEqual(parse('!any, extension=jpg'), + matcher.And(matcher.NOT(matcher.Any()), matcher.Extension('jpg'))) + self.assertRaises(errors.ParserError, parse, '!mime') + self.assertRaises(errors.ParserError, parse, '!extension') + + def test_rulefew(self): + # single criterion, multiple values + self.assertEqual(parse('extension={jpg, jpeg}'), matcher.Extension('jpg', 'jpeg')) + self.assertEqual(parse('mime={image/jpeg, image/png}'), + matcher.Mime('image/jpeg', 'image/png')) + self.assertRaises(errors.ParserError, parse, 'mime=image/png, image/jpeg') + self.assertRaises(errors.ParserError, parse, 'extension=jpg, jpeg') + + def test_rulesets_ruleone(self): + # mutliple criteria, single value + self.assertEqual(parse('mime=text, extension=t'), + matcher.And(matcher.Mime('text'), matcher.Extension('t'))) + self.assertEqual(parse('mime=text/plain, extension=t'), + matcher.And(matcher.Mime('text/plain'), matcher.Extension('t'))) + self.assertRaises(errors.ParserError, parse, 'mime=text/plain extension=t') + self.assertRaises(errors.ParserError, parse, 'mime={image/jpeg, extension=jpg'), + + def test_rulesets_rulefew(self): + # multiple criteria, multiple values + self.assertEqual(parse('mime=image/jpeg, extension={jpg, jpeg}'), + matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('jpg', 'jpeg'))) + self.assertEqual(parse('mime={image/jpeg, image/tiff}, extension={jpg, jpeg}'), + 
matcher.And(matcher.Mime('image/jpeg', 'image/tiff'), matcher.Extension('jpg', 'jpeg'))) + self.assertEqual(parse('mime={image/jpeg, image/tiff}, extension=jpg'), + matcher.And(matcher.Mime('image/jpeg', 'image/tiff'), matcher.Extension('jpg'))) + self.assertRaises(errors.ParserError, parse, 'mime={image/jpeg, image/tiff, extension=jpg') + self.assertRaises(errors.ParserError, parse, 'mime=image/jpeg, image/tiff, extension=jpg') + self.assertRaises(errors.ParserError, parse, 'mime=image/jpeg, extension=jpg, ') + + def test_not(self): + self.assertEqual(parse('extension!=jpg'), matcher.NOT(matcher.Extension('jpg'))) + self.assertEqual(parse('extension!={jpg, jpeg}'), + matcher.NOT(matcher.Extension('jpg', 'jpeg'))) + self.assertEqual(parse('extension!=jpg, mime=image/jpeg'), + matcher.And(matcher.NOT(matcher.Extension('jpg')), matcher.Mime('image/jpeg'))) + self.assertEqual(parse('extension!=jpg, mime!=image/jpeg'), + matcher.And(matcher.NOT(matcher.Extension('jpg')), matcher.NOT(matcher.Mime('image/jpeg')))) + self.assertEqual(parse('extension!=jpg | mime=image/jpeg'), + matcher.Or(matcher.NOT(matcher.Extension('jpg')), matcher.Mime('image/jpeg'))) + self.assertEqual(parse('extension!=jpg | mime!=image/jpeg'), + matcher.Or(matcher.NOT(matcher.Extension('jpg')), matcher.NOT(matcher.Mime('image/jpeg')))) + + def test_expr(self): + # multiple rulesets + self.assertEqual(parse('mime=image/jpeg | extension=jpg'), + matcher.Or(matcher.Mime('image/jpeg'), matcher.Extension('jpg'))) + self.assertEqual(parse('mime=image/jpeg | extension={jpg, jpeg}'), + matcher.Or(matcher.Mime('image/jpeg'), matcher.Extension('jpg', 'jpeg'))) + self.assertEqual(parse('mime={image/jpeg, image/png} | extension={jpg, jpeg}'), + matcher.Or(matcher.Mime('image/jpeg', 'image/png'), matcher.Extension('jpg', 'jpeg'))) + self.assertEqual(parse('mime=image/jpeg , extension=jpg | extension=jpg'), + matcher.Or(matcher.And(matcher.Mime('image/jpeg'), matcher.Extension('jpg')), matcher.Extension('jpg'))) 
+ self.assertEqual(parse( + 'mime={jpeg, text}, extension={jpg,t} | extension={png,txt}, mime={png, tiff}'), + matcher.Or( + matcher.And(matcher.Mime('jpeg', 'text'), matcher.Extension('jpg', 't')), + matcher.And(matcher.Extension('png', 'txt'), matcher.Mime('png', 'tiff')))) + self.assertEqual(parse('mime=text | extension=jpg | extension=png | mime=png'), + matcher.Or(matcher.Mime('text'), matcher.Extension('jpg'), matcher.Extension('png'), matcher.Mime('png'))) + self.assertRaises(errors.ParserError, parse, 'mime=text |') + self.assertRaises(errors.ParserError, parse, '| mime=text') + self.assertRaises(errors.ParserError, parse, 'extension=png | mime=text, ') + + def test_invalid(self): + # Invalid parses + self.assertRaises(errors.ParserError, parse, "extension=") # Empty value + self.assertRaises(errors.ParserError, parse, "mime=foo,bar") # Escaping + self.assertRaises(errors.ParserError, parse, "mime='foo,bar") # Quoting + self.assertRaises(errors.ParserError, parse, "mime=\"foo,bar") # Quoting + + # Invalid input + self.assertRaises(AttributeError, parse, None) + self.assertRaises(AttributeError, parse, 123) + self.assertRaises(AttributeError, parse, [123,321]) + + +## main ## + +if __name__ == '__main__': + unittest.main() + +## EOF ## diff --git a/test/utils/filematcher/testimage.jpg b/test/utils/filematcher/testimage.jpg Binary files differ new file mode 100644 index 0000000..ea7af63 --- /dev/null +++ b/test/utils/filematcher/testimage.jpg diff --git a/test/utils/filematcher/textfile.t b/test/utils/filematcher/textfile.t new file mode 100644 index 0000000..c389011 --- /dev/null +++ b/test/utils/filematcher/textfile.t @@ -0,0 +1,4 @@ +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. +Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. 
+Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. +Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. |