Skip to content

Instantly share code, notes, and snippets.

@ly0
Last active March 20, 2017 20:03
Show Gist options
  • Select an option

  • Save ly0/94dec6346f5be528c6bbd2c25fe311b9 to your computer and use it in GitHub Desktop.

Select an option

Save ly0/94dec6346f5be528c6bbd2c25fe311b9 to your computer and use it in GitHub Desktop.
wait_subtasks.py
# coding=utf-8
"""Wait until all subtasks have finished.
"""
import tornado
import tornado.locks
from tornado.concurrent import Future
class ProcessWaiter(tornado.locks._TimeoutGarbageCollector):
    """Let coroutines wait until every in-flight subtask has finished.

    ``_value`` is 0 while no subtask is running and drops by one for every
    :meth:`processing` call that has not yet been matched by :meth:`done`.
    Futures handed out by :meth:`wait_all_done` are resolved once the
    counter climbs back to 0.

    The instance is also a context manager: entering the ``with`` block
    calls :meth:`processing` and leaving it calls :meth:`done`.
    """

    def __init__(self):
        super(ProcessWaiter, self).__init__()
        # 0 -> idle; negative -> that many subtasks are still running.
        self._value = 0

    def __repr__(self):
        """Return the base repr extended with the counter and waiter count."""
        res = super(ProcessWaiter, self).__repr__()
        extra = 'locked' if self._value == 0 else 'unlocked,value:{0}'.format(
            self._value)
        if self._waiters:
            extra = '{0},waiters:{1}'.format(extra, len(self._waiters))
        # res[1:-1] strips the surrounding '<' and '>' of the base repr.
        return '<{0} [{1}]>'.format(res[1:-1], extra)

    def __enter__(self):
        self.processing()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the ``with`` body
        # still propagates after the subtask has been marked as done.
        self.done()

    def processing(self):
        """Register one more running subtask (decrements the counter)."""
        self._value -= 1

    def done(self):
        """Mark one subtask as finished.

        When the counter returns to 0, every queued waiter is removed from
        the queue and the ones still pending are resolved.
        """
        self._value += 1
        if self._value == 0:
            # Wake up and unregister every waiter.
            while self._waiters:
                waiter = self._waiters.popleft()
                # A waiter that already timed out stays queued until the
                # base class prunes it; resolving it a second time would
                # fail on futures that reject double completion.
                if not waiter.done():
                    waiter.set_result(
                        tornado.locks._ReleasingContextManager(self))

    def wait_all_done(self, timeout=None):
        """Return a Future that resolves once no subtask is running.

        :param timeout: optional deadline, forwarded unchanged to
            ``IOLoop.add_timeout``.  When it fires before the subtasks
            finish, the Future fails with ``tornado.gen.TimeoutError``.
        :returns: a Future that is already resolved when the counter is 0,
            otherwise one that is resolved by :meth:`done`.
        :raises ProcessWaiterSemaphoreException: if the counter is
            positive, i.e. :meth:`done` was called more often than
            :meth:`processing`.

        NOTE(review): the Future's result is a ``_ReleasingContextManager``
        wrapping this object, but no ``release()`` method is defined in
        this class, so the result is presumably not meant to be used in a
        ``with`` statement -- confirm before relying on it.
        """
        if self._value > 0:
            raise ProcessWaiterSemaphoreException(
                'Semaphore value is positive (val:%d)' % self._value)

        waiter = Future()
        if self._value == 0:
            # Nothing to synchronise on right now: resolve immediately.
            waiter.set_result(tornado.locks._ReleasingContextManager(self))
            return waiter

        self._waiters.append(waiter)
        if timeout:
            def on_timeout():
                # The waiter may already be resolved if done() ran in the
                # same loop iteration as the timeout.
                if not waiter.done():
                    waiter.set_exception(tornado.gen.TimeoutError())
                self._garbage_collect()

            io_loop = tornado.ioloop.IOLoop.current()
            timeout_handle = io_loop.add_timeout(timeout, on_timeout)
            # Cancel the pending timeout as soon as the waiter completes.
            waiter.add_done_callback(
                lambda _: io_loop.remove_timeout(timeout_handle))
        return waiter
class ProcessWaiterSemaphoreException(Exception):
    """Raised when a ProcessWaiter's counter is found to be positive."""
@ly0
Copy link
Copy Markdown
Author

ly0 commented Mar 20, 2017

Usage:
Subtask:

process_waiter = ProcessWaiter()
def subtask():
    process_waiter.processing()
    blablabla...
    process_waiter.done()

OR

with process_waiter:
    blablabla

Main Coroutine:

yield process_waiter.wait_all_done()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment