Created
November 29, 2017 06:00
-
-
Save BibMartin/4381442209825c5a3d57f6b4a3c2acca to your computer and use it in GitHub Desktop.
Using multiprocessing to run a blocking function with a timeout.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from tornado import ioloop, gen, concurrent | |
import pandas as pd | |
import time | |
import asyncio | |
from multiprocessing import TimeoutError, Process | |
from multiprocessing.dummy import Pool as ThreadPool | |
class F(object): | |
def __init__(self): | |
self.io_loop = ioloop.IOLoop() | |
def run_yield(self): | |
p = ThreadPool(1) | |
res = p.apply_async(self.run) | |
res.wait(3.) | |
if res.ready(): | |
out = res.get() | |
open('foo.log', 'a').write('SUCCESS\n') | |
print('SUCCESS', out) | |
else: | |
open('foo.log', 'a').write('TIMEOUT\n') | |
print('TIMEOUT') | |
p.terminate() | |
def run(self, x=None): | |
for i in range(12): | |
open('foo.log', 'a').write( | |
'{}\n'.format(pd.Timestamp.utcnow())) | |
time.sleep(1) | |
return 'Finished' | |
f = F() | |
open('foo.log', 'a').write("======\n") | |
f.run_yield() | |
# Efforts to run it async:... | |
# =========================== | |
class G(object): | |
def __init__(self): | |
self.io_loop = ioloop.IOLoop() | |
self.executor = concurrent.futures.ProcessPoolExecutor() | |
# self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) | |
#@concurrent.run_on_executor() | |
@gen.coroutine | |
def run_yield(self): | |
self.p = ThreadPool(1) | |
res = self.p.apply_async(self.run) | |
open('foo.log', 'a').write('will wait...\n') | |
yield (asyncio.sleep(3.)) | |
open('foo.log', 'a').write('have waited...\n') | |
# res.wait(3.) | |
if res.ready(): | |
out = res.get() | |
open('foo.log', 'a').write('SUCCESS\n') | |
print('SUCCESS', out) | |
else: | |
open('foo.log', 'a').write('TIMEOUT\n') | |
print('TIMEOUT') | |
self.p.terminate() | |
#self.executor.map(self.run, [1], timeout=3.) | |
#self.run() | |
#p = ThreadPool(1) | |
#res = p.apply_async(self.run) | |
#res.wait(3.) | |
#if res.ready(): | |
# out = res.get(1.) | |
# print('SUCCESS', out) | |
#else: | |
# print('TIMEOUT') | |
#p.terminate() | |
#try: | |
# out = res.get(3.) # Wait timeout seconds for func to complete. | |
# return out | |
#except TimeoutError: | |
# print("Aborting due to timeout") | |
# p.terminate() | |
def run_yield0(self): | |
p = ThreadPool(1) | |
res = p.apply_async(self.run) | |
res.wait(3.) | |
if res.ready(): | |
out = res.get(1.) | |
open('foo.log', 'a').write('SUCCESS\n') | |
print('SUCCESS', out) | |
else: | |
open('foo.log', 'a').write('TIMEOUT\n') | |
print('TIMEOUT') | |
p.terminate() | |
def run(self, x=None): | |
for i in range(12): | |
open('foo.log', 'a').write( | |
'{}\n'.format(pd.Timestamp.utcnow())) | |
time.sleep(1) | |
return 'Finished' | |
# async def foo(self): | |
# while True: | |
# x = await sleep_fun() | |
# open('foo.log', 'a').write('{}\n'.format(x)) | |
def print(self): | |
open('foo.log', 'a').write('PRINT\n') | |
def stop(self): | |
open('foo.log', 'a').write('STOP\n') | |
self.io_loop.stop() | |
# self.executor.shutdown(wait=True) | |
#f.io_loop.run_sync(f.run_yield) | |
#f.io_loop.start() | |
#loop = asyncio.get_event_loop() | |
# Blocking call which returns when the display_date() coroutine is done | |
#loop.run_until_complete(f.run_yield()) | |
#loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment