Skip to content

Instantly share code, notes, and snippets.

@Fluxx
Created July 16, 2013 23:14
Show Gist options
  • Select an option

  • Save Fluxx/6016094 to your computer and use it in GitHub Desktop.

Select an option

Save Fluxx/6016094 to your computer and use it in GitHub Desktop.
import time
from threading import Semaphore
from multiprocessing.pool import ThreadPool
class ThreadSafeCount(object):
def __init__(self):
self.mutex = Semaphore(1)
self.count = 0
def incr(self):
with self.mutex:
self.count += 1
return self.count
def harvest(pool_size, func, iterable, max_failures):
pool = ThreadPool(pool_size)
mapping = pool.imap_unordered(func, iterable)
seen_failures = ThreadSafeCount()
results = []
while seen_failures.count <= max_failures:
try:
results.append(next(mapping))
except Exception:
if seen_failures.incr() > max_failures:
print '%s > max %s failures, aborting' % (
seen_failures.count,
max_failures
)
pool.terminate()
except StopIteration:
pool.close()
return results # Success
def sleepy(t):
duration = float(t) / 25
if t in (42,):
print '%s is a failure' % t
raise Exception('boom!')
else:
time.sleep(duration)
print 'slept for %s seconds' % duration
harvest(10, sleepy, range(100), max_failures=1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment