Created
July 24, 2019 16:11
-
-
Save mathewmoon/224dc3fe5d69dba655b9b0af5204e5b4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.7 | |
import sys | |
import re | |
import base64 | |
from jsonpath_ng import jsonpath, parse | |
from sly import Lexer, Parser | |
# Sample message that the sample query below is meant to be evaluated against.
message = {
    'body': {
        'bar': 'baz',
        'baz': [1, 2, 3, 4, 5, 6, 7, 8],
        'foo': 'bar',
    },
}

# Sample condition expression in the DynamoDB-like syntax handled by the
# lexer/parser classes defined in this file.
autQry = "bar IN body AND body.baz[0]=1"
class DynamoLex(Lexer):
    """Tokenizer for a DynamoDB-style condition expression.

    Unquoted JSON paths are resolved against ``message`` while lexing, so a
    PATH token already carries the value found at that path (``False`` when
    the path does not match anything).  Function calls such as
    ``attribute_exists('a.b')`` are emitted as a single token whose value is
    the raw call text; the parser is expected to pick the arguments apart.

    SLY tries the rules in the order they are defined in the class body, so
    the more specific patterns (function calls, keywords, two-character
    operators) are declared before the more general ones.
    """

    tokens = [
        'EQ',
        'NE',
        'LT',
        'LT_OR_EQ',
        'GT',
        'GT_OR_EQ',
        'IN',
        'BETWEEN',
        'EXISTS',
        'NOT_EXISTS',
        'BEGINS_WITH',
        'CONTAINS',
        'L_PARENTH',
        'R_PARENTH',
        'NOT',
        'AND',
        'OR',
        'STRING',
        'PATH',
        'NUMBER',
        'SIZE'
    ]

    # Class-level default; __init__ replaces it per instance.
    message = {}

    ignore = ' \t'

    # Function calls: matched as one token up to the first closing parenthesis.
    EXISTS = r'attribute_exists\([^)]*\)'
    NOT_EXISTS = r'attribute_not_exists\([^)]*\)'
    BEGINS_WITH = r'begins_with\([^)]*\)'
    CONTAINS = r'contains\([^)]*\)'
    SIZE = r'size\([^)]*\)'

    # Keywords.  The trailing \b keeps identifiers that merely start with a
    # keyword (e.g. "ORDER", "INDEX") available to the PATH rule.
    BETWEEN = r'BETWEEN\b'
    NOT = r'NOT\b'
    AND = r'AND\b'
    OR = r'OR\b'
    IN = r'IN\b'

    # Comparison operators: two-character forms first so "<=" is not read
    # as "<" followed by "=".
    NE = r'<>'
    LT_OR_EQ = r'<='
    GT_OR_EQ = r'>='
    EQ = r'='
    LT = r'<'
    GT = r'>'

    L_PARENTH = r'\('
    R_PARENTH = r'\)'

    def __init__(self, message, query):
        """Store the message that paths are resolved against.

        ``query`` is kept on the instance for convenience only; tokenizing
        still requires passing the text to ``tokenize()``.
        """
        super().__init__()
        self.message = message
        self.query = query

    @_(r'"[^"]*"|\'[^\']*\'')
    def STRING(self, t):
        # Anything quoted is a string literal; drop the surrounding quotes so
        # the value compares equal to the text it denotes.
        t.value = t.value[1:-1]
        return t

    @_(r'\d+')
    def NUMBER(self, t):
        # The pattern only admits digits, so int() cannot fail here.
        t.value = int(t.value)
        return t

    @_(r'[a-zA-Z_][a-zA-Z_0-9]*(?:\[\d+\])*'
       r'(?:\.[a-zA-Z_][a-zA-Z_0-9]*(?:\[\d+\])*)*')
    def PATH(self, t):
        # JSON paths are unquoted identifiers joined by dots, optionally
        # followed by [index] subscripts.  The token value becomes the value
        # found at that path so the parser can compare it directly.
        t.value = self.getPathValue(t.value)
        return t

    def error(self, t):
        """Report the offending character and abort with a non-zero status."""
        print("Illegal character '%s'" % t.value[0])
        print(t)
        raise SystemExit(1)

    def getPathValue(self, path):
        """Return the first value matching JSON path ``path`` in the message.

        Returns ``False`` when the path is invalid or matches nothing.
        """
        try:
            matches = parse(path).find(self.message)
        except Exception:
            # An unparsable path is treated the same as a missing one.
            return False
        return matches[0].value if matches else False
class DynamoParse(Parser):
    """Evaluate a DynamoDB-style condition expression while parsing it.

    The grammar has two levels so that ``a BETWEEN b AND c`` cannot be
    confused with the boolean ``AND``:

    * ``exp``  -- an operand: STRING, NUMBER, PATH or ``size(...)``
    * ``expr`` -- a condition: comparisons, ``IN``, ``BETWEEN``, function
      calls, and ``NOT`` / ``AND`` / ``OR`` / parentheses over conditions

    Operand tokens are used as delivered by the lexer; PATH tokens are
    expected to carry the value found at the path.  Function-call tokens
    carry their raw text (e.g. ``begins_with('a.b', 'x')``) and their path
    arguments are looked up in ``self.message``.
    """

    tokens = DynamoLex.tokens

    # Class-level default; __init__ replaces it when a message is supplied.
    message = DynamoLex.message

    start = 'expr'

    # Lowest to highest binding.  Only the boolean operators need an entry:
    # comparisons work on plain operands and are therefore unambiguous.
    precedence = (
        ('left', 'OR'),
        ('left', 'AND'),
        ('right', 'NOT'),
    )

    def __init__(self, message=None):
        """Remember the message that function-call paths are resolved against."""
        super().__init__()
        if message is not None:
            self.message = message

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def isBase64(self, value):
        """Return True when ``value`` (str or bytes) is canonical base64."""
        try:
            raw = value.encode('ascii') if isinstance(value, str) else value
            # Round-tripping rejects text that merely decodes by accident.
            return base64.b64encode(base64.b64decode(raw)) == raw
        except Exception:
            return False

    def _findMatches(self, path):
        """Return the jsonpath matches for ``path``; empty list when invalid."""
        try:
            return parse(path).find(self.message)
        except Exception:
            return []

    def getPathValue(self, path):
        """Return the first value at ``path`` in the message, else ``False``."""
        matches = self._findMatches(path)
        return matches[0].value if matches else False

    def _callArgs(self, call):
        """Split the text of a ``name(arg, ...)`` token into raw arguments."""
        inner = call[call.index('(') + 1:call.rindex(')')]
        return [arg.strip() for arg in inner.split(',')]

    def _unquote(self, text):
        """Strip one pair of matching surrounding quotes, if present."""
        if len(text) >= 2 and text[0] == text[-1] and text[0] in ('"', "'"):
            return text[1:-1]
        return text

    def _literal(self, text):
        """Interpret a raw argument: unquoted digits are ints, the rest text."""
        if text.isdigit():
            return int(text)
        return self._unquote(text)

    def _orFalse(self, test):
        """Run ``test()``; operands of incompatible types compare as False."""
        try:
            return test()
        except TypeError:
            return False

    # ------------------------------------------------------------------
    # Conditions
    # ------------------------------------------------------------------
    @_('exp EQ exp')
    def expr(self, p):
        return p[0] == p[2]

    @_('exp NE exp')
    def expr(self, p):
        return p[0] != p[2]

    @_('exp LT exp')
    def expr(self, p):
        return self._orFalse(lambda: p[0] < p[2])

    @_('exp LT_OR_EQ exp')
    def expr(self, p):
        return self._orFalse(lambda: p[0] <= p[2])

    @_('exp GT exp')
    def expr(self, p):
        return self._orFalse(lambda: p[0] > p[2])

    @_('exp GT_OR_EQ exp')
    def expr(self, p):
        return self._orFalse(lambda: p[0] >= p[2])

    @_('exp IN exp')
    def expr(self, p):
        # Membership test against whatever container the right operand is.
        return self._orFalse(lambda: p[0] in p[2])

    @_('exp BETWEEN exp AND exp')
    def expr(self, p):
        # The bounds are accepted in either order.
        value, first, second = p[0], p[2], p[4]
        return self._orFalse(
            lambda: first <= value <= second or second <= value <= first
        )

    @_('EXISTS')
    def expr(self, p):
        # Test for a match rather than truthiness so stored values such as
        # 0 or '' still count as existing.
        path = self._unquote(self._callArgs(p[0])[0])
        return bool(self._findMatches(path))

    @_('NOT_EXISTS')
    def expr(self, p):
        path = self._unquote(self._callArgs(p[0])[0])
        return not self._findMatches(path)

    @_('BEGINS_WITH')
    def expr(self, p):
        args = self._callArgs(p[0])
        if len(args) != 2:
            return False
        value = self.getPathValue(self._unquote(args[0]))
        return isinstance(value, str) and value.startswith(self._unquote(args[1]))

    @_('CONTAINS')
    def expr(self, p):
        args = self._callArgs(p[0])
        if len(args) != 2:
            return False
        container = self.getPathValue(self._unquote(args[0]))
        return self._orFalse(lambda: self._literal(args[1]) in container)

    @_('NOT expr')
    def expr(self, p):
        return not p[1]

    @_('expr AND expr')
    def expr(self, p):
        return p[0] and p[2]

    @_('expr OR expr')
    def expr(self, p):
        return p[0] or p[2]

    @_('L_PARENTH expr R_PARENTH')
    def expr(self, p):
        return p[1]

    # ------------------------------------------------------------------
    # Operands
    # ------------------------------------------------------------------
    @_('SIZE')
    def exp(self, p):
        value = self.getPathValue(self._unquote(self._callArgs(p[0])[0]))
        # bool is checked first: it covers a missing path (False) and must
        # not be treated as a number.
        if isinstance(value, bool):
            return 0
        if isinstance(value, (int, float)):
            return len(str(value))
        # NOTE(review): any text that happens to be valid base64 (for example
        # a plain 4-letter word) is measured by its decoded length here --
        # confirm that this heuristic is what callers want.
        if self.isBase64(value):
            return len(base64.b64decode(value))
        try:
            # Lists, dicts and plain strings.
            return len(value)
        except TypeError:
            return 0

    @_('STRING',
       'NUMBER',
       'PATH')
    def exp(self, p):
        return p[0]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment