Skip to content

Instantly share code, notes, and snippets.

@tomchristie
Last active June 1, 2024 14:25
Show Gist options
  • Save tomchristie/3293d5b118b5646ce79cc074976744b0 to your computer and use it in GitHub Desktop.
Save tomchristie/3293d5b118b5646ce79cc074976744b0 to your computer and use it in GitHub Desktop.
import trio
async def main():
    """
    Example client: send one greeting to the websocket server listening on
    127.0.0.1:8765 and print the first message that comes back.
    """
    async with ws_connect("ws://127.0.0.1:8765") as connection:
        await connection.send("Hello, world.")
        print(await connection.recv())
# Script entry point: hand the `main` coroutine function to trio's runner.
trio.run(main)
import base64
import collections
import contextlib
import os

import httpx
import wsproto
class ConnectionClosed(Exception):
    """
    Raised when the websocket connection has been closed.
    """
class WebsocketConnection:
    """
    A minimal client-side websocket connection for exchanging text messages.

    Framing is delegated to a `wsproto.Connection` in client mode; the bytes
    it produces and consumes are exchanged over the supplied network stream.
    """

    def __init__(self, network_steam):
        """
        Wrap a network stream on which the websocket handshake has completed.

        `network_steam` (sic - the misspelling is kept so existing keyword
        callers keep working) must provide awaitable `read(max_bytes=...)`
        and `write(data)` methods.
        """
        self._ws_connection_state = wsproto.Connection(wsproto.ConnectionType.CLIENT)
        self._network_stream = network_steam
        # Events parsed from the wire but not yet consumed by `recv()`.
        self._events = collections.deque()
        # Pieces of a text message whose final fragment has not arrived yet.
        self._text_fragments = []

    async def send(self, text):
        """
        Send a text frame over the websocket connection.
        """
        event = wsproto.events.TextMessage(data=text)
        data = self._ws_connection_state.send(event)
        await self._network_stream.write(data)

    async def recv(self):
        """
        Receive the next complete text message from the websocket connection.

        Fragmented messages are reassembled before being returned. Pings are
        answered with a pong. Events that are not part of this text-only API
        (pongs, binary messages) are skipped rather than returned as `None`.

        Raises `ConnectionClosed` when the peer sends a close frame or the
        underlying stream reaches end-of-file.
        """
        while True:
            event = await self._next_event()
            if isinstance(event, wsproto.events.TextMessage):
                self._text_fragments.append(event.data)
                # wsproto may deliver one message as several events; only
                # hand it to the caller once the final fragment is in.
                if event.message_finished:
                    message = "".join(self._text_fragments)
                    self._text_fragments.clear()
                    return message
            elif isinstance(event, wsproto.events.Ping):
                # The protocol requires a pong in reply; peers commonly drop
                # connections whose keepalive pings go unanswered.
                pong = self._ws_connection_state.send(event.response())
                await self._network_stream.write(pong)
            elif isinstance(event, wsproto.events.CloseConnection):
                raise ConnectionClosed()

    async def _next_event(self):
        """
        Return the next parsed websocket event, reading from the network
        stream whenever the queue of pending events is empty.
        """
        while not self._events:
            data = await self._network_stream.read(max_bytes=4096)
            if not data:
                # An empty read means the transport hit EOF. Without this
                # check the loop would spin forever on a dead connection.
                raise ConnectionClosed()
            self._ws_connection_state.receive_data(data)
            self._events.extend(self._ws_connection_state.events())
        return self._events.popleft()
@contextlib.asynccontextmanager
async def ws_connect(url):
    """
    Open a websocket connection to `url` and yield a `WebsocketConnection`.

    An HTTP/1.1 upgrade request is sent with httpx, and the raw network
    stream exposed through the response's "network_stream" extension is
    wrapped in a `WebsocketConnection`. The HTTP client and response are
    closed when the context exits.

    Raises `httpx.HTTPStatusError` if the server does not answer with
    `101 Switching Protocols`.
    """
    # The handshake key is a random 16-byte nonce, base64 encoded. It is
    # decoded so that the header value is text like the other headers.
    websocket_key = base64.b64encode(os.urandom(16)).decode("ascii")
    headers = {
        "connection": "upgrade",
        "upgrade": "websocket",
        "sec-websocket-key": websocket_key,
        "sec-websocket-version": "13",
    }
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url, headers=headers) as response:
            # Any status other than 101 means the server did not switch
            # protocols, so the stream would carry plain HTTP, not frames.
            if response.status_code != 101:
                raise httpx.HTTPStatusError(
                    "Websocket upgrade failed: expected status 101, "
                    f"received {response.status_code}",
                    request=response.request,
                    response=response,
                )
            network_steam = response.extensions["network_stream"]
            yield WebsocketConnection(network_steam)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment