Created
January 30, 2020 05:35
-
-
Save crhan/4a57403c61dcb859c7d271620a9b3954 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
from iredis.commands_csv_loader import all_commands | |
from iredis.exceptions import InvalidArguments | |
from iredis.utils import split_command_args | |
from prompt_toolkit import PromptSession | |
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory | |
from prompt_toolkit.completion import CompleteEvent, Completer, WordCompleter | |
from prompt_toolkit.document import Document | |
from prompt_toolkit.history import FileHistory | |
from pygments import lexer | |
from pygments.filter import apply_filters | |
from pygments.token import Keyword, Name, String, Text | |
from pygments.util import guess_decode, text_type | |
def innerstring_rules(ttype): | |
return [ | |
# the old style '%s' % (...) string formatting (still valid in Py3) | |
( | |
r"%(\(\w+\))?[-#0 +]*([0-9]+|[*])?(\.([0-9]+|[*]))?" | |
"[hlL]?[E-GXc-giorsaux%]", | |
String.Interpol, | |
), | |
# the new style '{}'.format(...) string formatting | |
( | |
r"\{" | |
r"((\w+)((\.\w+)|(\[[^\]]+\]))*)?" # field name | |
r"(\![sra])?" # conversion | |
r"(\:(.?[<>=\^])?[-+ ]?#?0?(\d+)?,?(\.\d+)?[E-GXb-gnosx%]?)?" | |
r"\}", | |
String.Interpol, | |
), | |
# backslashes, quotes and formatting signs must be parsed one at a time | |
(r'[^\\\'"%{\n]+', ttype), | |
(r'[\'"\\]', ttype), | |
# unhandled string formatting sign | |
(r"%|(\{{1,2})", ttype) | |
# newlines are an error (use "nl" state) | |
] | |
class IRedisContext(lexer.LexerContext): | |
def __init__(self, text, pos=0, stack=None, end=None): | |
super().__init__(text, pos, stack=stack, end=end) | |
self.completer: Completer = None | |
class IRedisCommandBaseCompleter(Completer): | |
def __init__(self): | |
self.possible_completion = [] | |
def get_completions(self, document: Document, complete_event: CompleteEvent): | |
pass | |
class GetCommandCompleter(IRedisCommandBaseCompleter): | |
def __init__(self): | |
super().__init__() | |
def get_completions(self, document: Document, complete_event: CompleteEvent): | |
_lexer = IRedisExtendedLexer(ensurenl=False, stripnl=False) | |
_ctx = IRedisContext(_lexer.text_pre_process(document.text)) | |
tokens = list(_lexer.get_tokens_unprocessed(context=_ctx)) | |
word = document.get_word_before_cursor() | |
print("yeah~, in the get command completer") | |
print(tokens) | |
REDIS_COMMANDS = ("GET", "SET") | |
class IRedisCompleter(Completer): | |
def __init__(self): | |
self._command_completer = WordCompleter(REDIS_COMMANDS, ignore_case=True) | |
def get_completer(self, document: Document): | |
try: | |
_command, args = split_command_args(document.text, all_commands) | |
if _command == document.get_word_before_cursor(): | |
return self._command_completer | |
if _command.upper() == "GET": | |
return GetCommandCompleter() | |
except InvalidArguments: | |
return self._command_completer | |
def get_completions(self, document: Document, complete_event: CompleteEvent): | |
return self.get_completer(document).get_completions(document, complete_event) | |
def get_command_callback(lexer: "IRedisExtendedLexer", match, ctx: IRedisContext): | |
ctx.completer = WordCompleter(("ab", "bb")) | |
ctx.stack.append("get_command") | |
yield match.start(), Keyword, match[0] | |
ctx.pos = match.end() | |
class IRedisLexer(lexer.RegexLexer): | |
tokens = { | |
"root": [ | |
(r"\n", Text), | |
(r"[^\S\n]+", Text), | |
(r"\\\n", Text), | |
(r"\\", Text), | |
(r"(?i)get\b", get_command_callback), | |
lexer.include("string"), | |
], | |
"string": [ | |
(r"'", String.Single, "sqs"), | |
(r'"', String.Double, "dqs"), | |
(r"[a-zA-Z_]\w*", String), | |
], | |
"strings-single": innerstring_rules(String.Single), | |
"strings-double": innerstring_rules(String.Double), | |
"dqs": [ | |
(r'"', String.Double, "#pop"), | |
(r'\\\\|\\"|\\\n', String.Escape), | |
lexer.include("strings-double"), | |
], | |
"sqs": [ | |
(r"'", String.Single, "#pop"), | |
(r"\\\\|\\'|\\\n", String.Escape), | |
lexer.include("strings-single"), | |
], | |
"get_command": [(r"\s+", Text), lexer.include("string")], | |
} | |
class IRedisExtendedLexer(lexer.ExtendedRegexLexer, IRedisLexer): | |
def text_pre_process(self, text): | |
""" | |
Return an iterable of (tokentype, value) pairs generated from | |
`text`. If `unfiltered` is set to `True`, the filtering mechanism | |
is bypassed even if filters are defined. | |
Also preprocess the text, i.e. expand tabs and strip it if | |
wanted and applies registered filters. | |
""" | |
if not isinstance(text, text_type): | |
if self.encoding == "guess": | |
text, _ = guess_decode(text) | |
elif self.encoding == "chardet": | |
try: | |
import chardet | |
except ImportError: | |
raise ImportError( | |
"To enable chardet encoding guessing, " | |
"please install the chardet library " | |
"from http://chardet.feedparser.org/" | |
) | |
# check for BOM first | |
decoded = None | |
for bom, encoding in lexer._encoding_map: | |
if text.startswith(bom): | |
decoded = text[len(bom) :].decode(encoding, "replace") | |
break | |
# no BOM found, so use chardet | |
if decoded is None: | |
enc = chardet.detect(text[:1024]) # Guess using first 1KB | |
decoded = text.decode(enc.get("encoding") or "utf-8", "replace") | |
text = decoded | |
else: | |
text = text.decode(self.encoding) | |
if text.startswith(u"\ufeff"): | |
text = text[len(u"\ufeff") :] | |
else: | |
if text.startswith(u"\ufeff"): | |
text = text[len(u"\ufeff") :] | |
# text now *is* a unicode string | |
text = text.replace("\r\n", "\n") | |
text = text.replace("\r", "\n") | |
if self.stripall: | |
text = text.strip() | |
elif self.stripnl: | |
text = text.strip("\n") | |
if self.tabsize > 0: | |
text = text.expandtabs(self.tabsize) | |
if self.ensurenl and not text.endswith("\n"): | |
text += "\n" | |
return text | |
if __name__ == "__main__": | |
_lexer = IRedisExtendedLexer(ensurenl=False, stripnl=False) | |
session = PromptSession( | |
history=FileHistory("/tmp/.test_history"), | |
auto_suggest=AutoSuggestFromHistory(), | |
complete_while_typing=True, | |
completer=IRedisCompleter(), | |
message="> ", | |
) | |
session.prompt() | |
# _extended_lexer.get_tokens() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment