Skip to content

Instantly share code, notes, and snippets.

@wilywampa
Created June 25, 2017 17:31
Show Gist options
  • Save wilywampa/20aa2ba284273c9a309d5195adc1d14a to your computer and use it in GitHub Desktop.
Save wilywampa/20aa2ba284273c9a309d5195adc1d14a to your computer and use it in GitHub Desktop.
import ast
import greenlet
import logging
import neovim
import os
import re
from .base import Base
from functools import partial
from jupyter_client import KernelManager
from jupyter_client.consoleapp import JupyterConsoleApp
from jupyter_client.threaded import ThreadedKernelClient
from jupyter_core.application import JupyterApp
# Matches the leading part of an import statement up to the point where a
# name is being typed, e.g. "import " or "from pkg.mod import a, ".
imports = re.compile(
    r'^\s*(from\s+\.*\w+(\.\w+)*\s+import\s+(\w+,\s+)*|import\s+)')
# Matches one character that is NOT '=', space, CR, LF, '*', '(', ')', '@'
# or '-'; used when scanning backwards for the start of the completion text.
split_pattern = re.compile('[^= \r\n*()@-]')
# Matches one ASCII identifier character.
keyword = re.compile('[A-Za-z0-9_]')

# Code executed in the kernel by Source.gather_candidates. It expects a
# `completion_metadata` callable to exist in the kernel namespace; any error
# raised while calling it is deliberately swallowed on the kernel side.
# The body must be indented, otherwise the kernel rejects the snippet with an
# IndentationError before the try/except can take effect.
request = '''
try:
    _completions = completion_metadata(get_ipython())
except Exception:
    pass
'''

logger = logging.getLogger(__name__)
# Module-level shortcuts; `warn` is bound to Logger.warning because
# Logger.warn is a deprecated alias.
error, debug, info, warn = (
    logger.error, logger.debug, logger.info, logger.warning)
if 'NVIM_IPY_DEBUG_FILE' in os.environ:
    # Debug logging is opt-in: the file named by the variable is truncated
    # and receives every record from this module's logger.
    logfile = os.environ['NVIM_IPY_DEBUG_FILE'].strip()
    logger.addHandler(logging.FileHandler(logfile, 'w'))
    logger.setLevel(logging.DEBUG)
class RedirectingKernelManager(KernelManager):
    """Kernel manager that launches kernels with stdout/stderr discarded."""

    def _launch_kernel(self, cmd, **b):
        """Launch the kernel with its stdout and stderr sent to the null device.

        ``cmd`` and the keyword arguments are forwarded unchanged to the
        parent implementation, except that ``stdout`` and ``stderr`` are
        overridden. Returns whatever the parent ``_launch_kernel`` returns.
        """
        # Close the handle left over from a previous launch so repeated
        # launches do not accumulate open file descriptors.
        stale = getattr(self, '_null', None)
        if stale is not None:
            stale.close()
        # stdout is used to communicate with nvim, redirect it somewhere else
        # (os.devnull instead of a hard-coded "/dev/null" keeps this portable)
        self._null = open(os.devnull, "wb", 0)
        b['stdout'] = self._null.fileno()
        b['stderr'] = self._null.fileno()
        return super()._launch_kernel(cmd, **b)
class JupyterVimApp(JupyterApp, JupyterConsoleApp):
    """Jupyter application whose kernel client reports to a target object.

    Shell and heartbeat messages received by the kernel client are handed to
    ``target.on_shell_msg`` and ``target.on_hb_msg`` respectively, where
    ``target`` is the object passed to :meth:`initialize`.
    """

    # don't use blocking client; we override call_handlers below
    kernel_client_class = ThreadedKernelClient
    kernel_manager_class = RedirectingKernelManager
    aliases = JupyterConsoleApp.aliases  # this the way?
    flags = JupyterConsoleApp.flags

    def init_kernel_client(self):
        """Create the kernel client, attach the target's handlers, start it."""
        # TODO: cleanup this (by subclassing kernel_client or something)
        manager = self.kernel_manager
        if manager is None:
            # No manager: build a client directly from the connection
            # settings held on this application.
            client = self.kernel_client_class(
                session=self.session,
                ip=self.ip,
                transport=self.transport,
                shell_port=self.shell_port,
                iopub_port=self.iopub_port,
                hb_port=self.hb_port,
                connection_file=self.connection_file,
                parent=self,
            )
        else:
            client = manager.client()
        self.kernel_client = client

        # Route incoming messages to the target before the channels start.
        target = self.target
        client.shell_channel.call_handlers = target.on_shell_msg
        client.hb_channel.call_handlers = target.on_hb_msg
        client.start_channels()

    def initialize(self, target, argv):
        """Remember ``target`` and run both base-class initialisers on ``argv``."""
        self.target = target
        super().initialize(argv)
        JupyterConsoleApp.initialize(self, argv)
class Async(object):
    """Proxy that turns attribute access into deferred calls.

    Fetching an attribute from the proxy returns a callable which, when
    invoked, passes the wrapped object's attribute of the same name (plus
    the call arguments) to ``wraps.vim.async_call``. The wrapped object must
    therefore expose a ``vim`` attribute.
    """

    def __init__(self, wraps):
        self.wraps = wraps

    def __getattr__(self, name):
        wrapped = self.wraps
        # Resolve the scheduler first, then the target attribute, so lookup
        # failures surface in that order.
        schedule = wrapped.vim.async_call
        target = getattr(wrapped, name)
        return partial(schedule, target)
@neovim.plugin
@neovim.encoding(True)
class IPythonPlugin(object):
    """Plugin object holding the kernel connection and shell-reply dispatch.

    Replies arriving on the shell channel are matched to pending requests by
    the ``msg_id`` of their parent header and handed to the handler that was
    registered for that id.
    """

    def __init__(self, vim):
        self.vim = vim
        self.has_connection = False
        # msg_id -> handler; a handler is a greenlet, a callable, or None
        # (None means the reply is to be dropped).
        self.pending_shell_msgs = {}
        # Most recent shell message seen by on_shell_msg; None until one
        # arrives, so readers never hit an AttributeError.
        self.last_msg = None

    def connect(self, argv):
        """Initialise and start the Jupyter app, then record its client/manager."""
        self.ip_app = JupyterVimApp.instance()
        # The app calls back into this object through the Async proxy.
        self.ip_app.initialize(Async(self), argv)
        self.ip_app.start()
        self.kc = self.ip_app.kernel_client
        self.km = self.ip_app.kernel_manager
        self.has_connection = True

    def handle(self, msg_id, handler):
        """Register ``handler`` for the reply to request ``msg_id``."""
        self.pending_shell_msgs[msg_id] = handler

    def waitfor(self, msg_id, retval=None):
        """Suspend the current greenlet until the reply to ``msg_id`` arrives.

        The current greenlet is registered as the handler and control is
        switched to its parent with ``retval``. The value returned is
        whatever the greenlet is later switched back with; ``on_shell_msg``
        switches back with the reply message.
        """
        # FIXME: add some kind of timeout
        gr = greenlet.getcurrent()
        self.handle(msg_id, gr)
        return gr.parent.switch(retval)

    def ignore(self, msg_id):
        """Register ``msg_id`` so that its reply is silently discarded."""
        self.handle(msg_id, None)

    @neovim.function("IPyConnect", sync=True)
    def ipy_connect(self, args):
        """Schedule :meth:`connect` with ``args`` through the Async proxy."""
        # 'connect' waits for kernelinfo, and so must be async
        Async(self).connect(args)

    def on_shell_msg(self, m):
        """Dispatch shell message ``m`` to the handler registered for it."""
        self.last_msg = m
        debug('shell %s: %r', m['msg_type'], m['content'])
        msg_id = m['parent_header']['msg_id']
        try:
            handler = self.pending_shell_msgs.pop(msg_id)
        except KeyError:
            debug('unexpected shell msg: %r', m)
            return
        if isinstance(handler, greenlet.greenlet):
            # Resume the waiting greenlet with the reply; re-parent it so
            # control comes back here when it yields or finishes.
            handler.parent = greenlet.getcurrent()
            handler.switch(m)
        elif handler is not None:
            handler(m)

    # this gets called when heartbeat is lost
    def on_hb_msg(self, time_since):
        """Raise ``RuntimeError`` reporting the lost heartbeat.

        ``time_since`` is the value supplied by the heartbeat channel; it is
        included in the message so the failure can be diagnosed.
        """
        raise RuntimeError(
            'IPython kernel heartbeat lost (time since last beat: %r)'
            % (time_since,))
class Source(Base):
    """Completion source that asks a running IPython kernel for candidates.

    NOTE(review): ``Base`` comes from ``.base`` and the attributes set in
    ``__init__`` look like a deoplete source configuration -- confirm.
    """

    def __init__(self, vim):
        super().__init__(vim)
        # Connect to an already-running kernel as soon as the source is built.
        self.__plug = IPythonPlugin(vim)
        self.__plug.ipy_connect(['--existing'])
        self.name = 'ipython'
        self.mark = '[IPy]'
        self.filetypes = ['python']
        self.is_volatile = True
        self.input_pattern = '.*'
        self.rank = 2000

    def get_complete_position(self, context):
        """Return the index where the text to complete starts, or -1.

        For import statements the start is found by walking back over
        ``[._A-Za-z0-9]`` characters from the end of ``context['input']``.
        Otherwise the current line is scanned backwards from the cursor,
        following attribute access and subscripts; -1 is returned when the
        cursor is at the first column or the text before it does not look
        completable.
        """
        line = self.vim.current.line
        # return immediately for imports
        if imports.match(context['input']):
            start = len(context['input'])
            while start and re.match('[._A-Za-z0-9]', line[start - 1]):
                start -= 1
            return start
        # locate the start of the word
        col = self.vim.funcs.col('.')
        start = col - 1
        if start == 0 or (len(line) == start and
                          not split_pattern.match(line[start - 2]) and
                          not (start >= 2 and keyword.match(line[start - 3]))
                          and line[start - 3:start - 1] != '].'):
            start = -1
            return start
        line = self.vim.funcs.getline('.')
        start = self.vim.funcs.strchars(line[:col]) - 1
        # Depth of subscripts entered while scanning right-to-left.
        bracket_level = 0
        while start > 0 and (
                split_pattern.match(line[start - 1])
                or (line[start - 1] == '.'
                    and start >= 2 and keyword.match(line[start - 2])
                    or (line[start - 1] == '-'
                        and start >= 2 and line[start - 2] == '[')
                    or ''.join(line[start - 2:start]) == '].')):
            if line[start - 1] == '[':
                # Stop at a '[' that does not follow an identifier or ']',
                # or when a subscript has already been entered.
                if (start == 1 or not re.match(
                        r'[A-Za-z0-9_\]]', line[start-2])) or bracket_level > 0:
                    break
                bracket_level += 1
            elif line[start-1] == ']':
                bracket_level -= 1
            start -= 1
        return start

    def gather_candidates(self, context):
        """Return the candidates the kernel produced for ``context['input']``.

        A completion request is sent first, then ``request`` is executed in
        the kernel and the ``_completions`` user expression is read back and
        parsed with ``ast.literal_eval``. An empty list is returned when the
        kernel could not evaluate ``_completions`` (the reply then carries no
        ``data``), instead of failing with ``KeyError``.
        """
        plug = self.__plug
        plug.waitfor(plug.kc.complete(context['input'], len(context['input'])))
        reply = plug.waitfor(plug.kc.execute(
            request, silent=True,
            user_expressions={'_completions': '_completions'}))
        expressions = reply['content'].get('user_expressions') or {}
        metadata = expressions.get('_completions') or {}
        data = metadata.get('data')
        if not data or 'text/plain' not in data:
            debug('no completion metadata in reply: %r', metadata)
            return []
        return ast.literal_eval(data['text/plain'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment