Created
March 9, 2014 16:30
-
-
Save rshk/9450348 to your computer and use it in GitHub Desktop.
Dispatch items from a generator to two consumers, using greenlets to parallelize operations.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from greenlet import greenlet | |
##------------------------------------------------------------
##
## We have a producer (a simple generator) and two
## consumers (functions accepting a generator).
##
## We want to feed items to the two consumers as soon
## as items are generated by the producer.
##
## We need a workflow like this:
##
## - get an item from producer
## - pass item to consumer1
## - execution is returned to main
## - pass item to consumer2
## - execution is returned to main
## - keep looping / consuming items
##
##------------------------------------------------------------
def producer(count=5, delay=.5):
    """
    Yield ``count`` labelled items, pausing ``delay`` seconds after each.

    :param count: number of items to generate (default 5).
    :param delay: seconds to sleep after an item has been handed out,
        standing in for real work done between items (default 0.5).
    :return: generator yielding "Item 0", "Item 1", ...
    """
    # ``range`` rather than ``xrange``: the latter does not exist on
    # Python 3, and for a handful of items the difference on Python 2
    # is irrelevant.
    for i in range(count):
        yield "Item {0}".format(i)
        time.sleep(delay)  # We're doing things, here..
def consumer1(stream):
    """
    Print every item pulled from ``stream``, tagged as CONSUMER-1.

    Items are handled one at a time, in the order the stream
    hands them out.
    """
    label = "CONSUMER-1: {0}"
    for entry in stream:
        line = label.format(entry)
        print(line)
def consumer2(stream):
    """
    Print every item pulled from ``stream``, tagged as CONSUMER-2.

    Items are handled one at a time, in the order the stream
    hands them out.
    """
    label = "CONSUMER-2: {0}"
    for entry in stream:
        line = label.format(entry)
        print(line)
##------------------------------------------------------------
## Greenlets machinery
##------------------------------------------------------------
class GreenGenerator(object):
    """
    Iterator whose items are supplied by the parent greenlet.

    Every request for an item switches to the parent of the greenlet
    that is currently running; the value the parent switches back
    with becomes the next item. The class never raises StopIteration
    on its own.
    """

    def __iter__(self):
        # Already an iterator, so iterating it simply yields itself.
        return self

    def __next__(self):
        # Hand control to the parent and return whatever it sends back.
        return greenlet.getcurrent().parent.switch()

    next = __next__  # Python 2 spelling of the iterator protocol
def dispatch_stream(stream, *consumers):
    """
    Feed every item of ``stream`` to each consumer, one item at a time.

    Each consumer is run in its own greenlet and is called with a
    GreenGenerator as its only argument. For every item taken from
    ``stream`` the consumers are resumed in the order given, each
    receiving that item, before the next item is requested.

    :param stream: iterable producing the items to dispatch.
    :param consumers: callables accepting a single iterable argument.
    :return: None
    """
    greenlets = [greenlet(c) for c in consumers]

    # Start each consumer; it runs until it first asks its
    # GreenGenerator for an item, which switches back to us.
    for g in greenlets:
        g.switch(GreenGenerator())

    for item in stream:
        for g in greenlets:
            g.switch(item)

    # The producer is exhausted: tell every consumer that is still
    # waiting for an item that the stream has ended. Raising
    # StopIteration at the point where it is waiting ends its loop
    # normally, so code placed after the loop gets to run instead of
    # the consumer being left suspended forever.
    for g in greenlets:
        if not g.dead:
            g.throw(StopIteration)
# Run the demo: every item from the producer is passed to consumer1
# and then to consumer2 before the next item is generated.
dispatch_stream(producer(), consumer1, consumer2)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment