Created July 20, 2023 16:38
-
-
Save mooreniemi/ed5c468d41505ea8bc454648fe3d9b84 to your computer and use it in GitHub Desktop.
ray get worker progress and result
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import time
from threading import Thread

import ray
from ray.exceptions import GetTimeoutError
# Logger for this module; records propagate to the root logger's handlers.
log = logging.getLogger(name=__name__)
@ray.remote
class Worker:
    """Ray actor that simulates a long-running job on a background thread.

    ``process`` starts the job on a separate thread and returns immediately,
    so callers can keep polling ``get_progress`` while the job runs and fetch
    ``get_result`` once it reports done.
    """

    def __init__(self, id, num, step_seconds=1.0):
        """Create a worker.

        Args:
            id: Identifier embedded in the result string.
            num: Number of simulated work steps to run.
            step_seconds: Duration of each simulated step. Defaults to one
                second, matching the original hard-coded sleep.
        """
        self.id = id
        self.num = num
        self.step_seconds = step_seconds
        # Index of the last completed step. Note it is also 0 before any step
        # has completed, so use `done` (not `progress`) to detect completion.
        self.progress = 0
        self.done = False
        self.result = None

    def process(self):
        """Start the simulated job on a background thread and return at once."""
        thread = Thread(target=self.process_thread)
        thread.start()

    def process_thread(self):
        """Run all work steps, then publish the result and mark the job done."""
        for i in range(self.num):
            # Stand-in for real work that takes time.
            time.sleep(self.step_seconds)
            self.progress = i
        # Finish *after* the loop rather than on its last iteration, so a job
        # with num <= 0 still completes instead of leaving done False forever.
        # Assign the result *before* flipping `done`: a poller that observes
        # done=True and then calls get_result must never receive None.
        self.result = f"Finished {self.id}"
        self.done = True

    def get_progress(self):
        """Return ``(progress, done)``: last completed step index and finished flag."""
        return (self.progress, self.done)

    def get_result(self):
        """Return the result string, or None if the job has not finished yet."""
        return self.result
# Configure logging so the INFO-level progress messages below are actually
# emitted; without this the root logger's default WARNING level drops them.
logging.basicConfig(level=logging.INFO)
ray.init()

actors = 3
seconds = 10
# Give up on workers that have not finished within this many seconds. Each
# worker needs roughly `seconds` seconds, so allow generous headroom.
timeout_s = seconds * 3
# Pause between polling rounds so the monitor does not hammer the actors.
poll_interval_s = 0.5

log.info(f"Here we generate {actors} actors, each runs {seconds} seconds.")
workers = [Worker.remote(i, seconds) for i in range(actors)]

log.info("For each worker, we call the process function to start working")
# `process` only kicks off a background thread, so the returned refs are not needed.
for worker in workers:
    worker.process.remote()
log.info("All workers started..")

log.info("Here we use loop to monitor the progress")
unfinished_workers = list(range(actors))
deadline = time.monotonic() + timeout_s
while unfinished_workers:
    remaining = deadline - time.monotonic()
    if remaining <= 0:
        break
    # Query every still-running worker in one batch. The timeout keeps an
    # unresponsive actor from blocking the monitor forever.
    progress_refs = [workers[e].get_progress.remote() for e in unfinished_workers]
    try:
        statuses = ray.get(progress_refs, timeout=remaining)
    except GetTimeoutError:
        break
    # Build a new list rather than removing from the list being iterated,
    # which would silently skip the element after each removal.
    still_running = []
    for e, (progress, done) in zip(unfinished_workers, statuses):
        log.info("Retrieving Progress for %s-%s", e, progress)
        if done:
            log.info(f"Finished {e}")
        else:
            still_running.append(e)
    unfinished_workers = still_running
    if unfinished_workers:
        time.sleep(poll_interval_s)

if unfinished_workers:
    log.warning(
        "Timed out after %s seconds; unfinished workers: %s",
        timeout_s,
        unfinished_workers,
    )

# Workers that never finished still hold their initial result of None. The
# timeout turns an unresponsive actor into a GetTimeoutError instead of a hang.
results = ray.get([worker.get_result.remote() for worker in workers], timeout=timeout_s)
log.info(results)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
This gist is an adaptation of the example code from this issue. Line 51 above is, of course, very brittle: if a worker never finishes, the whole process hangs. It needs a timeout.