Skip to content

Instantly share code, notes, and snippets.

@RyanKung
Created August 14, 2018 11:16
Show Gist options
  • Save RyanKung/61bde23819f5d2d19271134f6572f7e2 to your computer and use it in GitHub Desktop.
Save RyanKung/61bde23819f5d2d19271134f6572f7e2 to your computer and use it in GitHub Desktop.
import gzip
import time
import simplejson as json
import websockets
import asyncio
from aioreactive.core import subscribe
from aioreactive.core import AsyncObservable, AsyncIteratorObserver
# WebSocket endpoint URL that handler() connects to.
HUOBI_PRO: str = "wss://api.huobi.pro/ws"
def decode(a: bytes) -> dict:
    """Decompress a gzip payload and parse the result as JSON.

    Args:
        a: Gzip-compressed bytes containing a JSON document.

    Returns:
        The parsed JSON value.

    Errors raised by ``gzip.decompress`` or ``json.loads`` for malformed
    input propagate to the caller unchanged.
    """
    return json.loads(gzip.decompress(a))
def subscribe_data(symbol: str, period: str) -> str:
    """Build the JSON subscription request for a k-line topic.

    Args:
        symbol: Value inserted into the topic, e.g. ``"btcusdt"``.
        period: Value inserted into the topic, e.g. ``"1min"``.

    Returns:
        A JSON string with a ``sub`` field set to
        ``market.<symbol>.kline.<period>`` and an ``id`` field set to the
        current ``time.time()`` value rendered as a string.
    """
    request = {
        "sub": f"market.{symbol}.kline.{period}",
        # The current timestamp is used as the request id.
        "id": str(time.time()),
    }
    return json.dumps(request)
async def pong(msg: dict, ws) -> bool:
    """Reply to a ping message, if ``msg`` is one.

    Args:
        msg: A decoded message.
        ws: Connection object exposing an awaitable ``send`` method.

    Returns:
        ``True`` when ``msg`` had a ``"ping"`` key and a reply was sent,
        ``False`` otherwise (nothing is sent in that case).
    """
    if "ping" not in msg:
        return False
    # Echo the ping value back under the "pong" key.
    await ws.send(json.dumps({"pong": msg["ping"]}))
    return True
async def handler(symbol, period, *, timeout=20):
    """Subscribe to a k-line topic and yield every non-ping message.

    Opens a WebSocket connection to ``HUOBI_PRO``, sends the subscription
    request built by ``subscribe_data``, then receives messages forever.
    Messages containing a ``"ping"`` key are answered via ``pong`` and are
    not yielded; all other decoded messages are yielded to the consumer.

    Args:
        symbol: Passed to ``subscribe_data`` to build the topic.
        period: Passed to ``subscribe_data`` to build the topic.
        timeout: Maximum number of seconds to wait for each message before
            ``asyncio.wait_for`` raises ``asyncio.TimeoutError``.
            Defaults to 20.

    Yields:
        Each decoded message that is not a ping.
    """
    async with websockets.connect(HUOBI_PRO) as ws:
        await ws.send(subscribe_data(symbol, period))
        while True:
            # Bound each receive so a silent connection surfaces as a
            # timeout instead of hanging indefinitely.
            raw = await asyncio.wait_for(ws.recv(), timeout=timeout)
            data = decode(raw)
            # Every decoded message, pings included, is echoed to stdout.
            print(data)
            if not await pong(data, ws):
                yield data
async def main():
    """Print every message produced by ``handler("btcusdt", "1min")``.

    The async generator is wrapped in an ``AsyncObservable`` and consumed
    through an ``AsyncIteratorObserver`` for the lifetime of the
    subscription.
    """
    observer = AsyncIteratorObserver()
    source = AsyncObservable.from_async_iterable(handler("btcusdt", "1min"))
    async with subscribe(source, observer):
        async for message in observer:
            print(message)
if __name__ == "__main__":
    # Script entry point: run main() on the event loop until it finishes
    # or the user interrupts with Ctrl-C.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        loop.stop()
        # Exit with status 0, as quit() did. SystemExit is raised directly
        # because quit() is provided by the site module for interactive
        # use and is not available when the interpreter runs with -S.
        raise SystemExit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment