import re
from iredis.commands_csv_loader import all_commands
from iredis.exceptions import InvalidArguments
from iredis.utils import split_command_args
from prompt_toolkit import PromptSession
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import CompleteEvent, Completer, WordCompleter
from prompt_toolkit.document import Document
from prompt_toolkit.history import FileHistory
from pygments import lexer
from pygments.filter import apply_filters
from pygments.token import Keyword, Name, String, Text
from pygments.util import guess_decode, text_type


def innerstring_rules(ttype):
    # String-formatting token rules, following the ones Pygments' own Python
    # lexer uses for the contents of string literals.
    return [
        # the old style '%s' % (...) string formatting (still valid in Py3)
        (
            r"%(\(\w+\))?[-#0 +]*([0-9]+|[*])?(\.([0-9]+|[*]))?"
            "[hlL]?[E-GXc-giorsaux%]",
            String.Interpol,
        ),
        # the new style '{}'.format(...) string formatting
        (
            r"\{"
            r"((\w+)((\.\w+)|(\[[^\]]+\]))*)?"  # field name
            r"(\![sra])?"  # conversion
            r"(\:(.?[<>=\^])?[-+ ]?#?0?(\d+)?,?(\.\d+)?[E-GXb-gnosx%]?)?"
            r"\}",
            String.Interpol,
        ),
        # backslashes, quotes and formatting signs must be parsed one at a time
        (r'[^\\\'"%{\n]+', ttype),
        (r'[\'"\\]', ttype),
        # unhandled string formatting sign
        (r"%|(\{{1,2})", ttype),
        # newlines are an error (use "nl" state)
    ]


class IRedisContext(lexer.LexerContext):
    def __init__(self, text, pos=0, stack=None, end=None):
        super().__init__(text, pos, stack=stack, end=end)
        self.completer: Completer = None


class IRedisCommandBaseCompleter(Completer):
    def __init__(self):
        self.possible_completion = []

    def get_completions(self, document: Document, complete_event: CompleteEvent):
        # Base class yields no completions; subclasses override this.
        return []


class GetCommandCompleter(IRedisCommandBaseCompleter):
    def __init__(self):
        super().__init__()

    def get_completions(self, document: Document, complete_event: CompleteEvent):
        _lexer = IRedisExtendedLexer(ensurenl=False, stripnl=False)
        _ctx = IRedisContext(_lexer.text_pre_process(document.text))
        tokens = list(_lexer.get_tokens_unprocessed(context=_ctx))
        word = document.get_word_before_cursor()
        print("yeah~, in the get command completer")
        print(tokens)
        # No real completions yet; return an empty iterable so prompt_toolkit
        # can safely iterate over the result.
        return []
REDIS_COMMANDS = ("GET", "SET")
class IRedisCompleter(Completer):
def __init__(self):
self._command_completer = WordCompleter(REDIS_COMMANDS, ignore_case=True)
def get_completer(self, document: Document):
try:
_command, args = split_command_args(document.text, all_commands)
if _command == document.get_word_before_cursor():
return self._command_completer
if _command.upper() == "GET":
return GetCommandCompleter()
except InvalidArguments:
return self._command_completer
def get_completions(self, document: Document, complete_event: CompleteEvent):
return self.get_completer(document).get_completions(document, complete_event)


def get_command_callback(lexer: "IRedisExtendedLexer", match, ctx: IRedisContext):
    ctx.completer = WordCompleter(("ab", "bb"))
    ctx.stack.append("get_command")
    yield match.start(), Keyword, match[0]
    ctx.pos = match.end()


class IRedisLexer(lexer.RegexLexer):
    tokens = {
        "root": [
            (r"\n", Text),
            (r"[^\S\n]+", Text),
            (r"\\\n", Text),
            (r"\\", Text),
            # "GET" goes through a callback so the lexer context can record a
            # completer and push the "get_command" state.
            (r"(?i)get\b", get_command_callback),
            lexer.include("string"),
        ],
        "string": [
            (r"'", String.Single, "sqs"),
            (r'"', String.Double, "dqs"),
            (r"[a-zA-Z_]\w*", String),
        ],
        "strings-single": innerstring_rules(String.Single),
        "strings-double": innerstring_rules(String.Double),
        "dqs": [
            (r'"', String.Double, "#pop"),
            (r'\\\\|\\"|\\\n', String.Escape),
            lexer.include("strings-double"),
        ],
        "sqs": [
            (r"'", String.Single, "#pop"),
            (r"\\\\|\\'|\\\n", String.Escape),
            lexer.include("strings-single"),
        ],
        "get_command": [(r"\s+", Text), lexer.include("string")],
    }


class IRedisExtendedLexer(lexer.ExtendedRegexLexer, IRedisLexer):
    def text_pre_process(self, text):
        """
        Preprocess `text` the same way pygments' Lexer.get_tokens does before
        lexing: decode bytes (respecting the "guess"/"chardet" encoding
        options and any BOM), normalize line endings, optionally strip
        whitespace, expand tabs, and append a trailing newline when
        `ensurenl` is set. Returns the preprocessed text.
        """
        if not isinstance(text, text_type):
            if self.encoding == "guess":
                text, _ = guess_decode(text)
            elif self.encoding == "chardet":
                try:
                    import chardet
                except ImportError:
                    raise ImportError(
                        "To enable chardet encoding guessing, "
                        "please install the chardet library "
                        "from http://chardet.feedparser.org/"
                    )
                # check for BOM first
                decoded = None
                for bom, encoding in lexer._encoding_map:
                    if text.startswith(bom):
                        decoded = text[len(bom) :].decode(encoding, "replace")
                        break
                # no BOM found, so use chardet
                if decoded is None:
                    enc = chardet.detect(text[:1024])  # Guess using first 1KB
                    decoded = text.decode(enc.get("encoding") or "utf-8", "replace")
                text = decoded
            else:
                text = text.decode(self.encoding)
                if text.startswith(u"\ufeff"):
                    text = text[len(u"\ufeff") :]
        else:
            if text.startswith(u"\ufeff"):
                text = text[len(u"\ufeff") :]
        # text now *is* a unicode string
        text = text.replace("\r\n", "\n")
        text = text.replace("\r", "\n")
        if self.stripall:
            text = text.strip()
        elif self.stripnl:
            text = text.strip("\n")
        if self.tabsize > 0:
            text = text.expandtabs(self.tabsize)
        if self.ensurenl and not text.endswith("\n"):
            text += "\n"
        return text
if __name__ == "__main__":
_lexer = IRedisExtendedLexer(ensurenl=False, stripnl=False)
session = PromptSession(
history=FileHistory("/tmp/.test_history"),
auto_suggest=AutoSuggestFromHistory(),
complete_while_typing=True,
completer=IRedisCompleter(),
message="> ",
)
session.prompt()
# _extended_lexer.get_tokens()
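    # A minimal sketch (not part of the original flow) of driving the extended
    # lexer directly, mirroring what GetCommandCompleter does above; the input
    # string here is arbitrary and only for illustration:
    #
    #   _ctx = IRedisContext(_lexer.text_pre_process('GET "some key"'))
    #   for pos, token_type, value in _lexer.get_tokens_unprocessed(context=_ctx):
    #       print(pos, token_type, repr(value))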