Skip to content

Instantly share code, notes, and snippets.

@mnixry
Last active August 17, 2023 04:57
Show Gist options
  • Save mnixry/79bf53c34cedcafc27c3663c75930f6f to your computer and use it in GitHub Desktop.
Save mnixry/79bf53c34cedcafc27c3663c75930f6f to your computer and use it in GitHub Desktop.
WebSocketReflectorX, but Python.
import asyncio
import random
import sys
from typing import Optional
from websockets.legacy.client import WebSocketClientProtocol
from websockets.legacy.client import connect as websockets_connect
BUFFER_SIZE = 1024
class WebSocket:
connection: Optional[WebSocketClientProtocol] = None
stopped = False
def __init__(self, url: str) -> None:
self.ws = websockets_connect(url)
self.reader = asyncio.StreamReader()
self.recv_task = asyncio.create_task(self._recv_worker())
self.start_signal = asyncio.Event()
async def _recv_worker(self) -> None:
try:
async with self.ws as connection:
self.connection = connection
self.start_signal.set()
while not self.stopped:
data = await connection.recv()
if isinstance(data, str):
data = data.encode()
self.reader.feed_data(data)
except Exception as e:
self.reader.set_exception(e)
self.reader.feed_eof()
self.connection = None
async def recv(self, num: int) -> bytes:
return await self.reader.read(num)
async def send(self, data: bytes) -> int:
if self.connection is None:
await self.start_signal.wait()
assert self.connection is not None, "WebSocket connection is None"
await self.connection.send(data)
return len(data)
@property
def closed(self) -> bool:
return self.stopped
async def close(self) -> None:
self.stopped = True
if self.connection is not None:
await self.connection.close()
await self.recv_task
async def server(
url: str, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
ws = WebSocket(url)
async def r():
# forward data from client to server
while not ws.closed:
data = await reader.read(BUFFER_SIZE)
if not data:
break
await ws.send(data)
async def w():
# forward data from server to client
while not ws.closed:
data = await ws.recv(BUFFER_SIZE)
if not data:
break
writer.write(data)
await writer.drain()
await asyncio.wait([r(), w()], return_when=asyncio.FIRST_COMPLETED)
await ws.close()
async def main() -> None:
s = await asyncio.start_server(
lambda *args: server(url, *args),
host="0.0.0.0",
port=port,
)
async with s:
await s.serve_forever()
if __name__ == "__main__":
try:
app_name, url = sys.argv
except ValueError:
print("Usage: python proxy.py <url>")
exit(1)
port = random.randint(1000, 65535)
print(f"Starting proxy server at 0.0.0.0:{port}")
asyncio.run(
main(),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment