Created
October 13, 2018 03:46
-
-
Save mayli/2958817789e59d6ce34f9aa71a3cc43a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import shelve
import threading
from multiprocessing.pool import ThreadPool

try:
    import Queue
except ImportError:  # Python 3 renamed the module to ``queue``
    import queue as Queue
# Configure the root logger once at import time: DEBUG and above, with each
# record prefixed by its timestamp and the name of the emitting thread.
logging.basicConfig(
    format='%(asctime)s:[%(threadName)s] %(levelname)s - %(message)s',
    level=logging.DEBUG,
)

# Module-level logger used by the code in this file.
logger = logging.getLogger(__name__)
class Pipeline(object):
    """Push items through a chain of tasks, one ``ThreadPool`` per stage.

    Stage ``i`` runs ``tasks[i]`` on its own pool.  When a task returns, its
    result is submitted to stage ``i + 1``; results of the last stage are put
    on ``self.out``.  Every item is recorded in ``self.state`` (keyed by the
    stage index as a string) while it is in flight and removed once its task
    has returned, so ``resume()`` can re-submit whatever was left unfinished.
    """

    # Guards every read-modify-write of ``state``.  Defined on the class, so
    # it is shared by all Pipeline instances.
    state_lock = threading.Lock()

    def __init__(self, tasks, processes=1, maxsize=1, state="o.state"):
        """Create one thread pool per task and open the state store.

        Args:
            tasks: sequence of one-argument callables, applied in order.
            processes: number of worker threads in each stage's pool.
            maxsize: accepted for backward compatibility; not used.
            state: filename passed to ``shelve.open``.  A falsy value keeps
                the bookkeeping in a plain in-memory dict instead.
        """
        self.tasks = tasks
        self.pools = [ThreadPool(processes) for _ in tasks]
        self.out = Queue.Queue()
        # Always provide a mapping: previously a falsy ``state`` left the
        # attribute undefined and the first ``state_log`` call crashed.
        self.state = shelve.open(state) if state else {}

    def resume(self):
        """Re-submit every item still recorded as in flight.

        Each item is handed to the stage it was recorded under.  Call
        ``wait()`` afterwards to block until the work has drained.
        """
        # Snapshot under the lock, then release it before submitting:
        # ``threading.Lock`` is not reentrant and the bookkeeping methods
        # acquire it themselves.
        with self.state_lock:
            pending = [(int(key), list(items))
                       for key, items in self.state.items()]
        for stage, items in pending:
            for item in items:
                # The item is already recorded for this stage; logging it
                # again would leave a duplicate behind once it completes.
                self._submit(stage, item, log=False)

    def state_log(self, i, item):
        """Record ``item`` as in flight at stage ``i``."""
        key = str(i)
        with self.state_lock:
            items = self.state.get(key, [])
            # Re-assign instead of mutating in place so the store sees the
            # change as a write to ``key``.
            self.state[key] = items + [item]

    def state_done(self, i, item):
        """Remove one record of ``item`` from stage ``i``.

        A missing record is ignored: this runs inside a pool callback, and
        an exception raised there would not reach the caller.
        """
        key = str(i)
        with self.state_lock:
            items = self.state.get(key, [])
            try:
                items.remove(item)
            except ValueError:
                return
            self.state[key] = items

    def run(self, items, wait=True):
        """Submit every item to the first stage.

        Args:
            items: iterable of inputs for ``tasks[0]``.
            wait: when true, block until all stages have drained.

        Returns:
            The output queue (``self.out``).
        """
        # Explicit loop: ``map`` is lazy on Python 3 and would submit nothing.
        for item in items:
            self.execute(0, item)
        if wait:
            self.wait()
        return self.out

    def wait(self):
        """Close and join each stage's pool, in stage order.

        Pools are closed here, so nothing can be submitted afterwards.
        """
        for i, pool in enumerate(self.pools):
            # Single pre-formatted argument prints the same text under both
            # the Python 2 statement and the Python 3 function.
            print("waiting for pool %s %s" % (i, pool))
            pool.close()
            pool.join()

    def close(self):
        """Close the state store if it has a ``close`` method (a shelf does)."""
        closer = getattr(self.state, "close", None)
        if closer is not None:
            with self.state_lock:
                closer()

    def execute(self, i, item):
        """Record ``item`` as in flight and schedule ``tasks[i]`` on it."""
        self._submit(i, item)

    def _submit(self, i, item, log=True):
        """Schedule ``tasks[i](item)``; ``log=False`` skips the state record."""
        task = self.tasks[i]
        is_last = i == len(self.tasks) - 1

        def callback(result):
            # Invoked by the pool with the task's return value.
            self.state_done(i, item)
            if is_last:
                # last, put result to out
                self.out.put(result)
            else:
                self.execute(i + 1, result)

        if log:
            self.state_log(i, item)
        self.pools[i].apply_async(task, (item,), callback=callback)
import time


def sleep(item):
    """Demo stage: log the item, pause for one second, then timestamp it.

    Returns ``item`` followed by a comma and ``str(time.time())`` as read
    after the pause.
    """
    logger.debug("processing %s", item)
    time.sleep(1)
    finished_at = str(time.time())
    return item + "," + finished_at
def test():
    """Demo: send seven one-character items through four ``sleep`` stages.

    Uses the pipeline's default arguments and prints the container behind
    the output queue once ``run`` returns.
    """
    items = "abcdefg"
    pipe = Pipeline([sleep, sleep, sleep, sleep])
    out = pipe.run(items)
    # Call form with a single argument prints the same text on Python 2 and 3.
    print(out.queue)


# Run the demo only when executed as a script, not when the module is imported.
if __name__ == "__main__":
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment