-
-
Save arnaudsj/240865 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Jamie Kirkpatrick, November 2009. <[email protected]> | |
# Released under the BSD license. | |
# | |
""" | |
Experimental code to add asyncronous functionality to WSGI applications | |
running under the Tornado webserver. Uses greenlet to spin micro-threads | |
which can be suspended and resumed within a single thread as required. | |
""" | |
from greenlet import greenlet | |
from tornado.wsgi import WSGIContainer | |
from tornado.httpserver import HTTPServer | |
from tornado.ioloop import IOLoop | |
from threading import local | |
# Per-thread registry of in-flight request greenlets.
# NOTE(review): attributes set on a threading.local at import time exist only
# in the thread that performed the import; this appears to rely on Tornado's
# IOLoop running everything in that single thread -- confirm.
thread_data = local()
thread_data.tasks = set()
class AsyncMixin(object):
    """
    Mixin for WSGI applications that wish to implement async functionality.

    Subclasses can pass a bound version of :meth:`resume` to some
    long-running task and have it invoke the callback with its results.
    When the callback is invoked the suspended request will resume.
    """

    def async_wait(self, environ):
        """
        Suspend the current request until :meth:`resume` is called.

        Removes this request's greenlet from the pending-task registry and
        switches back to the parent greenlet.  Whatever is later passed to
        ``resume`` becomes the return value here when the request is
        switched back in.
        """
        thread_data.tasks.remove(environ["async.task"])
        return environ["async.suspend"]()

    def resume(self, environ, *args, **kwargs):
        """
        Resume a previously suspended request.

        Re-registers the request's greenlet and switches into it, delivering
        ``*args``/``**kwargs`` as the result of the matching
        :meth:`async_wait` call.
        """
        thread_data.tasks.add(environ["async.task"])
        environ["async.resume"](*args, **kwargs)
class AsyncWSGIContainer(WSGIContainer):
    """
    WSGI container subclass that adds async functionality to requests.
    """

    def _environ(self, request):
        # Extend the standard WSGI environ with the hooks that __call__
        # installed on the request, so application code (AsyncMixin) can
        # suspend and resume itself.
        environ = super(AsyncWSGIContainer, self)._environ(request)
        environ["async.task"] = request.task
        environ["async.suspend"] = request.suspend
        environ["async.resume"] = request.resume
        return environ

    def __call__(self, request):
        # Run the normal WSGI dispatch inside a greenlet so the application
        # can yield control mid-request.
        task = greenlet(super(AsyncWSGIContainer, self).__call__)
        thread_data.tasks.add(task)
        request.task = task
        # suspend == switch back to this (parent) greenlet;
        # resume  == switch back into the request's greenlet.
        request.suspend = task.parent.switch
        request.resume = task.switch
        task.switch(request)
        # Drain finished tasks.  A request that suspended via async_wait has
        # already removed itself from the set, so by the time we get here the
        # set normally holds only dead (completed) greenlets; switching into
        # a dead greenlet simply hands control straight back to its parent.
        # NOTE(review): indentation reconstructed from a whitespace-mangled
        # paste -- confirm task.switch() was intended outside the `if`.
        while len(thread_data.tasks):
            for task in list(thread_data.tasks):
                if task.dead:
                    thread_data.tasks.remove(task)
                task.switch()
class UpdatesMixin(AsyncMixin):
    """
    An example mixin that shows the ability to push async
    updates down a long-running request. Based on the example code for
    the cappuccino-tornado integration
    See: http://github.com/eliasklughammer/Cappuccino-X-Tornado.
    """

    # Class-level registry of resume callbacks for requests currently
    # suspended waiting for the next batch of updates.
    waiters = []

    def wait_for_updates(self, environ):
        """
        Register this request for the next update batch and suspend it.

        Returns whatever is eventually passed to :meth:`new_updates`.
        """
        self.waiters.append(lambda *a, **k: self.resume(environ, *a, **k))
        return self.async_wait(environ)

    @classmethod
    def new_updates(cls, updates):
        """
        Deliver *updates* to every waiting request and reset the registry.
        """
        # Swap the registry out *before* dispatching: a callback that
        # re-registers a waiter while we iterate (the usual long-poll
        # "resume, then wait again" pattern) must not be wiped out by the
        # reset that previously ran after the loop.
        waiters, cls.waiters = cls.waiters, []
        for callback in waiters:
            callback(updates)
class ExampleApplication(UpdatesMixin):
    """
    An example application that either waits asynchronously for updates or
    posts them to listeners.
    """

    def __call__(self, environ, start_response):
        # WSGI entry point: GET /async suspends until an update is pushed;
        # any other path pushes an update to all current waiters.
        if environ["PATH_INFO"] == "/async":
            updates = self.wait_for_updates(environ)
            # PEP 333: the status line needs a reason phrase, and the body
            # must be an iterable of strings -- returning a bare str makes
            # the server iterate it character by character.
            start_response("200 OK", [("Content-type", "text/plain")])
            return ["Updates: %s" % updates]
        else:
            start_response("200 OK", [("Content-type", "text/plain")])
            self.new_updates("Update arrived")
            return ["Pushed updates"]
if __name__ == "__main__": | |
app = ExampleApplication() | |
container = AsyncWSGIContainer(app) | |
http_server = HTTPServer(container) | |
http_server.listen(8888) | |
IOLoop.instance().start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment