Skip to content

Instantly share code, notes, and snippets.

@wilywampa
Created June 30, 2017 06:04
Show Gist options
  • Save wilywampa/ce750b48b85b253b0865887c255ee393 to your computer and use it in GitHub Desktop.
Save wilywampa/ce750b48b85b253b0865887c255ee393 to your computer and use it in GitHub Desktop.
deoplete ipython completion
import greenlet
import logging
import neovim
import os
from functools import partial
from jupyter_client import KernelManager, find_connection_file
# Module-level logger plus short aliases used throughout this file.
logger = logging.getLogger(__name__)
# `warn` keeps its name for callers but is bound to `Logger.warning`;
# `Logger.warn` is a deprecated alias.
error, debug, info, warn = (
    logger.error, logger.debug, logger.info, logger.warning,)
# Opt-in debug log: when NVIM_IPY_DEBUG_FILE is set, write DEBUG-level
# records to that file (opened in 'w' mode, so it is truncated each time).
if 'NVIM_IPY_DEBUG_FILE' in os.environ:
    logfile = os.environ['NVIM_IPY_DEBUG_FILE'].strip()
    logger.addHandler(logging.FileHandler(logfile, 'w'))
    # Use the public setter rather than assigning `logger.level` directly.
    logger.setLevel(logging.DEBUG)
class RedirectingKernelManager(KernelManager):
    """KernelManager whose launched kernel has stdout/stderr discarded."""

    def _launch_kernel(self, cmd, **b):
        """Launch the kernel with stdout and stderr pointed at the null device.

        ``cmd`` and the remaining keyword arguments are forwarded unchanged
        to the parent implementation, except that ``stdout`` and ``stderr``
        are overridden. Returns whatever the parent ``_launch_kernel``
        returns.
        """
        debug('_launch_kernel')
        # stdout is used to communicate with nvim, redirect it somewhere else
        # os.devnull is the platform's null device ("/dev/null" on POSIX),
        # so this no longer hard-codes a POSIX-only path.
        self._null = open(os.devnull, "wb", 0)
        b['stdout'] = self._null.fileno()
        b['stderr'] = self._null.fileno()
        return super(RedirectingKernelManager, self)._launch_kernel(cmd, **b)
class Async(object):
    """Wrapper that defers all method calls on a plugin object to the event
    loop, given that the object has vim attribute"""

    def __init__(self, wraps):
        """Remember ``wraps``, the object whose attributes get deferred."""
        self.wraps = wraps

    def __getattr__(self, name):
        """Return a callable that schedules ``wraps.<name>`` via async_call.

        The attribute is looked up on the wrapped object immediately; the
        returned ``partial`` passes it (plus any call arguments) to
        ``wraps.vim.async_call`` when invoked.

        Raises AttributeError if ``wraps`` itself has not been set.
        """
        if name == 'wraps':
            # __getattr__ only runs when normal lookup fails, so getting here
            # means __init__ never ran (e.g. copy/pickle or __new__). Reading
            # self.wraps below would re-enter this method forever.
            raise AttributeError(name)
        return partial(self.wraps.vim.async_call, getattr(self.wraps, name))
@neovim.plugin
@neovim.encoding(True)
class IPythonPlugin(object):
    """Plugin class exposing the ``IPyConnect`` function."""

    def __init__(self, vim):
        """Keep the ``vim`` handle passed in at construction."""
        debug('__init__')
        self.vim = vim

    @neovim.function("IPyConnect", sync=True)
    def ipy_connect(self, args):
        """Attach the vim handle to the shared ``ipy`` object and schedule
        its ``connect(args)`` through ``Async`` instead of calling it inline.
        """
        debug('ipy_connect')
        ipy.vim = self.vim
        # Author's rationale: 'connect' waits for kernelinfo, and so must
        # be async.
        deferred = Async(ipy)
        deferred.connect(args)
class IPythonInterface(object):
    """Holds the kernel manager/client and routes shell replies to waiters."""

    def __init__(self):
        """Start disconnected, with no pending replies and no kernel objects."""
        self.km = None
        self.kc = None
        self.pending_shell_msgs = {}
        self.has_connection = False

    def connect(self, argv):
        """Create the manager and client, hook up handlers, start channels.

        ``argv`` is accepted but not used. Shell and heartbeat callbacks are
        wrapped in ``Async`` so they are scheduled through
        ``self.vim.async_call``.
        """
        debug('connect')
        manager = RedirectingKernelManager(
            client_class='jupyter_client.threaded.ThreadedKernelClient')
        self.km = manager
        manager.load_connection_file(connection_file=find_connection_file())
        client = manager.client()
        self.kc = client
        deferred = Async(self)
        client.shell_channel.call_handlers = deferred.on_shell_msg
        client.hb_channel.call_handlers = deferred.on_hb_msg
        client.start_channels()
        self.has_connection = True

    def handle(self, msg_id, handler):
        """Register ``handler`` for the reply whose parent id is ``msg_id``."""
        debug('handle')
        self.pending_shell_msgs[msg_id] = handler

    def waitfor(self, msg_id, retval=None):
        """Park the current greenlet until the reply to ``msg_id`` arrives.

        The current greenlet is registered as the handler, then control is
        switched to its parent with ``retval``. The value returned here is
        whatever is later switched back in (``on_shell_msg`` passes the
        reply message).
        """
        debug('waitfor')
        # FIXME: add some kind of timeout
        current = greenlet.getcurrent()
        self.handle(msg_id, current)
        return current.parent.switch(retval)

    def ignore(self, msg_id):
        """Register a ``None`` handler so the reply to ``msg_id`` is dropped."""
        debug('ignore')
        self.handle(msg_id, None)

    def on_shell_msg(self, m):
        """Dispatch shell message ``m`` to the handler registered for its
        parent ``msg_id``, removing that registration.

        A greenlet handler is re-parented to the current greenlet and resumed
        with ``m``; a callable is called with ``m``; ``None`` does nothing.
        Messages with no registered handler are only logged.
        """
        debug('on_shell_msg')
        msg_id = m['parent_header']['msg_id']
        # A private sentinel distinguishes "not registered" from a
        # registered None handler.
        unregistered = object()
        handler = self.pending_shell_msgs.pop(msg_id, unregistered)
        if handler is unregistered:
            debug('unexpected shell msg: %r', m)
            return
        if handler is None:
            return
        if isinstance(handler, greenlet.greenlet):
            handler.parent = greenlet.getcurrent()
            handler.switch(m)
        else:
            handler(m)

    def on_hb_msg(self, time_since):
        """Heartbeat-lost callback: mark disconnected and stop the channels."""
        debug('on_hb_msg')
        self.has_connection = False
        self.kc.stop_channels()
# Module-level IPythonInterface instance; IPythonPlugin.ipy_connect attaches
# the vim handle to it and schedules its connect().
ipy = IPythonInterface()
import ast
import logging
import os
import path
import re
import sys
from .base import Base
# Module-level logger plus short aliases used throughout this file.
logger = logging.getLogger(__name__)
# `warn` keeps its name for callers but is bound to `Logger.warning`;
# `Logger.warn` is a deprecated alias.
error, debug, info, warn = (
    logger.error, logger.debug, logger.info, logger.warning,)
# Opt-in debug log: when NVIM_IPY_DEBUG_FILE is set, write DEBUG-level
# records to that file (opened in 'w' mode, so it is truncated each time).
if 'NVIM_IPY_DEBUG_FILE' in os.environ:
    logfile = os.environ['NVIM_IPY_DEBUG_FILE'].strip()
    logger.addHandler(logging.FileHandler(logfile, 'w'))
    # Use the public setter rather than assigning `logger.level` directly.
    logger.setLevel(logging.DEBUG)
# Prepend the directory reached by three `.parent` steps from this file's
# path to sys.path, then import the shared `ipy` object from `myipy`
# (presumably the path tweak is what makes `myipy` importable; confirm
# against the plugin's directory layout).
sys.path.insert(0, path.Path(__file__).parent.parent.parent)
from myipy import ipy
# Matches the leading part of an import statement: either
# "from <dotted.module> import name, name, " (any number of leading dots,
# zero or more "name, " items) or a bare "import ".
imports = re.compile(
    r'^\s*(from\s+\.*\w+(\.\w+)*\s+import\s+(\w+,\s+)*|import\s+)')
# Matches a single character that is NOT one of: '=', space, CR, LF, '*',
# '(', ')', '@', '-'.
split_pattern = re.compile('[^= \r\n*()@-]')
# Matches a single ASCII identifier character.
keyword = re.compile('[A-Za-z0-9_]')
# Snippet executed in the kernel by Source.gather_candidates. It stores the
# result of completion_metadata(get_ipython()) in `_completions`; any
# exception is ignored kernel-side, in which case `_completions` is not
# (re)assigned by this run. The body must stay indented so that the snippet
# is valid Python.
request = '''
try:
    _completions = completion_metadata(get_ipython())
except Exception:
    pass
'''
class Source(Base):
    """Completion source that asks the connected IPython kernel for matches."""

    def __init__(self, vim):
        """Initialise the base source and set this source's attributes."""
        super().__init__(vim)
        self.name = 'ipython'
        self.mark = '[IPy]'
        self.filetypes = ['python']
        self.is_volatile = True
        self.rank = 2000
        self.input_pattern = r'\w+(\.\w+)*'

    def get_complete_position(self, context):
        """Return the index in the current line where the completion starts.

        Reads ``context['input']`` plus the current line and cursor column
        from vim. Returns -1 when the cursor is in column 1 or the end-of-line
        check below fails (presumably "nothing to complete" for the caller).
        """
        debug('get_complete_position')
        line = self.vim.current.line
        # return immediately for imports
        if imports.match(context['input']):
            # Walk left from the end of the input over dotted-name characters.
            start = len(context['input'])
            while start > 0 and re.match('[._A-Za-z0-9]', line[start - 1]):
                start -= 1
            return start
        # locate the start of the word
        col = self.vim.funcs.col('.')
        start = col - 1
        # NOTE(review): the indices below may be negative for short lines and
        # then wrap around; kept exactly as written.
        if start == 0 or (len(line) == start and
                          not split_pattern.match(line[start - 2]) and
                          not (start >= 2 and keyword.match(line[start - 3]))
                          and line[start - 3:start - 1] != '].'):
            start = -1
            return start
        line = self.vim.funcs.getline('.')
        start = self.vim.funcs.strchars(line[:col]) - 1
        # Scan left while the previous character still belongs to the
        # expression: a split_pattern character, a '.' after an identifier
        # character, a '-' right after '[', or the '.' of a "]." pair.
        bracket_level = 0
        while start > 0 and (
                split_pattern.match(line[start - 1])
                or (line[start - 1] == '.'
                    and start >= 2 and keyword.match(line[start - 2])
                    or (line[start - 1] == '-'
                        and start >= 2 and line[start - 2] == '[')
                    or ''.join(line[start - 2:start]) == '].')):
            if line[start - 1] == '[':
                # Stop at a '[' that opens the line, does not follow an
                # identifier character or ']', or is reached while
                # bracket_level is already positive.
                # Raw string: '\]' is an invalid escape in a normal literal.
                if (start == 1 or not re.match(
                        r'[A-Za-z0-9_\]]', line[start-2])) or bracket_level > 0:
                    break
                bracket_level += 1
            elif line[start-1] == ']':
                bracket_level -= 1
            start -= 1
        return start

    def gather_candidates(self, context):
        """Return the completion candidates reported by the kernel.

        Sends a ``complete`` request for ``context['complete_str']`` and
        waits for its reply (the reply itself is not used here), then
        executes ``request`` and evaluates the ``_completions`` user
        expression with ``ast.literal_eval``.

        Returns ``[]`` when there is no connection or when the reply carries
        no usable completion data (e.g. the user expression failed).
        """
        debug(('gather_candidates', context['complete_str']))
        if not ipy.has_connection:
            debug('no connection')
            return []
        ipy.waitfor(ipy.kc.complete(context['complete_str']))
        reply = ipy.waitfor(ipy.kc.execute(
            request, silent=True,
            user_expressions={'_completions': '_completions'}))
        try:
            metadata = reply['content']['user_expressions']['_completions']
            # Parse once and reuse the result for both the log and the return.
            matches = ast.literal_eval(metadata['data']['text/plain'])
        except (KeyError, ValueError, SyntaxError) as exc:
            # The expected keys are absent or the text is not a literal;
            # offer no candidates instead of raising.
            debug('no completion metadata: %r', exc)
            return []
        debug('matches = %s', matches)
        return matches
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment