Created
August 14, 2018 11:16
-
-
Save RyanKung/61bde23819f5d2d19271134f6572f7e2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import gzip
import time

import simplejson as json
import websockets
from aioreactive.core import subscribe
from aioreactive.core import AsyncObservable, AsyncIteratorObserver
# Websocket endpoint that handler() connects to.
HUOBI_PRO: str = "wss://api.huobi.pro/ws"
def decode(a: bytes) -> dict:
    """Decompress a gzip-compressed payload and parse it as JSON.

    Args:
        a: Gzip-compressed bytes containing a JSON document.

    Returns:
        The parsed JSON value (callers in this file treat it as a dict).

    Raises:
        OSError: If ``a`` is not valid gzip data (raised by
            ``gzip.decompress``).
        ValueError: If the decompressed bytes are not valid JSON.
    """
    return json.loads(gzip.decompress(a))
def subscribe_data(symbol: str, period: str) -> str:
    """Build the JSON subscription request for a kline topic.

    Args:
        symbol: Symbol inserted into the topic, e.g. ``"btcusdt"``.
        period: Period inserted into the topic, e.g. ``"1min"``.

    Returns:
        A JSON string with two keys: ``sub``, the topic
        ``market.<symbol>.kline.<period>``, and ``id``, the current Unix
        timestamp rendered as a string.
    """
    return json.dumps(
        {
            "sub": f"market.{symbol}.kline.{period}",
            "id": str(time.time()),
        }
    )
async def pong(msg: dict, ws) -> bool:
    """Reply to a ``ping`` message with the matching ``pong``.

    Args:
        msg: A decoded message.
        ws: Connection object exposing an awaitable ``send(str)`` method.

    Returns:
        True if ``msg`` contained a ``ping`` key and a
        ``{"pong": <ping value>}`` reply was sent; False otherwise (nothing
        is sent in that case).
    """
    if 'ping' not in msg:
        return False
    # Echo the ping value back unchanged under the "pong" key.
    await ws.send(json.dumps({"pong": msg['ping']}))
    return True
async def handler(symbol: str, period: str):
    """Stream decoded kline messages for ``symbol``/``period``.

    Opens a websocket to ``HUOBI_PRO``, sends the subscription request built
    by ``subscribe_data`` and then loops forever: each received frame is
    decoded, printed, and either answered (when it is a ``ping``) or yielded
    to the consumer.

    Args:
        symbol: Symbol passed to ``subscribe_data``.
        period: Period passed to ``subscribe_data``.

    Yields:
        Every decoded message that is not a ``ping``.

    Raises:
        asyncio.TimeoutError: If no frame arrives within 20 seconds.
    """
    async with websockets.connect(HUOBI_PRO) as ws:
        await ws.send(subscribe_data(symbol, period))
        while True:
            # Bound each receive so a silent connection fails loudly instead
            # of hanging forever.
            raw = await asyncio.wait_for(ws.recv(), timeout=20)
            data = decode(raw)
            print(data)
            # Pings are answered inside pong() and never reach the consumer.
            if not await pong(data, ws):
                yield data
async def main():
    """Subscribe to the btcusdt 1min kline stream and print every message.

    Wraps the ``handler`` async generator in an ``AsyncObservable``,
    subscribes an ``AsyncIteratorObserver`` to it and prints each item the
    observer produces. Runs until the stream ends or raises.
    """
    obv = AsyncIteratorObserver()
    xs = AsyncObservable.from_async_iterable(handler("btcusdt", "1min"))
    async with subscribe(xs, obv):
        async for x in obv:
            print(x)
# Script entry point: run main() until it finishes or the user presses Ctrl-C.
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the stream; exit quietly.
        # run_until_complete() has already returned control here, so there
        # is no running loop to stop, and falling off the end of the script
        # exits with status 0 without relying on the interactive-only
        # quit() helper.
        pass
    finally:
        # Release the loop's resources on every exit path.
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment