Skip to content

Instantly share code, notes, and snippets.

@cosimo
Created February 27, 2019 15:08
Show Gist options
  • Save cosimo/d0a8776b1437e61ca5ea9574213caa56 to your computer and use it in GitHub Desktop.
Save cosimo/d0a8776b1437e61ca5ea9574213caa56 to your computer and use it in GitHub Desktop.
Example of Python server with *both* HTTP and Websockets capabilities
#!/usr/bin/python3.7
import aiohttp
from aiohttp import web, WSCloseCode
import asyncio
async def http_handler(request):
return web.Response(text='Hello, world')
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
else:
await ws.send_str('some websocket message payload')
elif msg.type == aiohttp.WSMsgType.ERROR:
print('ws connection closed with exception %s' % ws.exception())
return ws
def create_runner():
app = web.Application()
app.add_routes([
web.get('/', http_handler),
web.get('/ws', websocket_handler),
])
return web.AppRunner(app)
async def start_server(host="127.0.0.1", port=1337):
runner = create_runner()
await runner.setup()
site = web.TCPSite(runner, host, port)
await site.start()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(start_server())
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment