Skip to content

Instantly share code, notes, and snippets.

@ultrafunkamsterdam
Created March 4, 2020 15:56
Show Gist options
  • Save ultrafunkamsterdam/5f9cce4b127df1a31d109d3cbd015e90 to your computer and use it in GitHub Desktop.
Save ultrafunkamsterdam/5f9cce4b127df1a31d109d3cbd015e90 to your computer and use it in GitHub Desktop.
asyncio loop in multiprocessing process
import multiprocessing as mp
import queue
import asyncio
class Process(mp.Process):
    """Worker process that drains a queue from inside its own asyncio loop.

    Items are read from ``in_queue``; each one is passed to ``func`` together
    with the configured extra arguments, and a status dict is written to
    ``out_queue``:

    * ``{'status': 'success', 'result': <return value>}``
    * ``{'status': 'failed', 'result': None, 'exception': <exception>}``

    Putting ``None`` on ``in_queue`` asks the worker to shut down.

    Args:
        in_queue: Queue the worker reads items from (needs ``get_nowait``).
        out_queue: Queue the worker writes status dicts to (needs ``put``).
        func: Callable invoked as ``func(item, *func_args, **func_kwargs)``.
        func_args: Extra positional arguments for ``func``; ``None`` means
            no extra arguments.
        func_kwargs: Extra keyword arguments for ``func``; ``None`` means
            no extra keyword arguments.
        poll_interval: Seconds to sleep when ``in_queue`` is empty.
        task_interval: Seconds to sleep after each processed item.
    """

    def __init__(self, in_queue, out_queue, func, func_args=None,
                 func_kwargs=None, *, poll_interval=1, task_interval=.2):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.loop = None  # created in run(), i.e. inside the child process
        self._func = func
        # Normalise the None defaults so they can always be splatted into the
        # call; unpacking None would raise TypeError on the first item.
        self._func_args = () if func_args is None else tuple(func_args)
        self._func_kwargs = {} if func_kwargs is None else dict(func_kwargs)
        self._poll_interval = poll_interval
        self._task_interval = task_interval

    def _qsize_text(self):
        """Return the input queue's approximate size as text, or 'unknown'."""
        try:
            return str(self.in_queue.qsize())
        except NotImplementedError:
            # multiprocessing queues cannot report a size on platforms
            # without sem_getvalue() (e.g. macOS).
            return 'unknown'

    async def get_task_routine(self):
        """Process items from ``in_queue`` until a ``None`` sentinel arrives.

        The coroutine simply returns when it is done; ``run`` owns the event
        loop and stops/closes it.
        """
        while True:
            print(f'qsize: {self._qsize_text()}')
            try:
                item = self.in_queue.get_nowait()
            except queue.Empty:
                # Nothing to do yet: yield to the loop instead of blocking.
                await asyncio.sleep(self._poll_interval)
                continue
            if item is None:
                print('item is None')
                print('exiting', self.name)
                break
            try:
                result = self._func(item, *self._func_args, **self._func_kwargs)
            except Exception as e:
                # Worker boundary: report the failure to the consumer rather
                # than letting one bad item kill the whole process.
                self.out_queue.put({'status': 'failed', 'result': None, 'exception': e})
            else:
                self.out_queue.put({'status': 'success', 'result': result})
            await asyncio.sleep(self._task_interval)

    def put_item(self, item):
        """Put ``item`` on ``out_queue``.

        ``task_done()`` is only called when the queue provides it (for
        example ``JoinableQueue``); a plain ``multiprocessing.Queue`` has no
        such method.
        """
        self.out_queue.put(item)
        task_done = getattr(self.out_queue, 'task_done', None)
        if task_done is not None:
            task_done()

    def run(self):
        """Entry point in the child process: run the routine on a new loop."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        try:
            # run_until_complete returns when the routine finishes and
            # re-raises if it fails, so the process can never be left
            # spinning an idle loop after the routine has died.
            self.loop.run_until_complete(self.get_task_routine())
        finally:
            self.loop.close()
def customfunc(item, *a, **k):
    """Example worker callable: echo back exactly what it was called with.

    Returns a three-element list holding the item, the tuple of extra
    positional arguments and the dict of extra keyword arguments.
    """
    echoed = [item]
    echoed.append(a)
    echoed.append(k)
    return echoed
if __name__ == '__main__':
    # Demo: feed random numbers to one worker process and print its results.
    import random

    async def taskputter():
        """Put a random 16-bit integer on the input queue at random intervals."""
        while True:
            await asyncio.sleep(random.uniform(.1, 1))
            inq.put(random.getrandbits(16))

    async def taskgetter():
        """Poll the output queue and print every result that arrives."""
        while True:
            await asyncio.sleep(.1)
            try:
                item = outq.get_nowait()
            except queue.Empty:
                continue
            print(item)

    async def main():
        """Run producer and consumer concurrently until interrupted."""
        # gather() wraps the coroutines in tasks itself; asyncio.wait() no
        # longer accepts bare coroutines on Python 3.11+.
        await asyncio.gather(taskputter(), taskgetter())

    inq = mp.Queue()
    outq = mp.Queue()
    p = Process(inq, outq, customfunc, func_args=(1, 2), func_kwargs={'kw1': 1, 'kw2': 2})
    p.start()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
    finally:
        # Ask the worker to exit via its None sentinel. It polls about once a
        # second when idle, so allow a few seconds before forcing it down.
        inq.put(None)
        p.join(timeout=5)
        if p.is_alive():
            p.terminate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment