Last active
April 13, 2018 16:49
-
-
Save raddy/76dc10e6a985e99ae8f41314b595995d to your computer and use it in GitHub Desktop.
gdax websocket simple
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ Simple websocket client wrapper """ | |
| import asyncio | |
| import ujson | |
| import websockets | |
| from logging import getLogger | |
| l = getLogger('websocket_client') | |
class WebsocketClient:
    """
    Simple websocket connection with heartbeats on an event loop.

    ``svc`` owns the connection: it (re)connects whenever ``self.ws`` is
    ``None``, passes every received message to ``on_msg`` and drops the
    connection when no message arrives within ``reconnect_interval`` seconds
    or when an unexpected error occurs. Subclasses are expected to override
    ``on_msg`` and ``heartbeat``.
    """

    def __init__(self, loop, addr, **kwargs):
        """
        Create new WebsocketClient

        :param loop: Event loop used to schedule the background tasks
        :type loop: Eventloop
        :param addr: Address of websocket
        :type addr: str
        :param heartbeat_interval: Seconds between ``heartbeat`` calls
            (keyword only, default 5)
        :param reconnect_interval: Seconds to wait for a message before the
            connection is dropped and re-established (keyword only, default 5)
        :param connect_params: Extra keyword arguments forwarded to
            ``websockets.connect`` (keyword only, default ``{}``)
        :param on_connect: Message sent right after every successful
            connect, or ``None`` to send nothing (keyword only)
        """
        self.addr = addr
        self.loop = loop
        self.ws = None
        self.heartbeat_interval = kwargs.pop("heartbeat_interval", 5)
        self.reconnect_interval = kwargs.pop("reconnect_interval", 5)
        self.connect_params = kwargs.pop("connect_params", {})
        self.on_connect = kwargs.pop("on_connect", None)
        # Most recently scheduled heartbeat task, so a reconnect can replace
        # the running chain instead of stacking a second one on top of it.
        self._heartbeat_task = None
        # Reference to the service task so it is not garbage collected.
        self._svc_task = None

    async def connect(self):
        """
        Asynchronously connect to remote websocket.

        On success stores the socket in ``self.ws``, (re)starts the heartbeat
        chain and calls ``on_connected``. Any exception raised by
        ``websockets.connect`` or ``on_connected`` propagates to the caller.
        """
        l.info('Websocket: connecting to %s, params %s',
               self.addr, self.connect_params)
        ws = await websockets.connect(self.addr, **self.connect_params)
        l.info('Websocket: connected to %s', self.addr)
        self.ws = ws
        self._start_heartbeat()
        await self.on_connected()

    async def send(self, msg):
        """
        Send a message to remote websocket

        :param msg: Message to send
        :type msg: str
        """
        await self.ws.send(msg)

    async def svc(self):
        """
        Inner loop. Connects, receives messages and reconnects forever.

        Runs until the task is cancelled. Connection failures and errors
        raised while receiving or handling a message are logged and lead to
        a reconnect after a one second pause.
        """
        while True:
            if self.ws is None:
                await asyncio.sleep(1)
                try:
                    await self.connect()
                except asyncio.CancelledError:
                    raise
                except Exception:
                    # A failed connect must not end the service loop; clean
                    # up whatever was half-opened and retry next iteration.
                    l.exception('Websocket: failed to connect to %s',
                                self.addr)
                    await self._drop_connection()
                continue
            try:
                msg = await asyncio.wait_for(
                    self.ws.recv(), timeout=self.reconnect_interval)
                await self.on_msg(msg)
            except asyncio.TimeoutError:
                # Nothing received in time: treat the connection as dead.
                await self._drop_connection()
            except asyncio.CancelledError:
                # Cancellation has to reach the caller, never be swallowed.
                raise
            except Exception:
                l.exception("CRITICAL -- Unkown, unhandled error")
                await self._drop_connection()

    async def timeout(self):
        """
        One link of the heartbeat chain.

        Sleeps ``heartbeat_interval`` seconds, schedules the next link and
        then calls ``heartbeat``. The next link is scheduled first so that a
        failing ``heartbeat`` cannot stop the chain.
        """
        await asyncio.sleep(self.heartbeat_interval)
        self._heartbeat_task = self.loop.create_task(self.timeout())
        # TODO check if we need to do reconnect
        await self.heartbeat()

    async def on_connected(self):
        """
        Called when connection to remote socket is established. Sends the
        ``on_connect`` message if one was configured, otherwise does nothing.
        """
        if self.on_connect:
            await self.ws.send(self.on_connect)

    async def on_msg(self, msg):
        """
        Called on every received message. Expected to be overloaded.

        The default implementation only logs the message.
        """
        l.error(msg)

    async def heartbeat(self):
        """
        Sends heartbeat message back to server. Expected to be overloaded.
        """
        pass

    def start(self):
        """
        Start up a connection by scheduling ``svc`` on the client's loop.
        """
        self._svc_task = self.loop.create_task(self.svc())

    def _start_heartbeat(self):
        """Start the heartbeat chain, cancelling the one of a previous connection."""
        previous = getattr(self, "_heartbeat_task", None)
        if previous is not None and not previous.done():
            previous.cancel()
        self._heartbeat_task = self.loop.create_task(self.timeout())

    async def _drop_connection(self):
        """Forget the current socket and close it on a best-effort basis."""
        ws, self.ws = self.ws, None
        if ws is None:
            return
        try:
            # Bounded so a stalled peer cannot hold up the reconnect.
            await asyncio.wait_for(ws.close(), timeout=self.reconnect_interval)
        except asyncio.CancelledError:
            raise
        except Exception:
            l.debug('Websocket: error while closing connection to %s',
                    self.addr, exc_info=True)
async def hi():
    """Start the module-level ``client`` from inside the running event loop."""
    client.start()
# Products to subscribe to and the subscription message sent on every connect.
symbols = ["BTC-USD", "ETH-USD", "LTC-USD"]
gdax_subscribe = ujson.dumps({'type': 'subscribe', 'product_ids': symbols})

# Create and install the loop explicitly: asyncio.get_event_loop() without a
# current loop is deprecated and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

client = WebsocketClient(loop, 'wss://ws-feed.gdax.com', on_connect=gdax_subscribe)

# Schedule on this loop directly instead of relying on the implicit
# current-loop lookup done by asyncio.ensure_future().
future = loop.create_task(hi())
loop.run_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment