aboutsummaryrefslogtreecommitdiffstats
path: root/bsie/utils/filematcher/parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'bsie/utils/filematcher/parser.py')
-rw-r--r--bsie/utils/filematcher/parser.py141
1 files changed, 141 insertions, 0 deletions
diff --git a/bsie/utils/filematcher/parser.py b/bsie/utils/filematcher/parser.py
new file mode 100644
index 0000000..dc28a0d
--- /dev/null
+++ b/bsie/utils/filematcher/parser.py
@@ -0,0 +1,141 @@
+
+# standard imports
+import typing
+
+# external imports
+import pyparsing
+from pyparsing import printables, alphas8bit, punc8bit, QuotedString, Word, \
+ delimitedList, Or, CaselessKeyword, Group, oneOf, Optional
+
+# inner-module imports
+from . import matcher
+from .. import errors
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'parse',
+ )
+
+
+## code ##
+
+class FileMatcherParser():
+ """
+ EXPR := RULES | RULES "|" RULES
+ RULESET := RULE | RULE, RULE
+ RULE := CRITERION OP VALUE | CRITERION OP {VALUES} | VALUELESS
+ OP := != | =
+ VALUES := VALUE | VALUE, VALUE
+ VALUE := [word]
+ CRITERION := mime | extension | ...
+ """
+
+ # criteria matcher nodes w/ arguments
+ _CRITERIA: typing.Dict[str, typing.Type[matcher.Matcher]] = {
+ 'extension': matcher.Extension,
+ 'mime': matcher.Mime,
+ }
+
+ # criteria matcher nodes w/o arguments
+ _VALUELESS: typing.Dict[str, typing.Type[matcher.Matcher]] = {
+ 'any': matcher.Any,
+ 'nothing': matcher.Nothing,
+ 'exists': matcher.Exists,
+ 'isfile': matcher.IsFile,
+ 'isdir': matcher.IsDir,
+ 'islink': matcher.IsLink,
+ 'isabs': matcher.IsAbs,
+ 'isrel': matcher.IsRel,
+ 'ismount': matcher.IsMount,
+ 'emtpy': matcher.IsEmpty,
+ 'readable': matcher.IsReadable,
+ 'writable': matcher.IsWritable,
+ 'executable': matcher.IsExecutable,
+ }
+
+ # pyparsing parser instance.
+ _parser: pyparsing.ParseExpression
+
+ def __init__(self):
+ # build the parser
+ # VALUE := [word]
+ alphabet = (printables + alphas8bit + punc8bit).translate(str.maketrans('', '', ',{}|='))
+ value = QuotedString(quoteChar='"', escChar='\\') ^ Word(alphabet)
+ # CRITERION := mime | extension | ...
+ criterion = Or([CaselessKeyword(p) for p in self._CRITERIA]).setResultsName('criterion')
+ valueless = Or([CaselessKeyword(p) for p in self._VALUELESS]).setResultsName('criterion')
+ # VALUES := VALUE | VALUE, VALUE
+ values = delimitedList(value, delim=',').setResultsName('value')
+ # OP := '=' | '!='
+ eqop = oneOf('= !=').setResultsName('op')
+ # RULE := CRITERION OP VALUE | CRITERION OP {VALUES} | VALUELESS
+ rule_none = Group(Optional('!').setResultsName('op') + valueless).setResultsName('rule_none')
+ rule_one = Group(criterion + eqop + value.setResultsName('value')).setResultsName('rule_one')
+ rule_few = Group(criterion + eqop + '{' + values + '}').setResultsName('rule_few')
+ # RULESET := RULE | RULE, RULE
+ ruleset = Group(delimitedList(rule_none ^ rule_one ^ rule_few, delim=','))
+ # EXPR := RULESET | RULESET \| RULESET
+ self._parser = delimitedList(ruleset, delim='|')
+
+ def parse(self, query: str) -> matcher.Matcher: # pylint: disable=too-many-branches
+ """Build a file matcher from a rule definition."""
+ # preprocess the query
+ query = query.strip()
+
+ # empty query
+ if len(query) == 0:
+ return matcher.Any()
+
+ try:
+ parsed = self._parser.parseString(query, parseAll=True)
+ except pyparsing.ParseException as err:
+ raise errors.ParserError(f'Cannot parse query {err}')
+
+ # convert to Matcher
+ rules = []
+ for exp in parsed:
+ tokens = []
+ for rule in exp:
+ # fetch accepted values
+ if rule.getName() == 'rule_none':
+ accepted = []
+ elif rule.getName() == 'rule_one':
+ accepted = [rule.value]
+ elif rule.getName() == 'rule_few':
+ accepted = list(rule.value)
+ else: # prevented by grammar
+ raise errors.UnreachableError('Invalid rule definition')
+
+ # build criterion
+ if rule.criterion in self._VALUELESS:
+ cls = self._VALUELESS[rule.criterion]
+ if rule.op == '!':
+ tokens.append(matcher.NOT(cls()))
+ else:
+ tokens.append(cls())
+ elif rule.criterion in self._CRITERIA:
+ cls = self._CRITERIA[rule.criterion]
+ if rule.op == '!=':
+ tokens.append(matcher.NOT(cls(accepted)))
+ else:
+ tokens.append(cls(accepted))
+ else: # prevented by grammar
+ raise errors.UnreachableError(f'Invalid condition "{rule.criterion}"')
+
+ # And-aggregate rules in one ruleset (if needed)
+ tokens = matcher.And(tokens) if len(tokens) > 1 else tokens[0]
+ rules.append(tokens)
+
+ # Or-aggregate rulesets
+ expr = matcher.Or(rules) if len(rules) > 1 else rules[0]
+
+ return expr
+
+# build default instance
+file_match_parser = FileMatcherParser()
+
+def parse(query: str) -> matcher.Matcher:
+ """Shortcut for FileMatcherParser()(query)."""
+ return file_match_parser.parse(query)
+
+## EOF ##