Skip to content

Instantly share code, notes, and snippets.

@normanlmfung
Last active November 4, 2024 23:10
Show Gist options
  • Save normanlmfung/9aa46471b88be2cef6c8bcdadd0deb18 to your computer and use it in GitHub Desktop.
Save normanlmfung/9aa46471b88be2cef6c8bcdadd0deb18 to your computer and use it in GitHub Desktop.
ccxt_ws_ob_provider.py
'''
This is a simple Python utility that fetches the order book using CCXT Pro.
https://docs.ccxt.com/#/ccxt.pro.manual?id=instantiation
Usage:
python ccxt_ws_ob_provider.py --ticker BTC/USDT:USDT --ts_delta_observation_ms_threshold 150 --ts_delta_consecutive_ms_threshold 150
Keep track of latency issues:
a) ts_delta_observation_ms (Default 150 ms): Keep track of server clock vs timestamp from exchange
b) ts_delta_consecutive_ms (Default 150 ms): Keep track of gap between consecutive updates
To point to a different exchange, modify 'instantiate_exhange' and fix ccxt import.
To fetch the spot market instead of swap, change 'defaultType' in 'exchange_params'.
https://medium.com/@norman-lm-fung/fetching-orderbooks-via-ccxt-pro-websockets-library-300b21f02052
'''
import argparse
from typing import Dict, Mapping, Union
import json
from datetime import datetime
import time
import operator
import logging
from tabulate import tabulate
import pandas as pd
from redis import Redis
import ccxt.pro as ccxtpro
from ccxt.base.exchange import Exchange
from asyncio import run
# Exchange connectivity: options handed to the CCXT Pro exchange constructor.
exchange_params : Dict = {
    'newUpdates': False,
    'options' : {
        'defaultType' : 'swap'  # spot, swap
    }
}

# General parameters.
param : Dict = {
    # Provider ID is substituted into the mds publish topic.
    'provider_id' : 1,

    # Message bus the order book is published to.
    'mds' : {
        'mds_topic' : 'ccxt_ws_ob_$PROVIDER_ID$',
        'redis' : {
            'host' : 'localhost',
            'port' : 6379,
            'db' : 0,
            'ttl_ms' : 60 * 1000
        }
    },

    # Latency monitoring:
    # a) ts_delta_observation_ms: server clock vs timestamp from exchange
    # b) ts_delta_consecutive_ms: gap between consecutive updates
    'ts_delta_observation_ms_threshold' : 150,
    'ts_delta_consecutive_ms_threshold' : 150
}

# Defaults for the instrument and the exchange label.
ticker : str = 'BTC/USDT:USDT'
exchange_name : str = 'okx_linear'

# Job name: ':' becomes '_' and '/' becomes '-' in the ticker.
# NOTE: this is computed from the default ticker assigned just above.
param['job_name'] = 'ccxt_ws_ob_provider_{}_{}'.format(
    exchange_name,
    ticker.translate(str.maketrans({':': '_', '/': '-'})),
)
# Module-level exchange handle; starts out unset.
exchange : Exchange = None

# Render log timestamps with time.gmtime (UTC) instead of local time.
logging.Formatter.converter = time.gmtime

log_level = logging.INFO  # DEBUG --> INFO --> WARNING --> ERROR
format_str = '%(asctime)s %(message)s'
formatter = logging.Formatter(format_str)

# Root logger: everything logged in this process goes through it.
logger = logging.getLogger()
logger.setLevel(log_level)

# Console output.
sh = logging.StreamHandler()
sh.setFormatter(formatter)
sh.setLevel(log_level)
logger.addHandler(sh)

# File output, written to "<job_name>.log".
fh = logging.FileHandler(f"{param['job_name']}.log")
fh.setFormatter(formatter)
fh.setLevel(log_level)
logger.addHandler(fh)
def log(message : Union[str, Dict]):
    '''
    Write a message to the module logger at INFO level.

    A dict is rendered as JSON with 4-space indentation first; any other
    value is handed to the logger unchanged.
    '''
    if isinstance(message, Dict):
        message = json.dumps(message, indent=4)
    logger.info(message)
async def instantiate_exhange(
    exchange_name : str,
    old_exchange : Exchange
):
    '''
    Create a new CCXT Pro OKX exchange object, closing the previous one first.

    exchange_name: label assigned to the new object's `name` attribute.
    old_exchange: previous exchange object; closed before the new one is
        created. Pass None (or any falsy value) when there is none.

    Returns the newly created exchange object. The exchange class is fixed to
    `ccxtpro.okx` regardless of `exchange_name`.
    '''
    if old_exchange:
        await old_exchange.close()

    fresh_exchange = ccxtpro.okx(exchange_params)
    fresh_exchange.name = exchange_name
    return fresh_exchange
# Command-line overrides for the module-level defaults.
# The thresholds are declared with type=int so that a non-numeric value is
# rejected by argparse with a usage message instead of an int() traceback.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--ticker",
    help="Ticker for which to fetch orderbook",
    default=ticker,
)
parser.add_argument(
    "--ts_delta_observation_ms_threshold",
    type=int,
    help="max threshold in ms: server clock vs update timestamp",
    default=param['ts_delta_observation_ms_threshold'],
)
parser.add_argument(
    "--ts_delta_consecutive_ms_threshold",
    type=int,
    help="max threshold in ms: gap between consecutive updates",
    default=param['ts_delta_consecutive_ms_threshold'],
)
args = parser.parse_args()

# An empty --ticker keeps the default.
if args.ticker:
    ticker = args.ticker

# Both options carry defaults, so the parsed values are always present.
param['ts_delta_observation_ms_threshold'] = args.ts_delta_observation_ms_threshold
param['ts_delta_consecutive_ms_threshold'] = args.ts_delta_consecutive_ms_threshold
class OrderBook:
    '''
    Local order book for one ticker, maintained from exchange updates.

    Bids and asks are kept as {price: amount} dicts. Each update also records
    two latency measures and an `is_valid` flag derived from them:
    a) ts_delta_observation_ms: local clock minus the update's timestamp
    b) ts_delta_consecutive_ms: gap between this update and the previous one
    '''

    def __init__(
        self,
        ticker : str,
        exchange_name : str
    ) -> None:
        self.ticker : str = ticker
        self.exchange_name : str = exchange_name
        self.bids : Dict = {}
        self.asks : Dict = {}
        self.last_timestamp_ms : Union[int,None] = None
        self.timestamp : Union[int,None] = None # order book update timestamp in sec
        self.timestamp_ms : Union[int,None] = None # order book update timestamp in ms
        self.ts_delta_consecutive_ms : int = 0
        self.ts_delta_observation_ms : int = 0
        self.is_valid : bool = True

    @staticmethod
    def _apply_levels(side : Dict, levels) -> None:
        '''Merge [price, amount, ...] levels into `side`; a zero amount removes the price.'''
        for level in levels:
            price, amount = float(level[0]), float(level[1])
            if amount != 0:
                side[price] = amount
            else:
                side.pop(price, None)

    def update_book(
        self,
        update : Mapping,
        param : dict
    ) -> None:
        '''
        Apply one update to the book and refresh the latency measures.

        update: mapping with a 'timestamp' entry and optional 'bids' / 'asks'
            lists of [price, amount, ...] levels.
        param: mapping holding 'ts_delta_observation_ms_threshold' and
            'ts_delta_consecutive_ms_threshold' (both in ms).

        Raises ValueError when, after the update, best bid >= best ask.

        NOTE(review): levels are merged into the existing book, i.e. `update`
        is treated as a delta. If the feed delivers full snapshots, a level
        missing from a later snapshot is never removed here - confirm against
        the feed's semantics.
        '''
        update_ts_ms = update['timestamp']
        # A 10-digit timestamp is taken to be in seconds and converted to ms.
        if len(str(update_ts_ms)) == 10:
            update_ts_ms = update_ts_ms * 1000

        self.last_timestamp_ms = self.timestamp_ms
        self.timestamp_ms = int(update_ts_ms)
        self.timestamp = self.timestamp_ms // 1000

        # Keep track of latency issues
        # a) ts_delta_observation_ms: server clock vs timestamp from exchange
        # b) ts_delta_consecutive_ms: gap between consecutive updates
        now_ms = int(datetime.now().timestamp() * 1000)
        self.ts_delta_observation_ms = now_ms - self.timestamp_ms
        if self.last_timestamp_ms is not None:
            self.ts_delta_consecutive_ms = self.timestamp_ms - self.last_timestamp_ms
        else:
            # First update: there is no previous timestamp to compare with.
            self.ts_delta_consecutive_ms = 0

        # The book is flagged invalid as soon as either measure exceeds its threshold.
        self.is_valid = (
            self.ts_delta_observation_ms <= param['ts_delta_observation_ms_threshold']
            and self.ts_delta_consecutive_ms <= param['ts_delta_consecutive_ms_threshold']
        )

        self._apply_levels(self.bids, update.get('bids', []))
        self._apply_levels(self.asks, update.get('asks', []))

        if self.bids and self.asks:
            best_ask = min(self.asks)
            best_bid = max(self.bids)
            # Locked (bid == ask) and crossed (bid > ask) books are both
            # rejected, matching the wording of the error message.
            if best_bid >= best_ask:
                raise ValueError(f"{self.exchange_name} {self.ticker} best bid >= best ask!?! How?")

        # Keep bids in descending and asks in ascending price order.
        self.bids = dict(sorted(self.bids.items(), reverse=True))
        self.asks = dict(sorted(self.asks.items()))

    def to_dict(self):
        '''
        Return the book as a plain dict of built-in types.

        'bids' are sorted by price descending and 'asks' ascending, each as a
        list of [price, amount]. 'best_bid' / 'best_ask' are the first entry
        of the respective list, i.e. the highest bid and the lowest ask.

        Raises ValueError when either side of the book is empty.
        '''
        by_price = operator.itemgetter(0)
        bids = sorted(
            ([float(price), float(amount)] for price, amount in self.bids.items() if float(amount)),
            key=by_price,
            reverse=True,
        )
        asks = sorted(
            ([float(price), float(amount)] for price, amount in self.asks.items() if float(amount)),
            key=by_price,
        )
        if not bids or not asks:
            raise ValueError(f"{self.exchange_name} {self.ticker} order book has an empty side")

        return {
            "denormalized_ticker" : self.ticker,
            "exchange_name" : self.exchange_name,
            "bids" : bids,
            "asks" : asks,
            "timestamp" : self.timestamp, # in sec
            "timestamp_ms" : self.timestamp_ms, # in ms (timestamp in update from exchange)
            'ts_delta_observation_ms' : self.ts_delta_observation_ms,
            'ts_delta_consecutive_ms' : self.ts_delta_consecutive_ms,
            "is_valid" : self.is_valid,
            # Best ask is the lowest ask; best bid is the highest bid.
            'best_ask' : asks[0],
            'best_bid' : bids[0],
        }
async def main():
    '''
    Stream the order book for `ticker` and publish every update to redis.

    For each update received from the exchange the local OrderBook is
    refreshed, serialised to JSON and stored under the mds topic key with the
    configured TTL; a tabulated summary (without the bid/ask ladders) is
    logged. A ValueError raised while processing an update is logged, after
    which the exchange connection is re-created and the book starts empty.

    The exchange connection is closed when the loop exits for any reason.
    '''
    redis_client = Redis(
        host = param['mds']['redis']['host'],
        port = param['mds']['redis']['port'],
        db = param['mds']['redis']['db'],
        ssl = False
    )
    mds_topic = param['mds']['mds_topic'].replace('$PROVIDER_ID$', str(param['provider_id']))
    # redis `ex` expects seconds; the configured TTL is in ms.
    ttl_sec = int(param['mds']['redis']['ttl_ms']/1000)

    exchange = await instantiate_exhange(exchange_name=exchange_name, old_exchange=None)
    try:
        ob = OrderBook(ticker=ticker, exchange_name=exchange.name)

        # One-off comparison of the local clock against the exchange clock.
        local_server_timestamp_ms = datetime.now().timestamp()*1000
        exchange_timestamp_ms = await exchange.fetch_time()
        timestamp_gap_ms = local_server_timestamp_ms - exchange_timestamp_ms
        log(f"local_server_timestamp_ms: {local_server_timestamp_ms} vs exchange_timestamp_ms: {exchange_timestamp_ms}. timestamp_gap_ms: {timestamp_gap_ms}")

        while True:
            try:
                update = await exchange.watch_order_book(ticker)
                ob.update_book(update=update, param=param)
                ob_dict = ob.to_dict()
                redis_client.set(name=mds_topic, value=json.dumps(ob_dict), ex=ttl_sec)

                # Drop the full ladders before logging the summary.
                ob_dict.pop('bids')
                ob_dict.pop('asks')
                pd_ob = pd.DataFrame(ob_dict)
                log(f"{tabulate(pd_ob, headers=pd_ob.columns)}")
            except ValueError as update_error:
                log(f"Update error! {update_error}")
                # Start over with a fresh connection and an empty book.
                exchange = await instantiate_exhange(exchange_name=exchange_name, old_exchange=exchange)
                ob = OrderBook(ticker=ticker, exchange_name=exchange.name)
    finally:
        # Release the exchange connection on any exit path (uncaught error,
        # cancellation, Ctrl-C) instead of leaving it open.
        await exchange.close()


run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment