Created
March 25, 2022 12:50
-
-
Save MtkN1/1efaf7928e8f823ae01a5e8744af309b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import itertools | |
import time | |
import aiohttp | |
""" | |
WebSocket テストクライアント | |
テストサーバーに接続して 0 から始まるサーバーのカウンターとローカルのカウンターの一致を照合する。 | |
カウンターが 10 の時点で CPU バウンドのブロッキング処理を行う。(1億回ループの加算、自環境10秒程度) | |
カウンターの受信漏れがあれば一致せず AssertionError が発生すると想定。 | |
結果: | |
受信漏れは発生しなかった。 | |
ブロッキング処理を長くしたりサーバー側の送信速度などを早くしても同様。 | |
想定: | |
ブロッキング処理後にカウンターの print 標準出力が一気(100 ms より早く)に表示される。 | |
そしてサーバー側がタイムアウトしないことからも Python プログラムより上位レイヤーで常に通信が行われて WebSocket メッセージを受信している(?) | |
その為ブロッキング処理後待ち時間なしで print されているのだと思われる。 | |
""" | |
def heavy_task(): | |
print("heavy_task") | |
# time.sleep(10.0) | |
n = 0 | |
for i in range(100000000): | |
n += i | |
async def main(): | |
async with aiohttp.ClientSession() as session: | |
async with session.ws_connect("ws://localhost:8080/ws") as ws: | |
counter = itertools.count() | |
async for msg in ws: | |
if msg.type == aiohttp.WSMsgType.TEXT: | |
server_count = msg.json() | |
local_count = next(counter) | |
print(f"{server_count}:{local_count} {(server_count == local_count)=:}") | |
assert server_count == local_count, f"different count" | |
if server_count == 10: | |
heavy_task() | |
elif msg.type == aiohttp.WSMsgType.ERROR: | |
break | |
if server_count >= 99: | |
break | |
try: | |
asyncio.run(main()) | |
except KeyboardInterrupt: | |
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import itertools | |
from aiohttp import web | |
""" | |
WebSocket テストサーバー | |
クライアントが接続されたら 100 ms ごとに 0 から始まるカウンターを送信する。 | |
送信に 1 ms 以上要した場合は TimeoutError を発生させる。(その場合クライアントはローカルのカウンターと一致しないと想定) | |
""" | |
async def send_counter(ws: web.WebSocketResponse): | |
for count in itertools.count(): | |
try: | |
await asyncio.wait_for(ws.send_json(count), timeout=0.001) | |
except asyncio.TimeoutError: | |
print(f"TimeoutError: {count}") | |
await asyncio.sleep(0.1) | |
async def websocket_handler(request): | |
ws = web.WebSocketResponse() | |
await ws.prepare(request) | |
task = asyncio.create_task(send_counter(ws)) | |
async for msg in ws: ... | |
task.cancel() | |
print("websocket connection closed") | |
return ws | |
app = web.Application() | |
app.add_routes([web.get("/ws", websocket_handler)]) | |
web.run_app(app) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment