diff options
Diffstat (limited to 'bsie/utils/filematcher/parser.py')
-rw-r--r-- | bsie/utils/filematcher/parser.py | 141 |
1 files changed, 141 insertions, 0 deletions
diff --git a/bsie/utils/filematcher/parser.py b/bsie/utils/filematcher/parser.py new file mode 100644 index 0000000..dc28a0d --- /dev/null +++ b/bsie/utils/filematcher/parser.py @@ -0,0 +1,141 @@ + +# standard imports +import typing + +# external imports +import pyparsing +from pyparsing import printables, alphas8bit, punc8bit, QuotedString, Word, \ + delimitedList, Or, CaselessKeyword, Group, oneOf, Optional + +# inner-module imports +from . import matcher +from .. import errors + +# exports +__all__: typing.Sequence[str] = ( + 'parse', + ) + + +## code ## + +class FileMatcherParser(): + """ + EXPR := RULES | RULES "|" RULES + RULESET := RULE | RULE, RULE + RULE := CRITERION OP VALUE | CRITERION OP {VALUES} | VALUELESS + OP := != | = + VALUES := VALUE | VALUE, VALUE + VALUE := [word] + CRITERION := mime | extension | ... + """ + + # criteria matcher nodes w/ arguments + _CRITERIA: typing.Dict[str, typing.Type[matcher.Matcher]] = { + 'extension': matcher.Extension, + 'mime': matcher.Mime, + } + + # criteria matcher nodes w/o arguments + _VALUELESS: typing.Dict[str, typing.Type[matcher.Matcher]] = { + 'any': matcher.Any, + 'nothing': matcher.Nothing, + 'exists': matcher.Exists, + 'isfile': matcher.IsFile, + 'isdir': matcher.IsDir, + 'islink': matcher.IsLink, + 'isabs': matcher.IsAbs, + 'isrel': matcher.IsRel, + 'ismount': matcher.IsMount, + 'emtpy': matcher.IsEmpty, + 'readable': matcher.IsReadable, + 'writable': matcher.IsWritable, + 'executable': matcher.IsExecutable, + } + + # pyparsing parser instance. + _parser: pyparsing.ParseExpression + + def __init__(self): + # build the parser + # VALUE := [word] + alphabet = (printables + alphas8bit + punc8bit).translate(str.maketrans('', '', ',{}|=')) + value = QuotedString(quoteChar='"', escChar='\\') ^ Word(alphabet) + # CRITERION := mime | extension | ... + criterion = Or([CaselessKeyword(p) for p in self._CRITERIA]).setResultsName('criterion') + valueless = Or([CaselessKeyword(p) for p in self._VALUELESS]).setResultsName('criterion') + # VALUES := VALUE | VALUE, VALUE + values = delimitedList(value, delim=',').setResultsName('value') + # OP := '=' | '!=' + eqop = oneOf('= !=').setResultsName('op') + # RULE := CRITERION OP VALUE | CRITERION OP {VALUES} | VALUELESS + rule_none = Group(Optional('!').setResultsName('op') + valueless).setResultsName('rule_none') + rule_one = Group(criterion + eqop + value.setResultsName('value')).setResultsName('rule_one') + rule_few = Group(criterion + eqop + '{' + values + '}').setResultsName('rule_few') + # RULESET := RULE | RULE, RULE + ruleset = Group(delimitedList(rule_none ^ rule_one ^ rule_few, delim=',')) + # EXPR := RULESET | RULESET \| RULESET + self._parser = delimitedList(ruleset, delim='|') + + def parse(self, query: str) -> matcher.Matcher: # pylint: disable=too-many-branches + """Build a file matcher from a rule definition.""" + # preprocess the query + query = query.strip() + + # empty query + if len(query) == 0: + return matcher.Any() + + try: + parsed = self._parser.parseString(query, parseAll=True) + except pyparsing.ParseException as err: + raise errors.ParserError(f'Cannot parse query {err}') + + # convert to Matcher + rules = [] + for exp in parsed: + tokens = [] + for rule in exp: + # fetch accepted values + if rule.getName() == 'rule_none': + accepted = [] + elif rule.getName() == 'rule_one': + accepted = [rule.value] + elif rule.getName() == 'rule_few': + accepted = list(rule.value) + else: # prevented by grammar + raise errors.UnreachableError('Invalid rule definition') + + # build criterion + if rule.criterion in self._VALUELESS: + cls = self._VALUELESS[rule.criterion] + if rule.op == '!': + tokens.append(matcher.NOT(cls())) + else: + tokens.append(cls()) + elif rule.criterion in self._CRITERIA: + cls = self._CRITERIA[rule.criterion] + if rule.op == '!=': + tokens.append(matcher.NOT(cls(accepted))) + else: + tokens.append(cls(accepted)) + else: # prevented by grammar + raise errors.UnreachableError(f'Invalid condition "{rule.criterion}"') + + # And-aggregate rules in one ruleset (if needed) + tokens = matcher.And(tokens) if len(tokens) > 1 else tokens[0] + rules.append(tokens) + + # Or-aggregate rulesets + expr = matcher.Or(rules) if len(rules) > 1 else rules[0] + + return expr + +# build default instance +file_match_parser = FileMatcherParser() + +def parse(query: str) -> matcher.Matcher: + """Shortcut for FileMatcherParser()(query).""" + return file_match_parser.parse(query) + +## EOF ## |