Last active
February 2, 2020 14:37
-
-
Save crhan/b208009a85176f27d3fe027a2bc1b5b2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import Enum

import pytest
from sly import Lexer, Parser
class QUOTE(Enum):
    """Sentinels the parser emits in place of quote characters.

    The parser's ``QUOTE`` rule returns ``START`` when it sees a quote while
    no quoted span is open, and ``END`` when it sees the same quote character
    again; the final grouping step uses the pair to delimit a quoted span.
    """

    START = 0  # a quoted span opens here
    END = 1  # the currently open quoted span closes here
class IRedisLexer(Lexer):
    """Tokenize a command line into quotes, backslashes, whitespace and words.

    The four patterns are mutually exclusive: ``ID`` is defined as a run of
    characters that are none of quote, backslash or whitespace, so every input
    character falls into exactly one token type.
    """

    # Bare names are resolved by sly's Lexer class namespace; inside this
    # class body ``QUOTE`` is the token name, not the module-level enum.
    tokens = {ID, QUOTE, ESCAPE, SPACE}

    QUOTE = r"['\"]"  # a single ' or " character
    ESCAPE = r"\\"  # a single backslash
    SPACE = r"\s"  # a single whitespace character (space, tab, newline, ...)
    ID = r"[^'\"\\\s]+"  # one or more characters that are none of the above
class IRedisParser(Parser):
    """Split a tokenized command line into a list of argument strings.

    Grammar::

        ending  : factors
        factors : factors factor
                | factor
                | factors escapes
                | escapes
        escapes : ESCAPE factor
                | ESCAPE QUOTE
                | ESCAPE ESCAPE
                | ESCAPE empty
                | QUOTE
        factor  : ID
                | SPACE

    ``factors`` builds a flat list whose items are strings or ``QUOTE``
    sentinels; ``ending`` then groups that list into arguments. Which quote
    character is currently open is tracked in ``self.current_quote`` while
    the rules are reduced.
    """

    tokens = IRedisLexer.tokens
    # File name handed to sly for its grammar debugging output (see sly docs).
    debugfile = "lex7.out"
    # No precedence rules are declared for this grammar.
    precedence = ()

    def __init__(self):
        super().__init__()
        # Quote character (' or ") of the quoted span currently open, or None.
        self.current_quote = None

    @_("factors")
    def ending(self, p):
        """Group the flat item list into the final list of arguments.

        Unquoted whitespace separates arguments. Everything between a
        ``QUOTE.START`` and the next ``QUOTE.END`` is appended verbatim to
        the current argument, whitespace included. An empty quoted span
        still yields an (empty) argument.

        Returns:
            list[str]: the parsed arguments, in input order.
        """
        args = []
        current = []
        items = iter(p[0])
        for item in items:
            # The lexer's SPACE pattern is ``\s``, so any single whitespace
            # character (not just " ") must act as a separator here.
            if item is None or (isinstance(item, str) and item.isspace()):
                if current:
                    args.append("".join(current))
                    current = []
                continue
            if item is QUOTE.START:
                if not current:
                    # Non-empty list marks "an argument exists" even when the
                    # quoted span turns out to be empty (e.g. '' or "").
                    current.append("")
                # Consume from the shared iterator so the outer loop resumes
                # right after the closing sentinel.
                for quoted in items:
                    if quoted is QUOTE.END:
                        break
                    current.append(quoted)
            else:
                current.append(item)
        if current:
            args.append("".join(current))
        # Forget any quote left open by this input so that reusing the same
        # parser instance starts from a clean state.
        self.current_quote = None
        return args

    @_("factors factor")
    def factors(self, p):
        """Append a single string item to the accumulated item list."""
        return p[0] + [p[1]]

    @_("factor")
    def factors(self, p):
        """Start the item list from a single string item."""
        return [p[0]]

    @_("factors escapes")
    def factors(self, p):
        """Extend the accumulated item list with the items of ``escapes``."""
        return p[0] + p[1]

    @_("escapes")
    def factors(self, p):
        """Start the item list from the items of ``escapes``."""
        return p[0]

    @_("QUOTE")
    def escapes(self, p):
        """Translate a quote character according to the open-quote state.

        Opens a span when none is open, closes it when the character matches
        the open one, and otherwise passes the character through literally
        (a different quote character inside a quoted span).
        """
        if self.current_quote is None:
            self.current_quote = p[0]
            return [QUOTE.START]
        if self.current_quote == p[0]:
            self.current_quote = None
            return [QUOTE.END]
        return [p[0]]

    @_("ESCAPE QUOTE")
    def escapes(self, p):
        """Handle a backslash immediately followed by a quote character.

        Outside a quoted span the backslash is kept literally and the quote
        opens a span; inside a span the backslash is dropped and the quote
        character is kept as plain text.
        """
        if self.current_quote is None:
            self.current_quote = p[1]
            return [p[0], QUOTE.START]
        return [p[1]]

    @_("ESCAPE factor")
    def escapes(self, p):
        """Keep a backslash followed by a word or whitespace as one literal item."""
        return [p[0] + p[1]]

    @_("ESCAPE ESCAPE")
    def escapes(self, p):
        """Handle two consecutive backslashes.

        Outside a quoted span both are kept; inside a span they collapse to
        a single backslash.
        """
        if self.current_quote is None:
            return [p[0], p[1]]
        return [p[1]]

    @_("ESCAPE empty")
    def escapes(self, p):
        """Keep a backslash that is not followed by anything it combines with."""
        return [p[0]]

    @_("")
    def empty(self, p):
        """Match the empty production; contributes no value."""
        return

    @_("ID", "SPACE")
    def factor(self, p):
        """Return the matched token text unchanged."""
        return p[0]
def parse_it(text):
    """Tokenize and parse ``text``, printing a debug trace along the way.

    Prints a banner containing ``text``, then one line per token, then the
    parse result.

    Args:
        text: the command line to split.

    Returns:
        Whatever ``IRedisParser.parse`` returns for the token stream.
    """
    print("#" * 10 + text + "#" * 10)
    lexer = IRedisLexer()
    # First pass over the tokens is for the debug trace only.
    for tok in lexer.tokenize(text):
        print(f"type={tok.type!r}, value={tok.value!r}")
    parser = IRedisParser()
    # Tokenize again: the stream printed above has already been consumed.
    result = parser.parse(lexer.tokenize(text))
    print(result)
    return result
@pytest.mark.parametrize(
    "test_input,expected",
    [
        ("hello world", ["hello", "world"]),
        ("hello 'world'", ["hello", "world"]),
        ("'hello world'", ["hello world"]),
        ('''hello"world"''', ["helloworld"]),
        (r'''hello\"world"''', [r"hello\world"]),
        ('"\\\\"', ["\\"]),
        ("\\\\", ["\\\\"]),
        ("\\", ["\\"]),
        (r"\abcd ef", [r"\abcd", "ef"]),
        # quotes in quotes
        (r""" 'hello"world' """, ['hello"world']),
        (r""" "hello'world" """, ["hello'world"]),
        (r""" 'hello\'world'""", ["hello'world"]),
        (r""" "hello\"world" """, ['hello"world']),
        (r"''", [""]),  # set foo "" is a legal command
        (r'""', [""]),  # set foo "" is a legal command
        ("\\hello\\", ["\\hello\\"]),  # backslashes are legal
        (r"foo ''", ["foo", ""]),  # set foo "" is a legal command
    ],
)
def test_stipe_quote_escaple_in_quote(test_input, expected):
    """Check that quotes are stripped and escapes resolved for each input."""
    assert parse_it(test_input) == expected
if __name__ == "__main__":
    # Manual smoke run: prints the token trace and the parsed arguments.
    #
    # Alternative example, kept for reference:
    # data = r" set abc 'val\\u \'e' ex 10 nx"
    # result = parse_it(data)
    # assert result == ['set', 'abc', "val\\u 'e", 'ex', '10', 'nx']
    # print("#" * 20)
    data = "config set requirepass ''"
    parse_it(data)
    # print(result)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment