Created
January 21, 2021 03:54
-
-
Save CMCDragonkai/3302d31e386cb37f79155e2440665825 to your computer and use it in GitHub Desktop.
Python AsyncIO with Multithreading and Multiprocessing #python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import asyncio | |
import logging | |
import uvloop | |
import aiojobs.aiohttp | |
import sys | |
from aiohttp import web | |
import aioprocessing | |
import concurrent.futures | |
# main is blocking function | |
def main(): | |
logging.basicConfig( | |
level=logging.INFO, | |
format="PID %(process)5s %(threadName)s %(thread)d %(name)s: %(message)s", | |
stream=sys.stderr, | |
) | |
logger = logging.getLogger() | |
logger.info("Server is Started") | |
app = web.Application() | |
async def jobthatgoespastthereqreslifetime(): | |
logger.info("STARTING JOBTHATGOESPASTREQRESLIFETIME") | |
await asyncio.sleep(5) | |
logger.info("ENDING JOBTHATGOESPASTREQRESLIFETIME") | |
def special_process(): | |
logger.info("STARTING SPECIAL PROCESS") | |
time.sleep(5) | |
logger.info("ENDING SPECIAL PROCESS") | |
def special_thread(): | |
logger.info(f"STARTING SPECIAL THREAD ") | |
time.sleep(10) | |
logger.info(f"ENDING SPECIAL THREAD ") | |
def get_loop(): | |
try: | |
loop = asyncio.get_running_loop() | |
print("in get_running_loop") | |
except RuntimeError: | |
loop = asyncio.get_event_loop() | |
print("in get_event_loop") | |
return loop | |
async def ping(request): | |
# asynchronous request | |
# asynchronous response! | |
app = request.app | |
scheduler = app["scheduler"] | |
# first usecase | |
# we are going to spawn and asynchronous job using aiojobs | |
# and we want to see if it is still running in the same process and same thread | |
await scheduler.spawn(jobthatgoespastthereqreslifetime()) | |
# you can also use the utility function spawn | |
# it's the same thing | |
# await spawn(request, jobthatgoespastthereqreslifetime()) | |
# so we know now that its still single process single threaded | |
# the next usecase is where we need single process multithreaded | |
executor = ( | |
concurrent.futures.ThreadPoolExecutor() | |
) | |
loop = get_loop() | |
result = await loop.run_in_executor(executor, special_thread) | |
logger.info("results: {!r}".format(result)) | |
return web.json_response({"pong": None}) | |
# check for cross-cutting concerns | |
aiojobs.aiohttp.setup(app) | |
# this setups up multiprocessing | |
# it's just another process | |
async def start_scheduler(app): | |
app["scheduler"] = await aiojobs.create_scheduler() | |
async def stop_scheduler(app): | |
await app["scheduler"].close() | |
app.on_startup.append(start_scheduler) | |
app.on_shutdown.append(stop_scheduler) | |
async def start_process(app): | |
process = aioprocessing.AioProcess(target=special_process) | |
process.start() | |
app["process"] = process | |
async def stop_process(app): | |
# await for the process to gracefully close | |
await app["process"].coro_join() | |
# REALLY CLOSE IT! | |
app["process"].close() | |
app.on_startup.append(start_process) | |
app.on_cleanup.append(stop_process) | |
router = app.router | |
router.add_route("GET", "/ping", ping) | |
# install the uvloop (overrides the loop) | |
uvloop.install() | |
# runs the app blocking... runs forever | |
web.run_app(app, host="127.0.0.1", port=55555) | |
logger.info("Server is Stopped") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment