Skip to content

Instantly share code, notes, and snippets.

@Shougo
Last active March 6, 2018 05:29
Show Gist options
  • Save Shougo/b1292a539dcf7058c5e22b253fd46339 to your computer and use it in GitHub Desktop.
Save Shougo/b1292a539dcf7058c5e22b253fd46339 to your computer and use it in GitHub Desktop.
diff --git a/doc/deoplete.txt b/doc/deoplete.txt
index b726d9e..e8a9eb3 100644
--- a/doc/deoplete.txt
+++ b/doc/deoplete.txt
@@ -64,7 +64,7 @@ You can enable Python3 interface with pip: >
pip3 install neovim
-Note: deoplete needs neovim-python ver.0.1.8+.
+Note: deoplete needs neovim-python ver.0.2.3+.
You need update neovim-python module.
>
pip3 install --upgrade neovim
diff --git a/rplugin/python3/deoplete/deoplete.py b/rplugin/python3/deoplete/deoplete.py
index 6bbbc42..5109be4 100644
--- a/rplugin/python3/deoplete/deoplete.py
+++ b/rplugin/python3/deoplete/deoplete.py
@@ -6,8 +6,7 @@
from deoplete import logger
from deoplete.parent import Parent
-from deoplete.util import (error_tb, find_rplugins)
-# from deoplete.util import error
+from deoplete.util import (error_tb, find_rplugins, error)
class Deoplete(logger.LoggingMixin):
@@ -27,6 +26,10 @@ class Deoplete(logger.LoggingMixin):
self._max_parents = max(
[1, self._vim.vars['deoplete#num_processes']])
+ if self._max_parents > 1 and not hasattr(self._vim, 'loop'):
+ error(self._vim, 'neovim-python 0.2.3+ is required.')
+ return
+
# Enable logging before "Init" for more information, and e.g.
# deoplete-jedi picks up the log filename from deoplete's handler in
# its on_init.
@@ -82,7 +85,8 @@ class Deoplete(logger.LoggingMixin):
# Check the previous completion
prev_candidates = context['vars'][
'deoplete#_prev_completion']['candidates']
- if context['event'] == 'Async' and candidates == prev_candidates:
+ if (context['event'] == 'Async' and
+ not prev_candidates and candidates == prev_candidates):
return
# error(self._vim, candidates)
diff --git a/rplugin/python3/deoplete/parent.py b/rplugin/python3/deoplete/parent.py
index a22b979..2ba08ae 100644
--- a/rplugin/python3/deoplete/parent.py
+++ b/rplugin/python3/deoplete/parent.py
@@ -5,6 +5,11 @@
# ============================================================================
import time
+import os
+import msgpack
+import subprocess
+from functools import partial
+from queue import Queue
from deoplete import logger
from deoplete.process import Process
@@ -17,10 +22,20 @@ class Parent(logger.LoggingMixin):
self.name = 'parent'
self._vim = vim
- self._proc = None
+ self._hnd = None
+ self._stdin = None
self._child = None
self._queue_id = ''
self._prev_pos = []
+ self._queue_in = Queue()
+ self._queue_out = Queue()
+ self._packer = msgpack.Packer(
+ use_bin_type=True,
+ encoding='utf-8',
+ unicode_errors='surrogateescape')
+ self._unpacker = msgpack.Unpacker(
+ encoding='utf-8',
+ unicode_errors='surrogateescape')
self._start_process(context)
def enable_logging(self):
@@ -65,48 +80,65 @@ class Parent(logger.LoggingMixin):
def on_event(self, context):
self._put('on_event', [context])
- if context['event'] == 'VimLeavePre':
- self._stop_process()
def _start_process(self, context):
if self._vim.vars['deoplete#num_processes'] > 1:
# Parallel
- python3 = self._vim.vars.get('python3_host_prog', 'python3')
- self._proc = Process(
- [python3, context['dp_main'],
- self._vim.vars['deoplete#_serveraddr']],
- context, context['cwd'])
+
+ startupinfo = None
+ if os.name == 'nt':
+ startupinfo = subprocess.STARTUPINFO()
+ startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+
+ self._hnd = self._vim.loop.create_task(
+ self._vim.loop.subprocess_exec(
+ partial(Process, self),
+ self._vim.vars.get('python3_host_prog', 'python3'),
+ context['dp_main'],
+ self._vim.vars['deoplete#_serveraddr'],
+ stderr=None, cwd=context['cwd'], startupinfo=startupinfo))
else:
# Serial
from deoplete.child import Child
self._child = Child(self._vim)
- def _stop_process(self):
- if self._proc:
- self._proc.kill()
- self._proc = None
-
def _put(self, name, args):
queue_id = str(time.time())
- if self._proc:
+ if self._child:
+ return self._child.main(name, args, queue_id)
+
+ if not self._hnd:
+ return None
+
+ msg = self._packer.pack({
+ 'name': name, 'args': args, 'queue_id': queue_id
+ })
+ self._queue_in.put(msg)
+
+ if self._stdin:
try:
- self._proc.write({
- 'name': name, 'args': args, 'queue_id': queue_id
- })
+ while not self._queue_in.empty():
+ self._stdin.write(self._queue_in.get_nowait())
except BrokenPipeError as e:
error_tb(self._vim, 'Crash in child process')
error(self._vim, 'stderr=' + str(self._proc.read_error()))
- self._proc.kill()
- return queue_id
- elif self._child:
- return self._child.main(name, args, queue_id)
- else:
- return None
+ self._hnd = None
+ return queue_id
def _get(self, queue_id):
- if not self._proc:
+ if not self._hnd:
return []
- return [x for x in self._proc.communicate(0.02)
- if x['queue_id'] == queue_id]
+ outs = []
+ while not self._queue_out.empty():
+ outs.append(self._queue_out.get_nowait())
+ return [x for x in outs if x['queue_id'] == queue_id]
+
+ def _on_connection(self, transport):
+ self._stdin = transport.get_pipe_transport(0)
+
+ def _on_output(self, fd, data):
+ self._unpacker.feed(data)
+ for child_out in self._unpacker:
+ self._queue_out.put(child_out)
diff --git a/rplugin/python3/deoplete/process.py b/rplugin/python3/deoplete/process.py
index c85e3d4..54ac3e7 100644
--- a/rplugin/python3/deoplete/process.py
+++ b/rplugin/python3/deoplete/process.py
@@ -4,80 +4,20 @@
# License: MIT license
# ============================================================================
-import subprocess
-import os
-import msgpack
-from threading import Thread
-from queue import Queue
-from time import time, sleep
+import asyncio
-class Process(object):
- def __init__(self, commands, context, cwd):
- startupinfo = None
- if os.name == 'nt':
- startupinfo = subprocess.STARTUPINFO()
- startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
- self._proc = subprocess.Popen(commands,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- startupinfo=startupinfo,
- cwd=cwd)
- self._context = context
- self._packer = msgpack.Packer(
- use_bin_type=True,
- encoding='utf-8',
- unicode_errors='surrogateescape')
- self._unpacker = msgpack.Unpacker(
- encoding='utf-8',
- unicode_errors='surrogateescape')
- self._queue_out = Queue()
- self._thread = Thread(target=self.enqueue_output)
- self._thread.start()
+class Process(asyncio.SubprocessProtocol):
- def kill(self):
- if not self._proc:
- return
- self._proc.kill()
- self._proc.wait()
- self._proc = None
- self._queue_out = None
- self._thread.join(1.0)
- self._thread = None
+ def __init__(self, plugin):
+ self._plugin = plugin
+ self._vim = plugin._vim
- def enqueue_output(self):
- while self._proc and self._proc.stdout:
- b = self._proc.stdout.raw.read(102400)
- if b is None:
- continue
- if b == b'':
- # EOF
- break
- self._unpacker.feed(b)
- for child_out in self._unpacker:
- self._queue_out.put(child_out)
+ def connection_made(self, transport):
+ self._vim.async_call(self._plugin._on_connection, transport)
- def communicate(self, timeout):
- if not self._proc:
- return []
+ def pipe_data_received(self, fd, data):
+ self._vim.async_call(self._plugin._on_output, fd, data)
- end = time() + timeout
- while self._queue_out.empty() and time() < end:
- sleep(0.005)
-
- outs = []
- while not self._queue_out.empty():
- outs.append(self._queue_out.get_nowait())
- return outs
-
- def read_error(self):
- if not self._proc or not self._proc.stderr:
- return ''
- return self._proc.stderr.read()
-
- def write(self, expr):
- if not self._proc or not self._proc.stdin:
- return
- self._proc.stdin.write(self._packer.pack(expr))
- self._proc.stdin.flush()
+ def process_exited(self):
+ pass
[1, self._vim.vars['deoplete#num_processes']])
+ if self._max_parents > 1 and not hasattr(self._vim, 'loop'):
+ error(self._vim, 'neovim-python 0.2.3+ is required.')
+ return
+
# Enable logging before "Init" for more information, and e.g.
# deoplete-jedi picks up the log filename from deoplete's handler in
# its on_init.
@@ -82,7 +85,8 @@ class Deoplete(logger.LoggingMixin):
# Check the previous completion
prev_candidates = context['vars'][
'deoplete#_prev_completion']['candidates']
- if context['event'] == 'Async' and candidates == prev_candidates:
+ if (context['event'] == 'Async' and
+ not prev_candidates and candidates == prev_candidates):
return
# error(self._vim, candidates)
diff --git a/rplugin/python3/deoplete/parent.py b/rplugin/python3/deoplete/parent.py
index a22b979..2ba08ae 100644
--- a/rplugin/python3/deoplete/parent.py
+++ b/rplugin/python3/deoplete/parent.py
@@ -5,6 +5,11 @@
# ============================================================================
import time
+import os
+import msgpack
+import subprocess
+from functools import partial
+from queue import Queue
from deoplete import logger
from deoplete.process import Process
@@ -17,10 +22,20 @@ class Parent(logger.LoggingMixin):
self.name = 'parent'
self._vim = vim
- self._proc = None
+ self._hnd = None
+ self._stdin = None
self._child = None
self._queue_id = ''
self._prev_pos = []
+ self._queue_in = Queue()
+ self._queue_out = Queue()
+ self._packer = msgpack.Packer(
+ use_bin_type=True,
+ encoding='utf-8',
+ unicode_errors='surrogateescape')
+ self._unpacker = msgpack.Unpacker(
+ encoding='utf-8',
+ unicode_errors='surrogateescape')
self._start_process(context)
def enable_logging(self):
@@ -65,48 +80,65 @@ class Parent(logger.LoggingMixin):
def on_event(self, context):
self._put('on_event', [context])
- if context['event'] == 'VimLeavePre':
- self._stop_process()
def _start_process(self, context):
if self._vim.vars['deoplete#num_processes'] > 1:
# Parallel
- python3 = self._vim.vars.get('python3_host_prog', 'python3')
- self._proc = Process(
- [python3, context['dp_main'],
- self._vim.vars['deoplete#_serveraddr']],
- context, context['cwd'])
+
+ startupinfo = None
+ if os.name == 'nt':
+ startupinfo = subprocess.STARTUPINFO()
+ startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+
+ self._hnd = self._vim.loop.create_task(
+ self._vim.loop.subprocess_exec(
+ partial(Process, self),
+ self._vim.vars.get('python3_host_prog', 'python3'),
+ context['dp_main'],
+ self._vim.vars['deoplete#_serveraddr'],
+ stderr=None, cwd=context['cwd'], startupinfo=startupinfo))
else:
# Serial
from deoplete.child import Child
self._child = Child(self._vim)
- def _stop_process(self):
- if self._proc:
- self._proc.kill()
- self._proc = None
-
def _put(self, name, args):
queue_id = str(time.time())
- if self._proc:
+ if self._child:
+ return self._child.main(name, args, queue_id)
+
+ if not self._hnd:
+ return None
+
+ msg = self._packer.pack({
+ 'name': name, 'args': args, 'queue_id': queue_id
+ })
+ self._queue_in.put(msg)
+
+ if self._stdin:
try:
- self._proc.write({
- 'name': name, 'args': args, 'queue_id': queue_id
- })
+ while not self._queue_in.empty():
+ self._stdin.write(self._queue_in.get_nowait())
except BrokenPipeError as e:
error_tb(self._vim, 'Crash in child process')
error(self._vim, 'stderr=' + str(self._proc.read_error()))
- self._proc.kill()
- return queue_id
- elif self._child:
- return self._child.main(name, args, queue_id)
- else:
- return None
+ self._hnd = None
+ return queue_id
def _get(self, queue_id):
- if not self._proc:
+ if not self._hnd:
return []
- return [x for x in self._proc.communicate(0.02)
- if x['queue_id'] == queue_id]
+ outs = []
+ while not self._queue_out.empty():
+ outs.append(self._queue_out.get_nowait())
+ return [x for x in outs if x['queue_id'] == queue_id]
+
+ def _on_connection(self, transport):
+ self._stdin = transport.get_pipe_transport(0)
+
+ def _on_output(self, fd, data):
+ self._unpacker.feed(data)
+ for child_out in self._unpacker:
+ self._queue_out.put(child_out)
diff --git a/rplugin/python3/deoplete/process.py b/rplugin/python3/deoplete/process.py
index c85e3d4..54ac3e7 100644
--- a/rplugin/python3/deoplete/process.py
+++ b/rplugin/python3/deoplete/process.py
@@ -4,80 +4,20 @@
# License: MIT license
# ============================================================================
-import subprocess
-import os
-import msgpack
-from threading import Thread
-from queue import Queue
-from time import time, sleep
+import asyncio
-class Process(object):
- def __init__(self, commands, context, cwd):
- startupinfo = None
- if os.name == 'nt':
- startupinfo = subprocess.STARTUPINFO()
- startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
- self._proc = subprocess.Popen(commands,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- startupinfo=startupinfo,
- cwd=cwd)
- self._context = context
- self._packer = msgpack.Packer(
- use_bin_type=True,
- encoding='utf-8',
- unicode_errors='surrogateescape')
- self._unpacker = msgpack.Unpacker(
- encoding='utf-8',
- unicode_errors='surrogateescape')
- self._queue_out = Queue()
- self._thread = Thread(target=self.enqueue_output)
- self._thread.start()
+class Process(asyncio.SubprocessProtocol):
- def kill(self):
- if not self._proc:
- return
- self._proc.kill()
- self._proc.wait()
- self._proc = None
- self._queue_out = None
- self._thread.join(1.0)
- self._thread = None
+ def __init__(self, plugin):
+ self._plugin = plugin
+ self._vim = plugin._vim
- def enqueue_output(self):
- while self._proc and self._proc.stdout:
- b = self._proc.stdout.raw.read(102400)
- if b is None:
- continue
- if b == b'':
- # EOF
- break
- self._unpacker.feed(b)
- for child_out in self._unpacker:
- self._queue_out.put(child_out)
+ def connection_made(self, transport):
+ self._vim.async_call(self._plugin._on_connection, transport)
- def communicate(self, timeout):
- if not self._proc:
- return []
+ def pipe_data_received(self, fd, data):
+ self._vim.async_call(self._plugin._on_output, fd, data)
- end = time() + timeout
- while self._queue_out.empty() and time() < end:
- sleep(0.005)
-
- outs = []
- while not self._queue_out.empty():
- outs.append(self._queue_out.get_nowait())
- return outs
-
- def read_error(self):
- if not self._proc or not self._proc.stderr:
- return ''
- return self._proc.stderr.read()
-
- def write(self, expr):
- if not self._proc or not self._proc.stdin:
- return
- self._proc.stdin.write(self._packer.pack(expr))
- self._proc.stdin.flush()
+ def process_exited(self):
+ pass
[1, self._vim.vars['deoplete#num_processes']])
+ if self._max_parents > 1 and not hasattr(self._vim, 'loop'):
+ error(self._vim, 'neovim-python 0.2.3+ is required.')
+ return
+
# Enable logging before "Init" for more information, and e.g.
# deoplete-jedi picks up the log filename from deoplete's handler in
# its on_init.
@@ -82,7 +85,8 @@ class Deoplete(logger.LoggingMixin):
# Check the previous completion
prev_candidates = context['vars'][
'deoplete#_prev_completion']['candidates']
- if context['event'] == 'Async' and candidates == prev_candidates:
+ if (context['event'] == 'Async' and
+ not prev_candidates and candidates == prev_candidates):
return
# error(self._vim, candidates)
diff --git a/rplugin/python3/deoplete/parent.py b/rplugin/python3/deoplete/parent.py
index a22b979..159676c 100644
--- a/rplugin/python3/deoplete/parent.py
+++ b/rplugin/python3/deoplete/parent.py
@@ -5,6 +5,11 @@
# ============================================================================
import time
+import os
+import msgpack
+import subprocess
+from functools import partial
+from queue import Queue
from deoplete import logger
from deoplete.process import Process
@@ -17,10 +22,20 @@ class Parent(logger.LoggingMixin):
self.name = 'parent'
self._vim = vim
- self._proc = None
+ self._hnd = None
+ self._stdin = None
self._child = None
self._queue_id = ''
self._prev_pos = []
+ self._queue_in = Queue()
+ self._queue_out = Queue()
+ self._packer = msgpack.Packer(
+ use_bin_type=True,
+ encoding='utf-8',
+ unicode_errors='surrogateescape')
+ self._unpacker = msgpack.Unpacker(
+ encoding='utf-8',
+ unicode_errors='surrogateescape')
self._start_process(context)
def enable_logging(self):
@@ -65,48 +80,64 @@ class Parent(logger.LoggingMixin):
def on_event(self, context):
self._put('on_event', [context])
- if context['event'] == 'VimLeavePre':
- self._stop_process()
def _start_process(self, context):
if self._vim.vars['deoplete#num_processes'] > 1:
# Parallel
python3 = self._vim.vars.get('python3_host_prog', 'python3')
- self._proc = Process(
- [python3, context['dp_main'],
- self._vim.vars['deoplete#_serveraddr']],
- context, context['cwd'])
+
+ startupinfo = None
+ if os.name == 'nt':
+ startupinfo = subprocess.STARTUPINFO()
+ startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+ create = self._vim.loop.subprocess_exec(
+ partial(Process, self),
+ python3, context['dp_main'],
+ self._vim.vars['deoplete#_serveraddr'],
+ stderr=None, cwd=context['cwd'], startupinfo=startupinfo)
+ self._hnd = self._vim.loop.create_task(create)
else:
# Serial
from deoplete.child import Child
self._child = Child(self._vim)
- def _stop_process(self):
- if self._proc:
- self._proc.kill()
- self._proc = None
-
def _put(self, name, args):
queue_id = str(time.time())
- if self._proc:
+ if self._child:
+ return self._child.main(name, args, queue_id)
+
+ if not self._hnd:
+ return None
+
+ msg = self._packer.pack({
+ 'name': name, 'args': args, 'queue_id': queue_id
+ })
+ self._queue_in.put(msg)
+
+ if self._stdin:
try:
- self._proc.write({
- 'name': name, 'args': args, 'queue_id': queue_id
- })
+ while not self._queue_in.empty():
+ self._stdin.write(self._queue_in.get_nowait())
except BrokenPipeError as e:
error_tb(self._vim, 'Crash in child process')
error(self._vim, 'stderr=' + str(self._proc.read_error()))
- self._proc.kill()
- return queue_id
- elif self._child:
- return self._child.main(name, args, queue_id)
- else:
- return None
+ self._hnd = None
+ return queue_id
def _get(self, queue_id):
- if not self._proc:
+ if not self._hnd:
return []
- return [x for x in self._proc.communicate(0.02)
- if x['queue_id'] == queue_id]
+ outs = []
+ while not self._queue_out.empty():
+ outs.append(self._queue_out.get_nowait())
+ return [x for x in outs if x['queue_id'] == queue_id]
+
+ def _on_connection(self, transport):
+ self._stdin = transport.get_pipe_transport(0)
+
+ def _on_output(self, fd, data):
+ self._unpacker.feed(data)
+ for child_out in self._unpacker:
+ self._queue_out.put(child_out)
diff --git a/rplugin/python3/deoplete/process.py b/rplugin/python3/deoplete/process.py
index c85e3d4..d111d73 100644
--- a/rplugin/python3/deoplete/process.py
+++ b/rplugin/python3/deoplete/process.py
@@ -4,80 +4,23 @@
# License: MIT license
# ============================================================================
-import subprocess
-import os
-import msgpack
-from threading import Thread
-from queue import Queue
-from time import time, sleep
+import asyncio
-class Process(object):
- def __init__(self, commands, context, cwd):
- startupinfo = None
- if os.name == 'nt':
- startupinfo = subprocess.STARTUPINFO()
- startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
- self._proc = subprocess.Popen(commands,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- startupinfo=startupinfo,
- cwd=cwd)
- self._context = context
- self._packer = msgpack.Packer(
- use_bin_type=True,
- encoding='utf-8',
- unicode_errors='surrogateescape')
- self._unpacker = msgpack.Unpacker(
- encoding='utf-8',
- unicode_errors='surrogateescape')
- self._queue_out = Queue()
- self._thread = Thread(target=self.enqueue_output)
- self._thread.start()
+class Process(asyncio.SubprocessProtocol):
- def kill(self):
- if not self._proc:
- return
- self._proc.kill()
- self._proc.wait()
- self._proc = None
- self._queue_out = None
- self._thread.join(1.0)
- self._thread = None
+ def __init__(self, plugin):
+ self._plugin = plugin
+ self._vim = plugin._vim
- def enqueue_output(self):
- while self._proc and self._proc.stdout:
- b = self._proc.stdout.raw.read(102400)
- if b is None:
- continue
- if b == b'':
- # EOF
- break
- self._unpacker.feed(b)
- for child_out in self._unpacker:
- self._queue_out.put(child_out)
+ def connection_made(self, transport):
+ self._vim.async_call(self._plugin._on_connection, transport)
+ self._transport = transport
+ if isinstance(transport, asyncio.SubprocessTransport):
+ self._transport = transport.get_pipe_transport(0)
- def communicate(self, timeout):
- if not self._proc:
- return []
+ def pipe_data_received(self, fd, data):
+ self._vim.async_call(self._plugin._on_output, fd, data)
- end = time() + timeout
- while self._queue_out.empty() and time() < end:
- sleep(0.005)
-
- outs = []
- while not self._queue_out.empty():
- outs.append(self._queue_out.get_nowait())
- return outs
-
- def read_error(self):
- if not self._proc or not self._proc.stderr:
- return ''
- return self._proc.stderr.read()
-
- def write(self, expr):
- if not self._proc or not self._proc.stdin:
- return
- self._proc.stdin.write(self._packer.pack(expr))
- self._proc.stdin.flush()
+ def process_exited(self):
+ pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment