Created
May 4, 2014 05:37
-
-
Save datakurre/b273a6bf9285ee779542 to your computer and use it in GitHub Desktop.
Asynchronous ZPublisher stream iterator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import StringIO | |
import threading | |
from zope.interface import implements | |
from ZPublisher.Iterators import IStreamIterator | |
from ZServer.PubCore.ZEvent import Wakeup | |
from zope.globalrequest import getRequest | |
class zhttp_channel_async_wrapper(object): | |
"""Medusa channel wrapper to defer producers until released""" | |
def __init__(self, channel): | |
# (executed within the current Zope worker thread) | |
self._channel = channel | |
self._mutex = threading.Lock() | |
self._deferred = [] | |
self._released = False | |
self._content_length = 0 | |
def _push(self, producer, send=1): | |
if (isinstance(producer, str) | |
and producer.startswith('HTTP/1.1 200 OK')): | |
# Fix Content-Length to match the real content length | |
# (an alternative would be to use chunked encoding) | |
producer = producer.replace( | |
'Content-Length: 0\r\n', | |
'Content-Length: {0:s}\r\n'.format(str(self._content_length)) | |
) | |
self._channel.push(producer, send) | |
def push(self, producer, send=1): | |
# (executed within the current Zope worker thread) | |
with self._mutex: | |
if not self._released: | |
self._deferred.append((producer, send)) | |
else: | |
self._push(producer, send) | |
def release(self, content_length): | |
# (executed within the exclusive async thread) | |
self._content_length = content_length | |
with self._mutex: | |
for producer, send in self._deferred: | |
self._push(producer, send) | |
self._released = True | |
Wakeup() # wake up the asyncore loop to read our results | |
def __getattr__(self, key): | |
return getattr(self._channel, key) | |
class AsyncWorkerStreamIterator(StringIO.StringIO): | |
"""Stream iterator to publish the results of the given func""" | |
implements(IStreamIterator) | |
def __init__(self, func, response, streamsize=1 << 16): | |
# (executed within the current Zope worker thread) | |
# Init buffer | |
StringIO.StringIO.__init__(self) | |
self._streamsize = streamsize | |
# Wrap the Medusa channel to wait for the func results | |
self._channel = response.stdout._channel | |
self._wrapped_channel = zhttp_channel_async_wrapper(self._channel) | |
response.stdout._channel = self._wrapped_channel | |
# Set content-length as required by ZPublisher | |
response.setHeader('content-length', '0') | |
# Fire the given func in a separate thread | |
self.thread = threading.Thread(target=func, args=(self.callback,)) | |
self.thread.start() | |
def callback(self, data): | |
# (executed within the exclusive async thread) | |
self.write(data) | |
self.seek(0) | |
self._wrapped_channel.release(len(data)) | |
def next(self): | |
# (executed within the main thread) | |
if not self.closed: | |
data = self.read(self._streamsize) | |
if not data: | |
self.close() | |
else: | |
return data | |
raise StopIteration | |
def __len__(self): | |
return len(self.getvalue()) | |
def slow_ok_worker(callback): | |
# (executed within the exclusive async thread) | |
import time | |
time.sleep(1) | |
callback('OK') | |
def slow_ok(): | |
"""The publishable example method""" | |
# (executed within the current Zope worker thread) | |
request = getRequest() | |
return AsyncWorkerStreamIterator(slow_ok_worker, request.response) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment