Created
December 29, 2015 12:33
-
-
Save alex8224/816eeb17f93606089ff1 to your computer and use it in GitHub Desktop.
greenlet in tornado
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding:utf-8 -*- | |
from __future__ import absolute_import | |
import sys | |
import greenlet | |
import socket | |
import time | |
from tornado.iostream import IOStream | |
from tornado.gen import coroutine, Return | |
from tornado.concurrent import Future | |
from tornado.ioloop import IOLoop | |
def synclize(func): | |
coro = coroutine(func) | |
def _sync_call(*args, **kwargs): | |
child_gr = greenlet.getcurrent() | |
main = child_gr.parent | |
def callback(future): | |
if future.exc_info(): | |
child_gr.throw(*future.exc_info()) | |
elif future.exception(): | |
child_gr.throw(future.exception()) | |
else: | |
child_gr.switch(future.result()) | |
IOLoop.instance().add_future(coro(*args, **kwargs), callback) | |
return main.switch() | |
return _sync_call | |
def spawn(callable_obj, *args, **kwargs): | |
future = Future() | |
def inner_call(): | |
try: | |
result = callable_obj(*args, **kwargs) | |
future.set_result(result) | |
except Exception as ex: | |
future.set_exception(ex) | |
greenlet.greenlet(inner_call).switch() | |
return future | |
def greentask(func): | |
def call_func(*args, **kwargs): | |
future = Future() | |
def inner_call(): | |
try: | |
result = func(*args, **kwargs) | |
future.set_result(result) | |
except Exception as ex: | |
future.set_result(ex) | |
greenlet.greenlet(inner_call).switch() | |
return future | |
return call_func | |
class Waiter(object): | |
def __init__(self): | |
self._greenlet = greenlet.getcurrent() | |
self._main = self._greenlet.parent | |
def switch(self, value): | |
self._greenlet.switch(value) | |
def get(self): | |
return self._main.switch() | |
def sleep(seconds): | |
waiter = Waiter() | |
unique = object() | |
IOLoop.current().add_timeout(time.time() + seconds, waiter.switch, unique) | |
waiter.get() | |
class AsyncSocket(object): | |
def __init__(self, sock): | |
self._iostream = IOStream(sock) | |
@synclize | |
def connect(self, address): | |
yield self._iostream.connect(address) | |
@synclize | |
def sendall(self, buff): | |
yield self._iostream.write(buff) | |
@synclize | |
def read(self, nbytes): | |
buff = yield self._iostream.read_bytes(nbytes) | |
raise Return(buff) | |
def close(self): | |
self._iostream.close() | |
def set_nodelay(self, flag): | |
self._iostream.set_nodelay(flag) | |
def async_connect(self, sock=None): | |
try: | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.socket = AsyncSocket(sock) | |
self.socket.connect((self.host, self.port)) | |
self.socket.set_nodelay(True) | |
self._rfile = self.socket | |
self._get_server_information() | |
self._request_authentication() | |
if self.sql_mode is not None: | |
c = self.cursor() | |
c.execute("SET sql_mode=%s", (self.sql_mode,)) | |
if self.init_command is not None: | |
c = self.cursor() | |
c.execute(self.init_command) | |
c.close() | |
self.commit() | |
if self.autocommit_mode is not None: | |
self.autocommit(self.autocommit_mode) | |
except socket.error: | |
if self.socket: | |
self.socket.close() | |
raise | |
def patch_pymysql(): | |
sys.modules["pymysql"].connections.Connection.connect = async_connect |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment