Created August 5, 2014 05:46
-
-
Save masroore/ef9eaa3c5794dfa5f4b2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DocumentProcessor(object):
    """Fetch documents by id on a thread pool and stream one field to a file.

    Callers feed ids through ``add_doc_id``; ids are batched into tasks of
    ``WORKER_TASK_SIZE``. Worker threads look each batch up with
    ``db.query.find`` and push every document's ``'q'`` value onto a result
    queue. A single saver thread drains that queue and writes the values to
    ``result_file`` one per line, in writes of ``SAVER_CHUNK_LIMIT`` lines.

    ``shutdown`` must be called (or the instance used as a context manager)
    to submit the last partial batch, write the last partial chunk, stop
    every thread and close ``result_file``.
    """

    # Number of ids sent to a worker per database query.
    # NOTE(review): an earlier revision had this as ``100#00`` (a commented-out
    # suffix suggesting 10000 was once used); confirm the intended batch size.
    WORKER_TASK_SIZE = 100
    # Number of result lines buffered by the saver before each write.
    SAVER_CHUNK_LIMIT = 1000
    # Number of concurrent worker threads.
    WORKER_POOL_SIZE = 10

    def __init__(self, result_file):
        """Start the worker pool and the saver thread.

        :param result_file: object with ``write(str)`` and ``close()``; it is
            written to by the saver thread and closed by ``shutdown``.
        """
        self.result_file = result_file
        self.id_cache = []
        self.task_queue = Queue()
        self.result_queue = Queue()
        # All state the saver thread touches must exist before it starts.
        self.documents_loaded = 0
        self.spawn_workers()

    def __enter__(self):
        """Return self so the processor can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Always run ``shutdown`` on leaving the block; never swallow errors."""
        self.shutdown()
        return False

    def worker(self, name, task_queue, result_queue):
        """Resolve id batches from *task_queue* until a ``None`` sentinel.

        :param name: label of the thread running this loop (informational).
        :param task_queue: queue of id lists, terminated by ``None``.
        :param result_queue: receives the ``'q'`` value of each found document.
        """
        while True:
            task = task_queue.get()
            if task is None:
                break
            # Presumably a pymongo collection: match any id in the batch and
            # project only the 'q' field -- confirm against where db is defined.
            for doc in db.query.find({'_id': {'$in': task}}, {'q': 1}):
                result_queue.put(doc['q'])

    def saver(self, result_queue, result_file):
        """Write results from *result_queue* to *result_file*, one per line.

        Results are buffered and written in chunks of ``SAVER_CHUNK_LIMIT``;
        whatever remains is written once the ``None`` sentinel arrives.
        Results are joined with ``'\\n'``, so each one must be a string.
        """
        chunk = []
        while True:
            result = result_queue.get()
            if result is None:
                break
            self.documents_loaded += 1
            chunk.append(result)
            if len(chunk) >= self.SAVER_CHUNK_LIMIT:
                self._write_chunk(result_file, chunk)
                chunk = []
        if chunk:
            self._write_chunk(result_file, chunk)

    @staticmethod
    def _write_chunk(result_file, chunk):
        """Write *chunk* to *result_file* as newline-terminated lines."""
        result_file.write('\n'.join(chunk) + '\n')

    def spawn_workers(self):
        """Start ``WORKER_POOL_SIZE`` worker threads plus one saver thread.

        Threads are daemonic. If the caller fails before ``shutdown``, the
        interpreter can then still exit instead of blocking forever on
        threads parked in ``Queue.get``. ``shutdown`` joins every thread
        explicitly, so the normal path still waits for all work.
        """
        self.worker_pool = []
        for index in range(self.WORKER_POOL_SIZE):
            name = 'worker-%d' % index
            thread = Thread(target=self.worker, name=name,
                            args=[name, self.task_queue, self.result_queue])
            thread.daemon = True
            self.worker_pool.append(thread)
            thread.start()
        self.saver_thread = Thread(target=self.saver, name='saver',
                                   args=[self.result_queue, self.result_file])
        self.saver_thread.daemon = True
        self.saver_thread.start()

    def add_doc_id(self, id_):
        """Buffer *id_*; each full batch of ``WORKER_TASK_SIZE`` ids becomes a task."""
        self.id_cache.append(id_)
        if len(self.id_cache) >= self.WORKER_TASK_SIZE:
            self.task_queue.put(self.id_cache)
            # Rebind rather than clear(): the queued list now belongs to a worker.
            self.id_cache = []

    def shutdown(self):
        """Submit pending ids, stop all threads, then close ``result_file``.

        Workers get one ``None`` each and are joined *before* the saver's
        sentinel is queued. Every result a worker produces is therefore
        ahead of the saver's ``None`` and gets written.
        """
        if self.id_cache:
            self.task_queue.put(self.id_cache)
            self.id_cache = []
        for _ in self.worker_pool:
            self.task_queue.put(None)
        for worker in self.worker_pool:
            worker.join()
        self.result_queue.put(None)
        self.saver_thread.join()
        self.result_file.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.