Skip to content

Instantly share code, notes, and snippets.

@jhw
Last active October 21, 2019 09:50
Show Gist options
  • Save jhw/88e346ed9384c99f31dba6c01b9f29f8 to your computer and use it in GitHub Desktop.
Save jhw/88e346ed9384c99f31dba6c01b9f29f8 to your computer and use it in GitHub Desktop.
Python mapreduce pattern
justin@justin-XPS-13-9360:~/work$ python3 mapreduce.py 
spawning batch 1
result -> {'x': 2, 'y': 6, 'z': 8}
result -> {'x': 6, 'y': 8, 'z': 14}
spawning batch 2
result -> {'x': 1, 'y': 2, 'z': 3}
result -> {'x': 1, 'y': 2, 'z': 3}

- x: 2
  y: 6
  z: 8
- x: 6
  y: 8
  z: 14
- x: 1
  y: 2
  z: 3
- x: 1
  y: 2
  z: 3
"""
- http://skipperkongen.dk/2016/09/09/easy-parallel-http-requests-with-python-and-asyncio/
"""
import asyncio, concurrent.futures, copy, math
def run_parallel(worker, requests, callback, batchsz, max_workers=20):
    """Run workers in a thread pool, one batch of requests at a time.

    Args:
        worker: Factory called as ``worker(**kwargs)`` for each request;
            it must return a zero-argument callable, which is what gets
            executed in the thread pool.
        requests: Sliceable sequence of keyword-argument dicts, one per task.
        callback: Called with each task's return value once the whole batch
            containing it has finished, in the order of the requests.
        batchsz: Number of requests submitted per batch; must be >= 1.
        max_workers: Size of the thread pool shared by all batches.

    Raises:
        ValueError: If ``batchsz`` is smaller than 1.
    """
    if batchsz < 1:
        raise ValueError("batchsz must be a positive integer")

    # Use a private event loop instead of the process-wide default one:
    # closing the default loop would make any later call (or any other
    # asyncio user in the same process) fail with "Event loop is closed".
    loop = asyncio.new_event_loop()

    async def main(batch, executor):
        futures = [loop.run_in_executor(executor, worker(**kwargs))
                   for kwargs in batch]
        # gather() preserves submission order, so callbacks fire in
        # request order regardless of which thread finishes first.
        for struct in await asyncio.gather(*futures):
            callback(struct)

    n = int(math.ceil(len(requests) / batchsz))
    try:
        # One pool for all batches; the with-block waits for outstanding
        # threads before the loop is closed.
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=max_workers) as executor:
            for i in range(n):
                print("spawning batch %i" % (i + 1))
                batch = requests[i * batchsz:(i + 1) * batchsz]
                loop.run_until_complete(main(batch, executor))
    finally:
        loop.close()
def run_many(requests, batchsz):
    """Add a ``z = x + y`` field to every request, processing in parallel.

    Each request is a dict holding ``x`` and ``y``. The returned list holds
    one deep-copied dict per request with the extra ``z`` key, in the order
    the results were delivered by ``run_parallel``.
    """
    collected = []

    def record(item):
        # Echo every result as it arrives and keep it for the caller.
        print ("result -> %s" % item)
        collected.append(item)

    def worker(**params):
        # Factory: binds the request parameters into a zero-arg task.
        def task():
            out = copy.deepcopy(params)
            out["z"] = params["x"] + params["y"]
            return out
        return task

    run_parallel(worker, requests, record, batchsz)
    return collected
if __name__ == "__main__":
    # Demo: four pseudo-random (x, y) requests, processed two per batch.
    import random
    random.seed(13)  # fixed seed so the demo output is reproducible
    requests = []
    for _ in range(4):
        x = int(10 * random.random())
        y = int(10 * random.random())
        requests.append({"x": x, "y": y})
    response = run_many(requests, batchsz=2)
    print ()
    # Dump the collected results as block-style YAML.
    import yaml
    dumped = yaml.safe_dump(response, default_flow_style=False)
    print (dumped)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment