Skip to content

Instantly share code, notes, and snippets.

@MtkN1
Created August 4, 2022 11:30
Show Gist options
  • Save MtkN1/d9f86c9d0e4d19edeacba047ab706ac0 to your computer and use it in GitHub Desktop.
Save MtkN1/d9f86c9d0e4d19edeacba047ab706ac0 to your computer and use it in GitHub Desktop.
import asyncio
import os
import sys
import aiohttp
from loguru import logger
logger.remove()
logger.add(
sys.stdout,
filter=lambda x: x["extra"].get("event") != "message",
)
logger.add(
f'{os.getenv("HOME")}/.log/wstest/event.log',
filter=lambda x: x["extra"].get("event") != "message",
)
logger.add(
f'{os.getenv("HOME")}/.log/wstest/message.log',
filter=lambda x: x["extra"].get("event") == "message",
rotation="500 MB",
)
messagelogger = logger.bind(event="message")
async def send_str(data, ws: aiohttp.ClientWebSocketResponse):
try:
return await ws.send_str(data)
except Exception as e:
logger.exception(e)
return e
async def subscribe(ws: aiohttp.ClientWebSocketResponse):
data = r'{"op":"subscribe","channel":"ticker","market":"BTC-PERP"}'
await send_str(data, ws)
async def ping(ws: aiohttp.ClientWebSocketResponse):
data = r'{"op":"ping"}'
ret = await send_str(data, ws)
if ret is None:
await asyncio.sleep(15.0)
if not ws.closed:
asyncio.create_task(ping(ws))
async def main():
async with aiohttp.ClientSession() as session:
while not session.closed:
url = "wss://ftx.com/ws/"
heartbeat = 10.0
logger.debug("connecting")
try:
ws = await session.ws_connect(url, heartbeat=heartbeat)
except Exception as e:
logger.exception(e)
await asyncio.sleep(60.0)
else:
try:
logger.debug("open")
asyncio.create_task(ping(ws))
asyncio.create_task(subscribe(ws))
async for msg in ws:
messagelogger.debug(msg.data)
finally:
logger.debug("closing")
await ws.close()
finally:
logger.debug("closed")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment