Created
July 24, 2019 14:57
-
-
Save mathewmoon/df10ffcae21caac74b27c3c2575915bc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.7
import base64
import re
import sys

from jsonpath_ng import jsonpath, parse
from sly import Lexer, Parser
# Sample document that condition expressions are evaluated against.
message = {'body': {'bar': 'baz', 'baz': [1, 2, 3, 4, 5, 6, 7, 8], 'foo': 'bar'}}

# Sample condition expression written in the DynamoDB-style syntax handled below.
autQry = "bar IN body AND body.baz[0]=1"
class DynamoLex(Lexer):
    """Tokenizer for DynamoDB-style condition expressions.

    Unquoted dotted names (optionally with ``[n]`` indexes) are JSON paths;
    each ``PATH`` token's value is replaced by the value that path selects in
    ``message``. Function-style conditions (``attribute_exists(...)``,
    ``begins_with(...)``, ``size(...)`` and so on) are emitted as one token
    holding the whole call text; the parser unpacks the arguments.

    SLY tries the rules in the order they are defined in this class, so
    two-character operators and reserved words are declared before the
    shorter or more general patterns that would otherwise shadow them.
    Patterns carry no ``^``/``$`` anchors because SLY matches them at the
    current scan position inside one combined regular expression.

    Known limitation: a ``)`` inside a quoted argument of a function-style
    token ends that token early.
    """

    tokens = {
        EQ,
        NE,
        LT,
        LT_OR_EQ,
        GT,
        GT_OR_EQ,
        IN,
        BETWEEN,
        EXISTS,
        NOT_EXISTS,
        BEGINS_WITH,
        CONTAINS,
        L_PARENTH,
        R_PARENTH,
        NOT,
        AND,
        OR,
        STRING,
        PATH,
        NUMBER,
        SIZE,
    }

    # Characters skipped between tokens.
    ignore = ' \t'

    # Class-level default document; instances get their own in __init__.
    message = {}

    # Two-character operators must come before their one-character prefixes.
    NE = r'<>'
    LT_OR_EQ = r'<='
    GT_OR_EQ = r'>='
    EQ = r'='
    LT = r'<'
    GT = r'>'

    # Function-style conditions: the full call is captured as a single token.
    NOT_EXISTS = r'attribute_not_exists\s*\([^)]*\)'
    EXISTS = r'attribute_exists\s*\([^)]*\)'
    BEGINS_WITH = r'begins_with\s*\([^)]*\)'
    CONTAINS = r'contains\s*\([^)]*\)'
    SIZE = r'size\s*\([^)]*\)'

    L_PARENTH = r'\('
    R_PARENTH = r'\)'

    # Reserved words; \b keeps e.g. "INDEX" or "ORDER" lexing as a PATH.
    BETWEEN = r'BETWEEN\b'
    IN = r'IN\b'
    NOT = r'NOT\b'
    AND = r'AND\b'
    OR = r'OR\b'

    def __init__(self, message, query):
        """Bind the lexer to the document that paths are resolved against.

        Args:
            message: The document (nested dicts/lists) searched by JSON path.
            query: The expression text. It is only stored here; pass it to
                ``tokenize()`` to actually scan it.
        """
        super().__init__()
        self.message = message
        self.query = query

    # Anything quoted is treated as a string. No backreference is used
    # because group numbers shift once SLY combines all patterns.
    @_(r'\'[^\']*\'|"[^"]*"')
    def STRING(self, t):
        """Strip the surrounding quotes so the value is the bare text."""
        t.value = t.value[1:-1]
        return t

    @_(r'\d+')
    def NUMBER(self, t):
        """Convert the matched digits to an ``int`` (0 if conversion fails)."""
        try:
            t.value = int(t.value)
        except ValueError as e:
            print("Invalid type for %s" % t.value)
            print(e)
            t.value = 0
        return t

    # JSON paths are unquoted names joined by dots, each optionally followed
    # by list indexes such as [0]. Groups are non-capturing so the token's
    # named group stays the last matched group.
    @_(r'[a-zA-Z_][a-zA-Z_0-9]*(?:\[\d+\])*'
       r'(?:\.[a-zA-Z_][a-zA-Z_0-9]*(?:\[\d+\])*)*')
    def PATH(self, t):
        """Replace the path text with the value it selects in the document."""
        # We want PATH to actually be the value of the JSON path.
        t.value = self.getPathValue(t.value)
        return t

    def error(self, t):
        """Report the first unrecognised character and abort the program."""
        print("Illegal character '%s'" % t.value[0])
        print(t)
        raise SystemExit

    def getPathValue(self, path):
        """Return the first value ``path`` selects in ``self.message``.

        Args:
            path: JSON path text, e.g. ``"body.baz[0]"``.

        Returns:
            The first matching value, or ``False`` when the path is malformed
            or selects nothing. A stored value of ``False`` is therefore
            indistinguishable from a missing path.
        """
        try:
            matches = parse(path).find(self.message)
        except Exception:
            # A path jsonpath_ng cannot parse is treated like a missing one.
            return False
        return matches[0].value if matches else False
class DynamoParse(Parser):
    """Evaluate a tokenized DynamoDB-style condition expression.

    Grammar symbols:
        ``exp``  - an operand: STRING, NUMBER, PATH, or ``size(path)``.
        ``expr`` - a condition: a comparison of operands, a function-style
                   check, or conditions combined with NOT / AND / OR /
                   parentheses.

    ``PATH`` tokens are expected to arrive with their value already resolved
    by the lexer. Function-style tokens (``attribute_exists(...)`` etc.)
    arrive as raw call text and are unpacked here.

    SLY production objects are indexed from 0 (``p[0]`` is the first symbol);
    repeated symbols are also available by name as ``p.exp0``, ``p.exp1``...

    NOTE(review): the CONTAINS token has no grammar rule yet, so
    ``contains(...)`` in a query is reported as a syntax error.
    """

    # PATH is used by the grammar below, so make sure it is declared even if
    # the lexer's token collection does not list it.
    tokens = {*DynamoLex.tokens, 'PATH'}
    message = DynamoLex.message
    precedence = (
        ('left', 'OR'),
        ('left', 'AND'),
        ('right', 'NOT'),
        ('left', 'L_PARENTH', 'R_PARENTH'),
        ('left', 'EXISTS', 'NOT_EXISTS', 'BEGINS_WITH', 'CONTAINS'),
        ('left', 'BETWEEN'),
        ('left', 'IN'),
        ('left', 'EQ', 'NE', 'LT', 'LT_OR_EQ', 'GT', 'GT_OR_EQ'),
    )

    def __init__(self, message=None):
        """Bind the parser to the document used by function-style checks.

        Args:
            message: Document searched by JSON path. Defaults to the
                class-level ``message`` so ``DynamoParse()`` keeps working.
        """
        super().__init__()
        self.message = type(self).message if message is None else message

    def isBase64(self, value):
        """Return True if ``value`` survives a base64 decode/encode round trip.

        This is a heuristic: short plain words such as ``"abcd"`` are also
        valid base64 and will be reported as such.

        Args:
            value: ``str`` or bytes-like candidate; other types yield False.
        """
        if isinstance(value, str):
            try:
                raw = value.encode('ascii')
            except UnicodeEncodeError:
                return False
        elif isinstance(value, (bytes, bytearray)):
            raw = bytes(value)
        else:
            return False
        try:
            # Compare bytes with bytes; comparing to a str is always False.
            return base64.b64encode(base64.b64decode(raw)) == raw
        except (ValueError, TypeError):
            return False

    def _path_matches(self, path):
        """Return every value ``path`` selects in ``self.message`` (maybe none)."""
        try:
            return [match.value for match in parse(path).find(self.message)]
        except Exception:
            # A path jsonpath_ng cannot parse is treated like a missing one.
            return []

    def getPathValue(self, path):
        """Return the first value ``path`` selects, or ``False`` if none.

        A stored value of ``False`` is indistinguishable from a missing path;
        use ``_path_matches`` when existence itself matters.
        """
        matches = self._path_matches(path)
        return matches[0] if matches else False

    def _call_args(self, call_text):
        """Split ``name(a, 'b')`` into at most two unquoted arguments.

        Only the first comma separates arguments, so a second argument may
        itself contain commas.
        """
        inner = call_text[call_text.index('(') + 1:call_text.rindex(')')]
        return [arg.strip().strip('\'"') for arg in inner.split(',', 1)]

    def _guarded(self, check):
        """Run ``check()``; operands of incompatible types compare as False."""
        try:
            return check()
        except TypeError:
            return False

    # ---- comparisons between operands -------------------------------------

    @_('exp EQ exp')
    def expr(self, p):
        return p.exp0 == p.exp1

    @_('exp NE exp')
    def expr(self, p):
        return p.exp0 != p.exp1

    @_('exp LT exp')
    def expr(self, p):
        return self._guarded(lambda: p.exp0 < p.exp1)

    @_('exp LT_OR_EQ exp')
    def expr(self, p):
        return self._guarded(lambda: p.exp0 <= p.exp1)

    @_('exp GT exp')
    def expr(self, p):
        return self._guarded(lambda: p.exp0 > p.exp1)

    @_('exp GT_OR_EQ exp')
    def expr(self, p):
        return self._guarded(lambda: p.exp0 >= p.exp1)

    @_('exp IN exp')
    def expr(self, p):
        # The right operand may be a non-container (e.g. False for a missing
        # path); membership is then simply False.
        return self._guarded(lambda: p.exp0 in p.exp1)

    @_('exp BETWEEN exp AND exp')
    def expr(self, p):
        value, first, second = p.exp0, p.exp1, p.exp2
        # The bounds are accepted in either order.
        return self._guarded(
            lambda: second <= value <= first or first <= value <= second
        )

    # ---- function-style conditions ----------------------------------------

    @_('EXISTS')
    def expr(self, p):
        # The path exists if it selects at least one value, whatever it is.
        return bool(self._path_matches(self._call_args(p.EXISTS)[0]))

    @_('NOT_EXISTS')
    def expr(self, p):
        # Same as above but the result is reversed.
        return not self._path_matches(self._call_args(p.NOT_EXISTS)[0])

    @_('BEGINS_WITH')
    def expr(self, p):
        args = self._call_args(p.BEGINS_WITH)
        if len(args) != 2:
            return False
        path, prefix = args
        value = self.getPathValue(path)
        return isinstance(value, str) and value.startswith(prefix)

    # ---- boolean combinators ----------------------------------------------

    @_('NOT expr')
    def expr(self, p):
        return not p.expr

    @_('expr AND expr')
    def expr(self, p):
        return p.expr0 and p.expr1

    @_('expr OR expr')
    def expr(self, p):
        return p.expr0 or p.expr1

    @_('L_PARENTH expr R_PARENTH')
    def expr(self, p):
        # Parentheses group conditions, so the result stays a condition.
        return p.expr

    # ---- operands ---------------------------------------------------------

    @_('SIZE')
    def exp(self, p):
        # size(...) yields a number, so it is an operand that can be compared.
        matches = self._path_matches(self._call_args(p.SIZE)[0])
        if not matches:
            return 0
        value = matches[0]
        if self.isBase64(value):
            # Value is base64: report the size of the decoded payload.
            return len(base64.b64decode(value))
        return len(str(value))

    @_('STRING',
       'NUMBER',
       'PATH')
    def exp(self, p):
        return p[0]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment