Last active
June 1, 2024 14:25
-
-
Save tomchristie/3293d5b118b5646ce79cc074976744b0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio


async def main():
    """Send one greeting over a websocket to 127.0.0.1:8765 and print the reply."""
    # NOTE(review): `ws_connect` is not imported in this snippet; it has to be
    # imported from whichever module defines it before this script will run.
    async with ws_connect("ws://127.0.0.1:8765") as connection:
        await connection.send("Hello, world.")
        reply = await connection.recv()
        print(reply)


trio.run(main)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import contextlib | |
import os | |
import httpx | |
import wsproto | |
class ConnectionClosed(Exception):
    """Raised by `WebsocketConnection.recv()` when a close event is received."""
class WebsocketConnection:
    """
    A minimal client-side websocket connection on top of a raw network stream.

    Framing is delegated to a `wsproto.Connection` created in client mode;
    this class only moves bytes between that state machine and the stream.
    """

    def __init__(self, network_steam):
        """
        Wrap a network stream that has already been upgraded to websockets.

        `network_steam` (sic; the spelling is kept so keyword callers keep
        working) must provide awaitable `read(max_bytes=...)` and
        `write(data)` methods.
        """
        self._ws_connection_state = wsproto.Connection(wsproto.ConnectionType.CLIENT)
        self._network_stream = network_steam
        # Events parsed from the wire but not yet consumed by `recv()`.
        self._events = []
        # Pieces of a text message whose final piece has not arrived yet.
        # Kept on the instance so a cancelled `recv()` does not lose them.
        self._text_chunks = []

    async def send(self, text):
        """
        Send a text frame over the websocket connection.
        """
        event = wsproto.events.TextMessage(data=text)
        data = self._ws_connection_state.send(event)
        await self._network_stream.write(data)

    async def recv(self):
        """
        Receive the next complete text message from the websocket connection.

        Pings are answered with a pong. Other non-text events (pongs, binary
        messages) are skipped, so the call always returns a `str`.

        Raises `ConnectionClosed` if a close frame is received or the
        underlying stream reaches end-of-file.
        """
        while True:
            while not self._events:
                data = await self._network_stream.read(max_bytes=4096)
                # An empty read means the peer closed the stream. Feeding
                # b"" to wsproto produces no events, which would leave this
                # loop spinning forever; passing None instead makes wsproto
                # emit a CloseConnection event.
                self._ws_connection_state.receive_data(data or None)
                self._events = list(self._ws_connection_state.events())

            event = self._events.pop(0)
            if isinstance(event, wsproto.events.TextMessage):
                # A large or fragmented message arrives as several events;
                # only hand it to the caller once it is complete.
                self._text_chunks.append(event.data)
                if event.message_finished:
                    message = "".join(self._text_chunks)
                    self._text_chunks = []
                    return message
            elif isinstance(event, wsproto.events.CloseConnection):
                raise ConnectionClosed()
            elif isinstance(event, wsproto.events.Ping):
                # wsproto does not reply to pings by itself.
                pong = self._ws_connection_state.send(event.response())
                await self._network_stream.write(pong)
            # Any other event is not a text message: keep waiting.
@contextlib.asynccontextmanager
async def ws_connect(url):
    """
    Open a websocket connection to `url` and yield a `WebsocketConnection`.

    Sends an HTTP upgrade request through httpx and wraps the raw network
    stream exposed in the response extensions. The response and the HTTP
    client are closed when the context exits.

    Raises `httpx.HTTPStatusError` if the server does not answer with
    "101 Switching Protocols".
    """
    headers = {
        "connection": "upgrade",
        "upgrade": "websocket",
        # Random 16-byte nonce, base64-encoded; decoded so that every header
        # value in this mapping is a str.
        "sec-websocket-key": base64.b64encode(os.urandom(16)).decode("ascii"),
        "sec-websocket-version": "13",
    }
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url, headers=headers) as response:
            # Without a 101 the stream still speaks HTTP, and treating it as
            # a websocket would only produce protocol errors later on.
            if response.status_code != 101:
                raise httpx.HTTPStatusError(
                    f"Failed to upgrade to websockets: HTTP {response.status_code}",
                    request=response.request,
                    response=response,
                )
            network_stream = response.extensions["network_stream"]
            yield WebsocketConnection(network_stream)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment