Skip to content

Instantly share code, notes, and snippets.

@nicfit
Last active September 14, 2017 04:38
Show Gist options
  • Save nicfit/5b4ab43f976c19d098d4c2ba7bcf438d to your computer and use it in GitHub Desktop.
Save nicfit/5b4ab43f976c19d098d4c2ba7bcf438d to your computer and use it in GitHub Desktop.
An example of how to mix the asyncio API into libraries that don't support the syntax
try:
    # The following `run_async` function is compiled at runtime
    # because it contains syntax which is not supported on older Python
    # versions. (A 'return' inside a generator.)  Compiling it from a string
    # turns that SyntaxError into something we can catch below instead of
    # breaking the import of this whole file.
    #
    # The source text must keep its block indentation: `textwrap.dedent` only
    # strips the *common* leading whitespace, and an IndentationError is a
    # SyntaxError subclass, so a mangled string would silently select the
    # fallback definition.
    #
    # NOTE(review): `asyncio.coroutine` was deprecated in Python 3.8 and
    # removed in 3.11.  On such versions the code below still compiles, but
    # calling `run_async` raises AttributeError -- confirm which Python
    # versions need to be supported.
    six.exec_(textwrap.dedent('''
    def run_async(self, reset_current_buffer=True, pre_run=None):
        """
        Same as `run`, but this returns a coroutine.

        This is only available on Python >3.3, with asyncio.

        :param reset_current_buffer: Not referenced by this implementation.
        :param pre_run: `None` or a callable; it is handed to
            `self._pre_run` right after `self.reset()`.
        :returns: A generator-based coroutine whose result is
            `self.return_value()`.
        """
        # Inline import, because it slows down startup when asyncio is not
        # needed.
        import asyncio

        @asyncio.coroutine
        def run():
            # Evaluated when the coroutine starts running, not when
            # `run_async` is called.
            assert pre_run is None or callable(pre_run)

            try:
                self._is_running = True

                self.on_start.fire()
                self.reset()

                # Call pre_run.
                self._pre_run(pre_run)

                with self.input.raw_mode():
                    self.renderer.request_absolute_cursor_position()
                    self._redraw()

                    yield from self.eventloop.run_as_coroutine(
                        self.input, self.create_eventloop_callbacks())

                return self.return_value()
            finally:
                # When we get here without `is_done` being set, flag the
                # exit ourselves and redraw once more before cleaning up.
                if not self.is_done:
                    self._exit_flag = True
                    self._redraw()

                self.renderer.reset()
                self.on_stop.fire()
                self._is_running = False

        return run()
    '''))
except SyntaxError:
    # Python2, or early versions of Python 3.
    def run_async(self, reset_current_buffer=True, pre_run=None):
        """
        Same as `run`, but this returns a coroutine.

        This is only available on Python >3.3, with asyncio.  This fallback
        is defined when the real implementation above does not compile; it
        keeps the same signature and always raises.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment