Skip to content

Instantly share code, notes, and snippets.

@raddy
Last active April 13, 2018 16:49
Show Gist options
  • Select an option

  • Save raddy/76dc10e6a985e99ae8f41314b595995d to your computer and use it in GitHub Desktop.

Select an option

Save raddy/76dc10e6a985e99ae8f41314b595995d to your computer and use it in GitHub Desktop.
gdax websocket simple
""" Simple websocket client wrapper """
import asyncio
import ujson
import websockets
from logging import getLogger
l = getLogger('websocket_client')


class WebsocketClient:
    """
    Simple websocket connection with heartbeats on an event loop.

    The service loop (:meth:`svc`) connects, reads messages and hands them to
    :meth:`on_msg`. If no message arrives within ``reconnect_interval``
    seconds, or any error occurs, the current socket is dropped and a new
    connection is attempted. :meth:`on_msg` and :meth:`heartbeat` are expected
    to be overloaded by subclasses.
    """

    def __init__(self, loop, addr, **kwargs):
        """
        Create new WebsocketClient

        :param loop: Pointer to eventloop (Should be uvloop)
        :type loop: Eventloop
        :param addr: Address of websocket
        :type addr: str
        :param heartbeat_interval: Seconds between heartbeat() calls (default 5)
        :param reconnect_interval: Seconds to wait for a message before the
            connection is dropped and re-established (default 5)
        :param connect_delay: Seconds to pause before every connection
            attempt (default 1)
        :param connect_params: Extra keyword arguments for websockets.connect
        :type connect_params: dict
        :param on_connect: Optional message sent right after connecting
        """
        self.addr = addr
        self.loop = loop
        self.ws = None
        self.heartbeat_interval = kwargs.pop("heartbeat_interval", 5)
        self.reconnect_interval = kwargs.pop("reconnect_interval", 5)
        self.connect_delay = kwargs.pop("connect_delay", 1)
        self.connect_params = kwargs.pop("connect_params", {})
        self.on_connect = kwargs.pop("on_connect", None)
        # Handle of the most recently scheduled timeout() task, so that
        # reconnecting does not start a second heartbeat chain.
        self._heartbeat_task = None

    async def connect(self):
        """
        Asynchronously connect to remote websocket.

        Starts the heartbeat chain if it is not already running and then
        calls :meth:`on_connected`.
        """
        l.info('Websocket: connecting to %s, params %s',
               self.addr, self.connect_params)
        ws = await websockets.connect(self.addr, **self.connect_params)
        l.info('Websocket: connected to %s', self.addr)
        self.ws = ws
        # timeout() re-arms itself forever, so only start it once; starting
        # one per (re)connect would multiply the heartbeats.
        if self._heartbeat_task is None or self._heartbeat_task.done():
            self._heartbeat_task = self.loop.create_task(self.timeout())
        await self.on_connected()

    async def send(self, msg):
        """
        Send a message to remote websocket

        :param msg: Message to send
        :type msg: str
        """
        await self.ws.send(msg)

    async def svc(self):
        """
        Inner loop. Runs until cancelled.

        While there is no socket, waits ``connect_delay`` seconds and tries
        to connect; a failed attempt is logged and retried. While connected,
        waits up to ``reconnect_interval`` seconds for a message and passes
        it to :meth:`on_msg`. A receive timeout or any other error drops the
        socket, which triggers a reconnect on the next iteration.
        """
        while True:
            if self.ws is None:
                await asyncio.sleep(self.connect_delay)
                try:
                    await self.connect()
                except asyncio.CancelledError:
                    # Cancellation must stop the loop, never be retried.
                    raise
                except Exception:
                    l.exception('Websocket: failed to connect to %s', self.addr)
                    # connect() may have stored the socket before failing
                    # (e.g. in on_connected); start over from scratch.
                    self._discard_socket()
                continue
            try:
                msg = await asyncio.wait_for(self.ws.recv(), timeout=self.reconnect_interval)
                await self.on_msg(msg)
            except asyncio.CancelledError:
                raise
            except asyncio.TimeoutError:
                l.warning('Websocket: no message from %s within %s seconds, reconnecting',
                          self.addr, self.reconnect_interval)
                self._discard_socket()
            except Exception:
                l.exception("CRITICAL -- Unkown, unhandled error")
                self._discard_socket()

    def _discard_socket(self):
        """
        Forget the current socket and close it in the background.
        """
        ws, self.ws = self.ws, None
        if ws is not None:
            # Closed in a separate task so that a slow closing handshake on
            # a dead connection does not delay the reconnect.
            self.loop.create_task(self._close_quietly(ws))

    async def _close_quietly(self, ws):
        """
        Close ``ws``; errors are logged at debug level and otherwise ignored.
        """
        try:
            await ws.close()
        except Exception:
            l.debug('Websocket: error while closing stale connection', exc_info=True)

    async def timeout(self):
        """
        Clunky... Sleeps for ``heartbeat_interval`` seconds, schedules the
        next run of itself and then calls :meth:`heartbeat`.
        """
        await asyncio.sleep(self.heartbeat_interval)
        # Re-arm before calling heartbeat() so that a failing heartbeat does
        # not end the chain.
        self._heartbeat_task = self.loop.create_task(self.timeout())
        # TODO check if we need to do reconnect
        await self.heartbeat()

    async def on_connected(self):
        """
        Called when connection to remote socket is established. Sends the
        ``on_connect`` message if one was configured, otherwise does nothing.
        """
        if self.on_connect:
            await self.ws.send(self.on_connect)

    async def on_msg(self, msg):
        """
        Called on every received message. Expected to be overloaded.
        The default implementation only logs the message.
        """
        l.error(msg)

    async def heartbeat(self):
        """
        Sends heartbeat message back to server. Expected to be overloaded.
        The default implementation does nothing.
        """
        pass

    def start(self):
        """
        Start up a connection by scheduling :meth:`svc` on the client's loop.

        :return: The task running the service loop (can be cancelled to stop)
        """
        return self.loop.create_task(self.svc())
async def hi():
    """Start the module-level ``client`` from inside the running event loop."""
    client.start()
# Products to request in the subscription message.
symbols = ["BTC-USD", "ETH-USD", "LTC-USD"]

# Serialized subscribe request; the client sends it right after connecting.
gdax_subscribe = ujson.dumps(dict(type='subscribe', product_ids=symbols))

loop = asyncio.get_event_loop()
client = WebsocketClient(
    loop,
    'wss://ws-feed.gdax.com',
    on_connect=gdax_subscribe,
)

# hi() is scheduled now and runs once the loop below is started.
future = asyncio.ensure_future(hi())
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment