Skip to content

Instantly share code, notes, and snippets.

@tenuki
Last active February 24, 2024 01:18
Show Gist options
  • Save tenuki/ff67f87cba5c4c04fd08d9c800437477 to your computer and use it in GitHub Desktop.
Save tenuki/ff67f87cba5c4c04fd08d9c800437477 to your computer and use it in GitHub Desktop.
How to run multiple uvicorn server apps in the same process (thanks @a-d-j-i )
##
## How to run multiple uvicorn server apps in the same process
##
import asyncio
from uvicorn import Server, Config
class MyServer(Server):
async def run(self, sockets=None):
self.config.setup_event_loop()
return await self.serve(sockets=sockets)
async def run():
apps = []
for cfg in configList:
config = Config("srcs.myapp:app", host="0.0.0.0",
port=cfg["port"])
server = MyServer(config=config)
apps.append(server.run())
return await asyncio.gather(*apps)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
@SyntaxRules-avant
Copy link

This did it for me, and its interruptible:

import asyncio
import sys

...some fast api app...

async def create_webserver(port):
    server_config = uvicorn.Config("uvicorn_asgi:fast_api_app", port=port, log_level="info")
    server = uvicorn.Server(server_config)
    await server.serve()


async def main():
    done, pending = await asyncio.wait(
        [
            create_webserver(8000),
            create_webserver(8001),
        ],
        return_when=asyncio.FIRST_COMPLETED,
    )

    print("done")
    print(done)
    print("pending")
    print(pending)
    for pending_task in pending:
        pending_task.cancel("Another service died, server is shutting down")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as e:
        print(e)
        sys.exit(0)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment