Skip to content

Instantly share code, notes, and snippets.

@oxyflour
Created September 23, 2020 15:42
Show Gist options
  • Save oxyflour/840630e63fa2245eed397c26ac1b3a44 to your computer and use it in GitHub Desktop.
Save oxyflour/840630e63fa2245eed397c26ac1b3a44 to your computer and use it in GitHub Desktop.
async exec batch
import asyncio
async def execBatch(next, exec, concurrency=5):
    """Run tasks through ``exec`` with bounded concurrency, refilled by ``next``.

    ``next`` is awaited once up front as ``next([], running)`` to obtain the
    initial tasks, and again as ``next(dones, running)`` every time one or
    more tasks finish; whatever it returns is added to the work list. The
    batch ends when the work list is empty and nothing is in flight.

    Tasks are taken from the end of the work list (last in, first out).

    Args:
        next: Async callable ``next(dones, running)`` returning an iterable
            of new tasks. ``dones`` is a fresh list holding the ``exec``
            results collected since the previous call; ``running`` is a list
            of length ``concurrency`` whose entry ``idx`` is the task
            occupying slot ``idx``, or ``None`` while that slot is idle.
        exec: Async callable ``exec(task, idx)`` that processes one task in
            slot ``idx`` and returns its result.
        concurrency: Maximum number of ``exec`` calls in flight at once.

    Raises:
        ValueError: If ``concurrency`` is less than 1.
        Exception: Anything raised by ``next`` or ``exec`` propagates to the
            caller; ``exec`` calls still in flight are cancelled first.

    Note:
        The parameter names shadow the ``next`` and ``exec`` builtins; they
        are kept because callers may pass them by keyword.
    """
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")

    running = [None] * concurrency
    # Slot occupancy is tracked separately from ``running`` so that falsy
    # tasks (0, "", None, ...) are never mistaken for an idle slot.
    busy = [False] * concurrency
    # Copy so the caller's list is not mutated by pop()/extend() below.
    torun = list(await next([], running))
    pending = {}  # in-flight future -> slot index, kept in start order

    def dispatch():
        """Start queued tasks on idle slots until either runs out."""
        for idx in range(concurrency):
            if not torun:
                return
            if busy[idx]:
                continue
            task = torun.pop()
            busy[idx] = True
            running[idx] = task
            pending[asyncio.ensure_future(exec(task, idx))] = idx

    try:
        while torun or pending:
            dispatch()
            finished, _ = await asyncio.wait(
                set(pending), return_when=asyncio.FIRST_COMPLETED
            )
            dones = []
            # Walk ``pending`` rather than the unordered ``finished`` set so
            # results are reported in the order their tasks were started.
            for future in [f for f in pending if f in finished]:
                idx = pending.pop(future)
                busy[idx] = False
                running[idx] = None
                dones.append(future.result())
            # Refill idle slots before consulting ``next`` so a slow ``next``
            # does not leave already-queued work waiting.
            dispatch()
            torun.extend(await next(dones, running))
    finally:
        # ``pending`` is only non-empty here on error or cancellation.
        for future in pending:
            future.cancel()
        if pending:
            # Let the cancellations settle and consume their outcomes.
            await asyncio.gather(*pending, return_exceptions=True)
async def main():
    """Demo: feed ``execBatch`` from a growing result list and print it.

    The producer starts with six tasks, then hands out one extra task per
    call while fewer than ten results have been collected, and finally
    stops producing. The worker sleeps three seconds per task, prints the
    task and its slot index, and returns ``[task, -task]``.
    """
    collected = []

    async def produce(dones, running):
        # Record everything that finished since the previous call.
        collected.extend(dones)
        count = len(collected)
        if count == 0:
            return [0, 1, 2, 3, 4, 5]
        if count < 10:
            return [count + 10]
        return []

    async def work(task, idx):
        await asyncio.sleep(3)
        print(task, idx)
        return [task, -task]

    await execBatch(produce, work)
    print(collected)
# Script entry point: run the demo coroutine to completion on a new event
# loop. NOTE: this is unguarded, so it also runs when the file is imported.
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment