-
-
Save rcarmo/3f0772f2cbe0612b699dcbb839edabeb to your computer and use it in GitHub Desktop.
Example: asyncio and aiohttp, handling longpoll, eventsource and websocket requests with a queue and background workers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from aiohttp import web | |
from threading import Thread | |
import asyncio | |
import time, uuid | |
loop = asyncio.get_event_loop() | |
def long_blocking_thing(sleep): | |
time.sleep(sleep) | |
return 42 | |
async def worker(q): | |
await q.put(b'coroutine: hello') | |
for i in range(1,11): | |
await asyncio.sleep(1) | |
await q.put(b'coroutine: ping %d' % i) | |
await q.put(b'The end!') | |
await q.put(None) | |
def start_background_work(q=None): | |
if q is None: | |
q = asyncio.Queue() | |
def _thread(): | |
# you can run non-asyncio code in a separate thread | |
# but use run_coroutine_threadsafe to fill in the queue | |
result = long_blocking_thing(5) | |
asyncio.run_coroutine_threadsafe(q.put(b'thread: hello %d' % result), loop) | |
Thread(target=_thread).start() | |
loop.create_task(worker(q)) | |
return q | |
# stupid session implementation | |
session_store = {} | |
async def longpoll(request): | |
response = web.Response() | |
session_key = request.cookies.get('aio-session') | |
if session_key is None: | |
session_key = str(uuid.uuid4()) | |
response.set_cookie('aio-session', session_key) | |
q = session_store.get(session_key) | |
if q is None: | |
# new session start some background work | |
q = start_background_work() | |
session_store[session_key] = q | |
msg = await q.get() | |
if msg is None: | |
# coroutines are done now | |
del session_store[session_key] | |
return web.Response(status=204) | |
response.body = msg + b'\n' # newline for curl :) | |
return response | |
async def sse_handler(request): | |
if request.headers.get('accept') != 'text/event-stream': | |
return web.Response(status=406) | |
stream = web.StreamResponse() | |
stream.headers['Content-Type'] = 'text/event-stream' | |
stream.headers['Cache-Control'] = 'no-cache' | |
stream.headers['Connection'] = 'keep-alive' | |
stream.enable_chunked_encoding() | |
await stream.prepare(request) | |
q = start_background_work() | |
while True: | |
msg = await q.get() | |
if msg is None: | |
break | |
stream.write(b"data: %s\r\n\r\n" % msg) | |
await stream.write_eof() | |
return stream | |
async def websocket_handler(request): | |
ws = web.WebSocketResponse() | |
await ws.prepare(request) | |
q = start_background_work() | |
while True: | |
msg = await q.get() | |
# or done, pending = await asyncio.wait([q.get(), ws.receive()], return_when=asyncio.FIRST_COMPLETED) | |
if msg is None: | |
break | |
else: | |
ws.send_bytes(msg) | |
await ws.close() | |
return ws | |
app = web.Application() | |
app.router.add_route('GET', '/', longpoll) | |
app.router.add_route('GET', '/ws', websocket_handler) | |
app.router.add_route('GET', '/sse', sse_handler) | |
async def init(loop): | |
handler = app.make_handler() | |
srv = await loop.create_server(handler, '0.0.0.0', 8080) | |
print('serving on', srv.sockets[0].getsockname()) | |
return srv | |
def main(): | |
loop.run_until_complete(init(loop)) | |
try: | |
loop.run_forever() | |
except KeyboardInterrupt: | |
pass | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
async def main(): | |
session = aiohttp.ClientSession() | |
while True: | |
r = await session.get("http://localhost:8080/") | |
if r.status == 204: | |
r.close() | |
break | |
print(await r.text(), end='') | |
session.close() | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
import sys | |
async def main(): | |
headers = {'accept': 'text/event-stream'} | |
r = await aiohttp.get("http://localhost:8080/sse", headers=headers) | |
while True: | |
msg = await r.content.readline() | |
if not msg: | |
break | |
print(msg) | |
await r.release() | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
async def main(): | |
ws = await aiohttp.ws_connect('http://localhost:8080/ws') | |
while True: | |
msg = await ws.receive() | |
if msg.tp == aiohttp.MsgType.binary: | |
print(msg.data) | |
elif msg.tp == aiohttp.MsgType.closed: | |
break | |
elif msg.tp == aiohttp.MsgType.error: | |
break | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment