Created
January 13, 2011 19:08
-
-
Save thepaul/778410 to your computer and use it in GitHub Desktop.
Do Twisted Deferred-producing jobs, X at a time
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ParallelBatcher.py | |
# | |
# Job pipeline for Twisted Matrix | |
# the paul 2011 | |
# | |
# Sits somewhere between defer.DeferredList and plain Deferred chaining.
# When you have many jobs that each take time (most likely because they
# wait on some network action) but you don't want to run them all at
# once (maybe the remote action is CPU- or bandwidth-intensive and you
# want to avoid overloading the remote host(s)), use this to run them
# as a pipeline that keeps at most X jobs in flight at a time (X is
# whatever you want).
#
# Jobs that complete immediately don't need to return Deferreds.
#
# Results come back in the same format defer.DeferredList uses: a list
# of (success, value) tuples, in the same order as the submitted jobs.
from twisted.internet import defer | |
from twisted.python import failure | |
@defer.inlineCallbacks
def job_strand(job_iter):
    """
    Run jobs one after another, pulling each from a (possibly shared) iterator.

    `job_iter` yields (job_number, job) pairs, where each job is a
    (function, args, keyword_args) 3-tuple; args and keyword_args may be
    None.  Several strands may share the same iterator: each strand takes
    the next job only after its previous job has finished.

    Returns a Deferred firing with a list of (job_number, success, value)
    triples.  A job that raises, returns a failing Deferred, or is not a
    well-formed 3-tuple is recorded with success=False and a Failure as
    its value; one bad job never stops the rest of the strand.
    """
    results = []
    for num, job in job_iter:
        try:
            # Unpack inside the try so a malformed job entry becomes a
            # per-job failure instead of aborting the whole strand (which
            # would lose every result this strand already collected).
            f, args, kwargs = job
            # Plain (non-Deferred) return values are handed straight back
            # by inlineCallbacks, so synchronous jobs work too.
            val = yield f(*(args or ()), **(kwargs or {}))
        except Exception:
            results.append((num, False, failure.Failure()))
        else:
            results.append((num, True, val))
    # returnValue (rather than `return value`) keeps this Python 2 compatible.
    defer.returnValue(results)
def collate_output(output):
    """
    Merge the per-strand results from a DeferredList into one ordered list.

    `output` is the list a DeferredList fires with: one (success, result)
    pair per strand.  A successful strand's result is a list of
    (job_number, success, value) triples, as built by job_strand.

    Returns a list of (success, value) tuples ordered by job_number, i.e.
    in the order the jobs were originally submitted.

    If any strand itself failed, its whole batch of results is missing and
    the merged list could not line up with the submitted jobs.  In that
    case the strand's failure is returned instead of a list.  Used as a
    Deferred callback, this sends the chain down its errback path.
    """
    merged = []
    for succ, strandresults in output:
        if not succ:
            # Don't silently drop a lost strand (that would shift every
            # later result onto the wrong job); surface its failure.
            return strandresults
        merged.extend(strandresults)
    # Job numbers are unique, so sort on them alone and never compare
    # the (possibly unorderable) job values.
    merged.sort(key=lambda res: res[0])
    return [res[1:3] for res in merged]
def ParallelBatcher(joblist, simultaneous):
    """
    Pass me 3-tuples containing (function, args, keyword_args), and I will
    call them in parallel ('simultaneous' jobs at a time) and return a
    Deferred firing with a list of (success, value) tuples like DeferredList
    gives, in the same order as joblist.
    The args and keyword_args elements of the job tuples can be None.

    Raises ValueError if simultaneous is less than 1, since then no job
    would ever be run.
    """
    if simultaneous < 1:
        raise ValueError(
            "simultaneous must be at least 1, got %r" % (simultaneous,))
    # A single iterator shared by every strand: each strand pulls the next
    # job only when its current one finishes, so at most `simultaneous`
    # jobs are in flight at once.
    jobs = enumerate(joblist)
    # Build a real list (not a lazy map object) so DeferredList receives
    # every strand up front on both Python 2 and Python 3.
    strands = [job_strand(jobs) for _ in range(simultaneous)]
    # Deliberately a plain function, not inlineCallbacks: it contains no
    # yield and simply returns the collated Deferred.
    return defer.DeferredList(strands).addCallback(collate_output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment