Skip to content

Instantly share code, notes, and snippets.

@lgray
Last active December 17, 2021 22:49
Show Gist options
  • Select an option

  • Save lgray/fda9236dad8617f040a7bfbdc90d7136 to your computer and use it in GitHub Desktop.

Select an option

Save lgray/fda9236dad8617f040a7bfbdc90d7136 to your computer and use it in GitHub Desktop.
import random
from dask.distributed import Client, worker_client, as_completed, secede, rejoin
import pprint
import time
def reduce_chunks(items):
    """Return the sum of all values in ``items``.

    The values are added directly, starting from ``0``; no ``.result()``
    call is made on them, so they must already be plain addable values by
    the time this function runs.

    Args:
        items: Iterable of values that support addition with ``0`` and with
            each other.

    Returns:
        The total of ``items``, or ``0`` when ``items`` is empty.
    """
    return sum(items)
def processit(start, stop):
    """Return the distance from ``start`` to ``stop``.

    Args:
        start: Lower bound of the chunk.
        stop: Upper bound of the chunk.

    Returns:
        ``stop - start``; negative when ``stop`` is smaller than ``start``.
    """
    width = stop - start
    return width
def tree_reduce(client, to_reduce, n_reduce=None):
    """Combine ``to_reduce`` into a single item by repeated grouped reduction.

    Each round slices the current items into consecutive groups of at most
    ``n_reduce`` elements and submits one ``reduce_chunks`` call per group
    through ``client.map``. The outputs of a round become the inputs of the
    next one, and rounds repeat until a single item is left.

    Args:
        client: Object whose ``map(func, iterable)`` yields one result per
            element of ``iterable`` (in this script, a dask ``Client``).
        to_reduce: Sequence of items to combine; it must support ``len()``
            and slicing.
        n_reduce: Maximum number of items handed to one ``reduce_chunks``
            call. ``None`` (the default) falls back to the module-level
            ``N_reduce``, which is what the two-argument form always used.

    Returns:
        The single remaining item. When ``to_reduce`` holds exactly one
        element, that element is returned as-is and nothing is submitted.

    Raises:
        ValueError: If the group size is smaller than 2.
        IndexError: If ``to_reduce`` is empty.
    """
    if n_reduce is None:
        n_reduce = N_reduce
    if n_reduce < 2:
        # Groups of one never shrink the list, so the loop below would never
        # terminate; smaller values cannot form groups at all.
        raise ValueError(f"n_reduce must be at least 2, got {n_reduce!r}")
    if len(to_reduce) == 0:
        raise IndexError("tree_reduce() needs at least one item to reduce")

    reduced = to_reduce
    while len(reduced) > 1:
        # Slicing clamps at the end of the sequence, so the final group is
        # simply shorter when the length is not a multiple of n_reduce.
        groups = [
            reduced[i:i + n_reduce]
            for i in range(0, len(reduced), n_reduce)
        ]
        reduced = list(client.map(reduce_chunks, groups))
    return reduced[0]


def chunkit(ilen, chunk_size, N_reduce):
    """Split ``ilen`` into chunks, process each one, and tree-reduce the results.

    The range ``[0, ilen)`` is cut into consecutive ``[start, stop)`` chunks
    of at most ``chunk_size``. One ``processit`` call per chunk is submitted
    through the client provided by ``worker_client()``, and the per-chunk
    results are then combined with ``tree_reduce``. Because ``processit``
    returns ``stop - start``, the combined total equals ``ilen``.

    Args:
        ilen: Total length to split; must be at least 1.
        chunk_size: Maximum length of one chunk; must be at least 1.
        N_reduce: Maximum number of partial results merged per reduction
            call; forwarded to ``tree_reduce``.

    Returns:
        Whatever ``tree_reduce`` returns for the submitted chunk results.
        NOTE(review): with a dask client this is a future, not a plain
        number -- confirm callers resolve it.

    Raises:
        ValueError: If ``ilen`` or ``chunk_size`` is smaller than 1, or if
            ``N_reduce`` is smaller than 2.
    """
    if ilen < 1:
        raise ValueError(f"ilen must be at least 1, got {ilen!r}")
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size!r}")

    starts = list(range(0, ilen, chunk_size))
    # The last chunk is cut off at ilen rather than running past the end.
    stops = [min(ilen, start + chunk_size) for start in starts]

    with worker_client() as client:
        chunkmap = client.map(processit, starts, stops)
        reduced = tree_reduce(client, chunkmap, N_reduce)
    return reduced
# Fixed seed so every run generates the same synthetic workload.
random.seed(12345)

# 50 datasets, each holding between 50 and 100 lengths drawn from
# [90_000, 1_000_000]. The dataset size is drawn before its entries.
items = []
for _ in range(50):
    n_entries = random.randint(50, 100)
    items.append([random.randint(90_000, 1_000_000) for _ in range(n_entries)])

# Maximum length of a single chunk.
chunksize = 100_000
# Maximum number of partial results merged by one reduction call.
N_reduce = 3
if __name__ == "__main__":
    # Driver: submit one chunkit task per entry of every dataset, tree-reduce
    # each dataset's results, then compare against a plain local sum.
    # The context manager closes the client even when the check below raises.
    with Client(n_workers=10) as client:
        # NOTE(review): presumably gives the workers time to come up before
        # the timers start -- confirm this pause is still needed.
        time.sleep(2)
        # pprint.pprint(items)  # uncomment to inspect the generated inputs

        tic_start = time.monotonic()
        datasets = [
            client.map(
                chunkit,
                item,
                [chunksize] * len(item),
                [N_reduce] * len(item),
            )
            for item in items
        ]
        toc = time.monotonic()
        print("submit jobs:", toc - tic_start)

        tic = time.monotonic()
        # Replace each dataset's list of futures by its single reduced item.
        for i, futures in enumerate(datasets):
            datasets[i] = tree_reduce(client, client.gather(futures))
        toc = time.monotonic()
        print("reduce datasets:", toc - tic)

        # Every chunkit task totals to its input length, so the reduced value
        # must equal the plain sum of the dataset's entries.
        for future, dataset in zip(datasets, items):
            item_total = sum(dataset)
            dask_total = future.result()
            if item_total != dask_total:
                raise RuntimeError("bad reduction result!")
            print(f"{item_total} == {dask_total}")
        toc = time.monotonic()
        print("total time:", toc - tic_start)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment