Skip to content

Instantly share code, notes, and snippets.

@simon-mo
Created October 27, 2020 17:15
Show Gist options
  • Select an option

  • Save simon-mo/623a93168586aa44667264826d6c8f0d to your computer and use it in GitHub Desktop.

Select an option

Save simon-mo/623a93168586aa44667264826d6c8f0d to your computer and use it in GitHub Desktop.
import os
import numpy as np
import ray
import asyncio
from ray.cluster_utils import Cluster
cluster = Cluster()
head_node = cluster.add_node()
child_node = cluster.add_node(resources={"OTHER_NODE": 100})
ray.init(address=head_node.address)
TOTAL_IMAGES = 10
@ray.remote(num_cpus=0, resources={"OTHER_NODE": 1})
class ResultActor:
def __init__(self):
self.results = {}
def predict(self, i, image):
self.results[i] = image
async def get_async_result(self, i):
result = self.results.pop(i, None)
while result is None:
print(f"Awaiting {i}")
result = self.results.pop(i, None)
print(f"Done awaiting {i}")
await asyncio.sleep(1)
print(f"GOT RESULT FOR {i}")
return result
result_actor = ResultActor.remote()
# SEND DATA TO ResultActor
for i in range(TOTAL_IMAGES):
image = (np.random.random((192, 1080, 3)) * 255).astype(np.uint8) # ~ 0.5MB
result_actor.predict.remote(i, image)
# Async get all data
async def get_all_results():
futs = []
fails = 0
for i in range(TOTAL_IMAGES):
fut = result_actor.get_async_result.remote(i)
futs.append(fut)
try:
await asyncio.wait_for(fut, timeout=5)
print(f"SUCCESS {i}")
except:
print(f"FAIL {i}")
fails += 1
# ALTERNATIVE: Await all at the same time
# done, pending = await asyncio.wait(futs, timeout=5)
# if len(pending):
# print(f"FAILED! {int(100.*len(pending)/TOTAL_IMAGES)}% tasks not succeeded!")
print(f"FAIL RATIO: {int(100.*fails/TOTAL_IMAGES)}%")
loop = asyncio.new_event_loop()
loop.run_until_complete(get_all_results())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment