Skip to content

Instantly share code, notes, and snippets.

@mathewmoon
Created July 24, 2019 16:11
Show Gist options
  • Save mathewmoon/224dc3fe5d69dba655b9b0af5204e5b4 to your computer and use it in GitHub Desktop.
Save mathewmoon/224dc3fe5d69dba655b9b0af5204e5b4 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3.7
import sys
import re
import base64
from jsonpath_ng import jsonpath, parse
from sly import Lexer, Parser
message = {'body': {'bar': 'baz', 'baz': [1, 2, 3, 4, 5, 6, 7, 8], 'foo': 'bar'}}
autQry = "bar IN body AND body.baz[0]=1"
class DynamoLex(Lexer):
tokens = [
'EQ',
'NE',
'LT',
'LT_OR_EQ',
'GT',
'GT_OR_EQ',
'IN',
'BETWEEN',
'EXISTS',
'NOT_EXISTS',
'BEGINS_WITH',
'CONTAINS',
'L_PARENTH',
'R_PARENTH',
'NOT',
'AND',
'OR',
'STRING',
'PATH',
'NUMBER',
'SIZE'
]
message = {}
EQ = r'='
NE = r'<>'
LT = r'<'
LT_OR_EQ = r'<='
GT = r'>'
GT_OR_EQ = r'>='
IN = r'IN\s*\(((.+),?)+\)$'
BETWEEN = r'^BETWEEN$'
EXISTS = r'^attribute_exists\(.+\)$'
NOT_EXISTS = r'^attribute_not_exists\(.+\)$'
BEGINS_WITH = r'^begins_with\(.+\)$'
CONTAINS = r'^contains\(.+\)$'
L_PARENTH = r'^\($'
R_PARENTH = r'^\)$'
NOT = r'^NOT$'
AND = r'^AND$'
OR = r'^OR$'
STRING = r'(\'|")(.*)?(\'|")' #Anything quoted is treated as a string
PATH = r'^([a-zA-Z_][a-zA-Z_0-9]+\.?)+' # JSON paths are unquoted alpha numeric strings that can contain dots or underscores.
NUMBER = r'\d+'
SIZE = r'^size\(.+\)$'
ignore = ' \t'
def __init__(self, message, query):
self.message = message
@_(r'\d+')
def NUMBER(self, t):
try:
t.value = int(t.value)
except ValueError:
print("Invalid type for " % t.value)
print(e)
t.value = 0
return t
@_(r'^([a-zA-Z_][a-zA-Z_0-9])+\.?')
def PATH(self, t):
# We want PATH to actually be the value of the JSON path
return self.getPathValue(str(t))
def error(self, t):
print("Illegal character '%s'" % t.value[0])
print(t)
raise SystemExit
def getPathValue(self, path):
try:
return [ match.value for match in path.find(self.message) ][0]
except:
return False
class DynamoParse(Parser):
tokens = DynamoLex.tokens
message = DynamoLex.message
precedence = (
('left', 'OR'),
('left', 'AND'),
('right', 'NOT'),
('left', 'L_PARENTH','R_PARENTH'),
('left', 'EXISTS', 'NOT_EXISTS', 'BEGINS_WITH', 'CONTAINS'),
('left', 'BETWEEN'),
('left', 'IN'),
('left', 'EQ', 'NE', 'LT', 'LT_OR_EQ', 'GT', 'GT_OR_EQ')
)
def __init__(self):
self.message = keys
def isBase64(self, value):
try:
return base64.b64encode(base64.b64decode(value)) == value
except Exception:
return False
def getPathValue(self, path):
try:
return [ match.value for match in path.find(self.message) ][0]
except:
return False
@_('expr EQ expr')
def expr(self, p):
#First token is the path
return p[1] == p[3]
@_('expr NE exp')
def expr(self, p):
return p[1] != p[3]
@_('expr LT expr')
def expr(self, p):
return p[1] < p[3]
@_('expr LT_OR_EQ expr')
def expr(self, p):
return p[1] <= p[3]
@_('expr GT expr')
def expr(self, p):
return p[1] > p[3]
@_('expr GT_OR_EQ expr')
def expr(self, p):
return p[1] >= p[3]
@_('expr IN exp')
def expr(self, p):
return p[1] in p[3]
@_('expr BETWEEN expr AND expr')
def expr(self, p):
return (p[1] <= p[3] and p[1] >= p[5]) or (p[1] <= p[5] and p[1] >= p[3])
@_('EXISTS')
def expr(self, p):
# if self.getPathValue returns any non False value then the path must exist
return self.getPathValue(re.sub('^attribute_exists\(\'|")|(\'|")\)', '', p[1]))
@_('NOT_EXISTS')
def expr(self, p):
# same as above but return values are reversed
return not self.getPathValue(re.sub('^attribute_not_exists\(\'|")|(\'|")\)', '', p[1]))
@_('BEGINS_WITH')
def expr(self, p):
args = p = re.sub('(begins_with)|(\s*)|(\'|")|\(|\)', '', p[1]).split(',')
path = args[0]
s = args[1]
return self.getPathValue(path, '', p[1]).startswith(s)
"""
@_('NOT expr')
def expr(self, p):
return not p[2]
"""
@_('expr AND expr')
def expr(self, p):
return p[1] and p[3]
@_('expr OR expr')
def expr(self, p):
return p[1] or p[3]
@_('SIZE')
def expr(self, p):
v = self.getPathValue(re.sub('^size\(\'|\'\)', '', p[1]))
if (type(v) is int) or (type(v) is float):
v = str(v)
if type(v) is bool:
return 0
if self.isBase64(v):
return (len(v) * 3) / 4
# Default. Matches: lists, dicts, strings, strings
return len(v)
@_('L_PARENTH expr R_PARENTH')
def exp(self, p):
return p[1]
@_('STRING',
'NUMBER',
'PATH')
def exp(self,p):
return p[1]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment