Created
July 11, 2013 22:38
-
-
Save gcorreaq/5979947 to your computer and use it in GitHub Desktop.
Track the progress of a list of AsyncResult objects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def track_progress(async_results, logger=None, polling_time=30):
    """Block until every task in ``async_results`` has finished.

    Each pending result is polled with a non-blocking ``get(0)``; results
    that raise ``multiprocessing.TimeoutError`` are treated as still running
    and are polled again on the next round.  After every round the progress
    is optionally logged, and the function sleeps for whatever is left of
    ``polling_time`` before polling again.

    Args:
        async_results: Iterable of result objects exposing ``get(timeout)``
            (presumably ``multiprocessing.pool.AsyncResult`` instances).
            The caller's collection is never modified.
        logger: Optional object with an ``info(message)`` method.  When
            given, one progress line is emitted per polling round.
        polling_time: Minimum number of seconds between the start of two
            consecutive polling rounds.

    Returns:
        None, once no result is left running.

    Raises:
        Any exception other than ``multiprocessing.TimeoutError`` raised by
        ``result.get(0)`` propagates to the caller and stops the tracking
        (for an ``AsyncResult`` this is the exception raised by the worker).
    """
    # Imported locally so the function works even when the enclosing module
    # does not import these names itself.
    import multiprocessing
    import time

    log_msg = "Tasks -> Total: {0} | Remaining: {1} ({2:.3%}) | Finished: {3} ({4:.3%})"

    # Work on a private list: accepts any iterable and leaves the caller's
    # collection untouched.
    pending = list(async_results)
    total_tasks = len(pending)

    # Keep polling until no task is left running.
    while pending:
        round_started = time.time()

        # Collecting the still-running tasks in a fresh list is cheaper than
        # removing the finished ones from the list being iterated.
        still_running = []
        for result in pending:
            # If the worker raised an exception, get() will re-raise it here.
            try:
                result.get(0)
            except multiprocessing.TimeoutError:
                still_running.append(result)

        remaining_tasks = len(still_running)
        finished_tasks = total_tasks - remaining_tasks

        if logger is not None:
            # float() keeps the ratios fractional under Python 2's integer
            # division; total_tasks is > 0 whenever the loop body runs.
            logger.info(log_msg.format(
                total_tasks,
                remaining_tasks,
                float(remaining_tasks) / total_tasks,
                finished_tasks,
                float(finished_tasks) / total_tasks))

        if not still_running:
            break

        # Sleep only for the part of the polling interval not already spent
        # on polling and logging.
        elapsed = time.time() - round_started
        if elapsed < polling_time:
            time.sleep(polling_time - elapsed)

        pending = still_running
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment