Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save CMCDragonkai/3302d31e386cb37f79155e2440665825 to your computer and use it in GitHub Desktop.
Python AsyncIO with Multithreading and Multiprocessing #python
import time
import asyncio
import logging
import uvloop
import aiojobs.aiohttp
import sys
from aiohttp import web
import aioprocessing
import concurrent.futures
# main is blocking function
# main is the blocking entry point: it configures logging, builds the aiohttp
# application (demonstrating asyncio + multithreading + multiprocessing), and
# serves it until interrupted.
def main():
    logging.basicConfig(
        level=logging.INFO,
        format="PID %(process)5s %(threadName)s %(thread)d %(name)s: %(message)s",
        stream=sys.stderr,
    )
    logger = logging.getLogger()
    logger.info("Server is Started")
    app = web.Application()

    # One shared thread pool for the whole server lifetime. (The original
    # created a fresh ThreadPoolExecutor inside every request and never shut
    # it down — a thread/resource leak under load.)
    executor = concurrent.futures.ThreadPoolExecutor()

    async def jobthatgoespastthereqreslifetime():
        # An aiojobs job that deliberately outlives the request/response cycle.
        logger.info("STARTING JOBTHATGOESPASTREQRESLIFETIME")
        await asyncio.sleep(5)
        logger.info("ENDING JOBTHATGOESPASTREQRESLIFETIME")

    def special_process():
        # Target for the companion OS process (see start_process below).
        logger.info("STARTING SPECIAL PROCESS")
        time.sleep(5)
        logger.info("ENDING SPECIAL PROCESS")

    def special_thread():
        # Blocking work offloaded to the thread pool from the ping handler.
        logger.info("STARTING SPECIAL THREAD")
        time.sleep(10)
        logger.info("ENDING SPECIAL THREAD")

    def get_loop():
        # Prefer the currently running loop; fall back to get_event_loop()
        # only when called from synchronous code with no loop running.
        try:
            loop = asyncio.get_running_loop()
            logger.debug("in get_running_loop")
        except RuntimeError:
            loop = asyncio.get_event_loop()
            logger.debug("in get_event_loop")
        return loop

    async def ping(request):
        # GET /ping — asynchronous request, asynchronous response.
        app = request.app
        scheduler = app["scheduler"]
        # First use case: spawn an asynchronous job via aiojobs and observe
        # that it keeps running in the same process and same thread after the
        # response is sent.
        await scheduler.spawn(jobthatgoespastthereqreslifetime())
        # The utility function aiojobs.aiohttp.spawn is equivalent:
        # await spawn(request, jobthatgoespastthereqreslifetime())
        # Second use case: single process, multithreaded — run blocking work
        # in the shared executor without stalling the event loop.
        loop = get_loop()
        result = await loop.run_in_executor(executor, special_thread)
        # Lazy %-style args: the repr is only computed if the record is emitted.
        logger.info("results: %r", result)
        return web.json_response({"pong": None})

    # Wire aiojobs into aiohttp (cross-cutting concerns).
    aiojobs.aiohttp.setup(app)

    # Scheduler lifecycle: created on startup, closed on shutdown.
    async def start_scheduler(app):
        app["scheduler"] = await aiojobs.create_scheduler()

    async def stop_scheduler(app):
        await app["scheduler"].close()

    app.on_startup.append(start_scheduler)
    app.on_shutdown.append(stop_scheduler)

    # Multiprocessing: start a companion process alongside the server —
    # it's just another process.
    async def start_process(app):
        process = aioprocessing.AioProcess(target=special_process)
        process.start()
        app["process"] = process

    async def stop_process(app):
        # Await the process's graceful exit...
        await app["process"].coro_join()
        # ...then REALLY close it, releasing its resources.
        app["process"].close()

    app.on_startup.append(start_process)
    app.on_cleanup.append(stop_process)

    router = app.router
    router.add_route("GET", "/ping", ping)
    # Install uvloop as the event loop policy (overrides the default loop).
    uvloop.install()
    try:
        # Blocking: serves requests forever until the process is interrupted.
        web.run_app(app, host="127.0.0.1", port=55555)
    finally:
        # Tear down the shared thread pool once the server stops.
        executor.shutdown(wait=False)
        logger.info("Server is Stopped")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment