Skip to content

Instantly share code, notes, and snippets.

@rueian
Last active August 23, 2022 14:44
Show Gist options
  • Save rueian/6cf8f6ef1f13bada90ac8ae92088f0ae to your computer and use it in GitHub Desktop.
Save rueian/6cf8f6ef1f13bada90ac8ae92088f0ae to your computer and use it in GitHub Desktop.
import time
from dataclasses import dataclass
from heapq import heappush, heappop
from typing import Optional

import ray
@dataclass
class Meta:
    """Adaptive batching state kept for one actor in the pool."""

    # Upper bound on the number of tasks sent to the actor in one batch.
    max: int
    # Round-trip time, in seconds, of the actor's previous batch; the next
    # measurement is compared against it to decide whether to grow or shrink.
    rtt: float = 10.0
    # Batch size most recently used for the actor; the next size is derived
    # from it.
    size: int = 1
    # Timestamp taken when the actor's latest batch was dispatched; None until
    # the first dispatch.
    time: Optional[float] = None
class HeterogeneousActorPool:
    """Spread a list of tasks over Ray actors that run at different speeds.

    Every registered actor carries its own adaptive batch size (a ``Meta``
    record).  After each finished batch the size is doubled, capped at the
    actor's maximum, if the batch came back faster than the previous one, and
    reduced to roughly three quarters otherwise.
    """

    def __init__(self):
        # actor handle -> Meta holding that actor's adaptive batching state.
        self.actors = {}

    def add(self, actor, max_batch):
        """Register ``actor`` with at most ``max_batch`` tasks per batch.

        Adding an actor that is already registered is a no-op and keeps its
        existing state.

        Raises:
            ValueError: if ``max_batch`` is smaller than 1; such an actor
                could never be given any work.
        """
        if max_batch < 1:
            raise ValueError(f"max_batch must be at least 1, got {max_batch!r}")
        if actor not in self.actors:
            self.actors[actor] = Meta(max=max_batch)

    @staticmethod
    def _next_size(current, limit, faster):
        """Return the batch size that follows ``current``.

        Args:
            current: the batch size used last time.
            limit: the largest batch size allowed for the actor.
            faster: True if the last batch returned faster than the one
                before it.

        Returns:
            ``current`` doubled and capped at ``limit`` when ``faster`` is
            true, otherwise about three quarters of ``current``; never less
            than 1.
        """
        if faster:
            candidate = min(current * 2, limit)
        else:
            candidate = (current + current // 2) // 2
        # Never let the size reach 0: an actor that is handed nothing never
        # re-enters the wait loop, so once every actor got there the remaining
        # tasks would be silently dropped.
        return max(1, candidate)

    def map_batches(self, batch_fn, tasks):
        """Run ``tasks`` through the registered actors and return the results.

        Args:
            batch_fn: callable ``batch_fn(actor, batch)`` that submits the
                slice ``batch`` of ``tasks`` to ``actor`` and returns a Ray
                object reference for the result.  The referenced result must
                be iterable.
            tasks: sliceable sequence of task inputs.

        Returns:
            A flat list with the items of every batch result, concatenated in
            the order the batches were cut from ``tasks``.

        Raises:
            RuntimeError: if no actor has been added to the pool.
        """
        if not self.actors:
            raise RuntimeError("no actors")

        total = len(tasks)
        in_flight = {}  # object ref -> (start index, actor, meta)
        finished = []   # (start index, object ref) of completed batches

        def dispatch(start, actor, meta, rtt):
            """Send the next batch, beginning at ``start``, to ``actor``.

            ``rtt`` is the duration of the actor's previous batch, or a
            negative number when there is none yet.  Returns the index of the
            first task that is still unassigned.
            """
            # The chosen size is stored, not the possibly truncated length of
            # the final slice, so the state stays usable for another call.
            meta.size = self._next_size(meta.size, meta.max, rtt < meta.rtt)
            if rtt > 0:
                meta.rtt = rtt
            stop = min(start + meta.size, total)
            if stop > start:
                # Monotonic clock: only the elapsed time matters.
                meta.time = time.monotonic()
                in_flight[batch_fn(actor, tasks[start:stop])] = (start, actor, meta)
            return stop

        cursor = 0
        # Prime every actor with a first batch; -1 means "no measurement yet".
        for actor, meta in self.actors.items():
            cursor = dispatch(cursor, actor, meta, -1)

        while in_flight:
            ready, _ = ray.wait(list(in_flight), num_returns=1, fetch_local=False)
            ref = ready[0]
            start, actor, meta = in_flight.pop(ref)
            finished.append((start, ref))
            # Hand the actor that just finished its next batch right away.
            cursor = dispatch(cursor, actor, meta, time.monotonic() - meta.time)

        # Batches complete out of order; restore the order of ``tasks``.
        finished.sort(key=lambda entry: entry[0])
        batches = ray.get([ref for _, ref in finished])
        return [item for batch in batches for item in batch]
if __name__ == '__main__':
    @ray.remote
    class MyActor:
        """Demo actor that simulates a worker with a fixed per-batch delay."""

        def __init__(self, ss):
            # Seconds to sleep for every batch, whatever its length.
            self.ss = ss

        def do(self, batch):
            """Sleep for the configured delay, report the batch, echo it back."""
            time.sleep(self.ss)
            print(self.ss, len(batch))
            return batch

    pool = HeterogeneousActorPool()
    # (per-batch delay in seconds, maximum batch size) for each demo actor.
    for delay, max_batch in ((1, 100), (5, 50), (9, 10)):
        pool.add(MyActor.remote(delay), max_batch)

    def submit(actor, batch):
        """Submit ``batch`` to ``actor`` and return the resulting object ref."""
        return actor.do.remote(batch)

    print(pool.map_batches(submit, list(range(2000))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment