Created
June 30, 2017 06:04
-
-
Save wilywampa/ce750b48b85b253b0865887c255ee393 to your computer and use it in GitHub Desktop.
deoplete ipython completion
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import greenlet | |
import logging | |
import neovim | |
import os | |
from functools import partial | |
from jupyter_client import KernelManager, find_connection_file | |
# Module-level logger plus short aliases used throughout this file.
logger = logging.getLogger(__name__)
# ``warn`` keeps its historical name for callers, but is bound to
# ``Logger.warning``; ``Logger.warn`` is a deprecated alias.
error, debug, info, warn = (
    logger.error, logger.debug, logger.info, logger.warning)

# Opt-in debug logging: when NVIM_IPY_DEBUG_FILE is set, log everything to
# that file (truncating it on start-up).
if 'NVIM_IPY_DEBUG_FILE' in os.environ:
    logfile = os.environ['NVIM_IPY_DEBUG_FILE'].strip()
    logger.addHandler(logging.FileHandler(logfile, 'w'))
    # Use the public setter rather than assigning ``logger.level`` directly.
    logger.setLevel(logging.DEBUG)
class RedirectingKernelManager(KernelManager):
    """Kernel manager that sends a launched kernel's stdout/stderr to the
    null device."""

    def _launch_kernel(self, cmd, **b):
        """Launch the kernel with stdout and stderr redirected.

        ``cmd`` and the remaining keyword arguments are forwarded unchanged
        to the parent implementation, except that ``stdout`` and ``stderr``
        are overridden with the null device's file descriptor.
        """
        debug('_launch_kernel')
        # stdout is used to communicate with nvim, redirect it somewhere else
        previous = getattr(self, '_null', None)
        if previous is not None and not previous.closed:
            # Do not leak the handle opened by an earlier launch.
            previous.close()
        # os.devnull instead of a hard-coded "/dev/null" for portability.
        self._null = open(os.devnull, "wb", 0)
        b['stdout'] = self._null.fileno()
        b['stderr'] = self._null.fileno()
        return super(RedirectingKernelManager, self)._launch_kernel(cmd, **b)
class Async(object):
    """Wrapper that defers all method calls on a plugin object to the event
    loop, given that the object has vim attribute.

    Attribute access returns a callable that, when invoked, passes the
    wrapped object's attribute (and any call arguments) to
    ``wraps.vim.async_call`` instead of calling it directly.
    """

    def __init__(self, wraps):
        self.wraps = wraps

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails.  If ``wraps`` itself
        # is missing (instance created without __init__), looking it up
        # below would re-enter this method forever; fail cleanly instead.
        if name == 'wraps':
            raise AttributeError(name)
        target = self.wraps
        return partial(target.vim.async_call, getattr(target, name))
@neovim.plugin
@neovim.encoding(True)
class IPythonPlugin(object):
    """Neovim plugin object that exposes the ``IPyConnect`` function."""

    def __init__(self, vim):
        debug('__init__')
        self.vim = vim

    @neovim.function("IPyConnect", sync=True)
    def ipy_connect(self, args):
        """Hand the module-level ``ipy`` interface this plugin's vim handle
        and schedule its ``connect`` call with ``args``."""
        debug('ipy_connect')
        # 'connect' waits for kernelinfo, and so must be async
        ipy.vim = self.vim
        deferred = Async(ipy)
        deferred.connect(args)
class IPythonInterface(object):
    """Owns the kernel manager/client pair and dispatches shell replies to
    whoever registered for them by message id."""

    def __init__(self):
        self.has_connection = False
        # msg_id -> handler: a greenlet to resume, a callable to invoke, or
        # None when the reply should be dropped.
        self.pending_shell_msgs = {}
        self.km = None
        self.kc = None

    def connect(self, argv):
        """Attach to a kernel via the file returned by
        ``find_connection_file()`` and start the client channels.

        ``argv`` is accepted but not used.  ``self.vim`` must already be
        assigned, because the channel handlers are wrapped in ``Async``.
        """
        debug('connect')
        self.km = RedirectingKernelManager(
            client_class='jupyter_client.threaded.ThreadedKernelClient')
        self.km.load_connection_file(connection_file=find_connection_file())
        self.kc = self.km.client()
        self.kc.shell_channel.call_handlers = Async(self).on_shell_msg
        self.kc.hb_channel.call_handlers = Async(self).on_hb_msg
        self.kc.start_channels()
        self.has_connection = True

    def handle(self, msg_id, handler):
        """Register ``handler`` for the reply to ``msg_id``."""
        debug('handle')
        self.pending_shell_msgs[msg_id] = handler

    def waitfor(self, msg_id, retval=None):
        """Suspend the current greenlet until the reply to ``msg_id`` arrives.

        Control is switched to the parent greenlet (which receives
        ``retval``); ``on_shell_msg`` later switches back with the reply
        message, which becomes this method's return value.
        """
        debug('waitfor')
        # FIXME: add some kind of timeout
        gr = greenlet.getcurrent()
        self.handle(msg_id, gr)
        return gr.parent.switch(retval)

    def ignore(self, msg_id):
        """Register ``msg_id`` so that its reply is silently discarded."""
        debug('ignore')
        self.handle(msg_id, None)

    def on_shell_msg(self, m):
        """Deliver shell-channel message ``m`` to its registered handler.

        Messages whose parent id is missing or was never registered are
        logged and dropped.
        """
        debug('on_shell_msg')
        # debug('shell %s: %r', m['msg_type'], m['content'])
        # A message may carry an empty/missing parent header; treat that the
        # same as an unknown id rather than raising KeyError here.
        msg_id = (m.get('parent_header') or {}).get('msg_id')
        try:
            handler = self.pending_shell_msgs.pop(msg_id)
        except KeyError:
            debug('unexpected shell msg: %r', m)
            return
        if isinstance(handler, greenlet.greenlet):
            # Re-parent so the waiting greenlet returns here when it finishes
            # or blocks again.
            handler.parent = greenlet.getcurrent()
            handler.switch(m)
        elif handler is not None:
            handler(m)

    def on_hb_msg(self, time_since):
        """this gets called when heartbeat is lost."""
        debug('on_hb_msg')
        self.has_connection = False
        self.kc.stop_channels()


# Single shared interface instance; the plugin assigns its ``vim`` attribute.
ipy = IPythonInterface()
import ast | |
import logging | |
import os | |
import path | |
import re | |
import sys | |
from .base import Base | |
# Module-level logger plus short aliases used throughout this file.
logger = logging.getLogger(__name__)
# ``warn`` keeps its historical name for callers, but is bound to
# ``Logger.warning``; ``Logger.warn`` is a deprecated alias.
error, debug, info, warn = (
    logger.error, logger.debug, logger.info, logger.warning)

# Opt-in debug logging: when NVIM_IPY_DEBUG_FILE is set, log everything to
# that file (truncating it on start-up).
if 'NVIM_IPY_DEBUG_FILE' in os.environ:
    logfile = os.environ['NVIM_IPY_DEBUG_FILE'].strip()
    logger.addHandler(logging.FileHandler(logfile, 'w'))
    # Use the public setter rather than assigning ``logger.level`` directly.
    logger.setLevel(logging.DEBUG)
sys.path.insert(0, path.Path(__file__).parent.parent.parent) | |
from myipy import ipy | |
# Prefix of an import statement: either ``from <module> import [name, ...]``
# or a bare ``import ``.
imports = re.compile(
    r'^\s*(from\s+\.*\w+(\.\w+)*\s+import\s+(\w+,\s+)*|import\s+)')

# One character that is none of: '=', space, CR, LF, '*', '(', ')', '@', '-'.
split_pattern = re.compile('[^= \r\n*()@-]')

# One ASCII identifier character.
keyword = re.compile('[A-Za-z0-9_]')

# Code executed in the kernel; failures there are deliberately swallowed.
request = '''
try:
    _completions = completion_metadata(get_ipython())
except Exception:
    pass
'''
class Source(Base):
    """Completion source that asks the connected IPython kernel for
    candidates (registered for the ``python`` filetype)."""

    def __init__(self, vim):
        super().__init__(vim)
        self.name = 'ipython'
        self.mark = '[IPy]'
        self.filetypes = ['python']
        self.is_volatile = True
        self.rank = 2000
        self.input_pattern = r'\w+(\.\w+)*'

    def get_complete_position(self, context):
        """Return the index at which the text to complete starts, or -1
        when no completion should be offered at the cursor."""
        debug('get_complete_position')
        line = self.vim.current.line

        # return immediately for imports
        if imports.match(context['input']):
            start = len(context['input'])
            while start > 0 and re.match('[._A-Za-z0-9]', line[start - 1]):
                start -= 1
            return start

        # locate the start of the word
        col = self.vim.funcs.col('.')
        start = col - 1
        if start == 0 or (len(line) == start and
                          not split_pattern.match(line[start - 2]) and
                          not (start >= 2 and keyword.match(line[start - 3]))
                          and line[start - 3:start - 1] != '].'):
            start = -1
            return start

        line = self.vim.funcs.getline('.')
        start = self.vim.funcs.strchars(line[:col]) - 1
        bracket_level = 0
        # Walk left while the previous character can still belong to the
        # expression being completed (word chars, attribute dots, a minus
        # directly after '[', or a '].' pair).
        while start > 0 and (
                split_pattern.match(line[start - 1])
                or (line[start - 1] == '.'
                    and start >= 2 and keyword.match(line[start - 2])
                    or (line[start - 1] == '-'
                        and start >= 2 and line[start - 2] == '[')
                    or ''.join(line[start - 2:start]) == '].')):
            if line[start - 1] == '[':
                # Raw string: '\]' is an invalid escape in a normal literal.
                if (start == 1 or not re.match(
                        r'[A-Za-z0-9_\]]', line[start - 2])) \
                        or bracket_level > 0:
                    break
                bracket_level += 1
            elif line[start - 1] == ']':
                bracket_level -= 1
            start -= 1
        return start

    def gather_candidates(self, context):
        """Return the kernel's completions for ``context['complete_str']``.

        Returns an empty list when there is no kernel connection, when the
        reply carries no ``text/plain`` data for ``_completions``, or when
        that data cannot be parsed as a Python literal.
        """
        debug(('gather_candidates', context['complete_str']))
        if not ipy.has_connection:
            debug('no connection')
            return []

        ipy.waitfor(ipy.kc.complete(context['complete_str']))
        reply = ipy.waitfor(ipy.kc.execute(
            request, silent=True,
            user_expressions={'_completions': '_completions'}))

        # The request swallows errors on the kernel side, so the user
        # expression may come back without a 'data' payload.
        metadata = (reply.get('content', {})
                    .get('user_expressions', {})
                    .get('_completions', {}))
        text = metadata.get('data', {}).get('text/plain')
        if text is None:
            debug('no completion data in reply: %r', metadata)
            return []

        try:
            # Parse once; the result is both logged and returned.
            matches = ast.literal_eval(text)
        except (ValueError, SyntaxError):
            debug('could not parse completions: %r', text)
            return []
        debug('matches = %s', matches)
        return matches
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment