-
-
Save normanlmfung/9aa46471b88be2cef6c8bcdadd0deb18 to your computer and use it in GitHub Desktop.
ccxt_ws_ob_provider.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
This is a simple Python utility that fetches order books using CCXT Pro:
    https://docs.ccxt.com/#/ccxt.pro.manual?id=instantiation

Usage:
    python ccxt_ws_ob_provider.py --ticker BTC/USDT:USDT --ts_delta_observation_ms_threshold 150 --ts_delta_consecutive_ms_threshold 150

It keeps track of latency issues:
    a) ts_delta_observation_ms (default threshold 150 ms): server clock vs timestamp from the exchange
    b) ts_delta_consecutive_ms (default threshold 150 ms): gap between consecutive updates

To point to a different exchange, modify 'instantiate_exhange' and fix the ccxt import.
To fetch the spot market instead of swap, change 'exchange_params'.

https://medium.com/@norman-lm-fung/fetching-orderbooks-via-ccxt-pro-websockets-library-300b21f02052
'''
import argparse | |
from typing import Dict, Mapping, Union | |
import json | |
from datetime import datetime | |
import time | |
import operator | |
import logging | |
from tabulate import tabulate | |
import pandas as pd | |
from redis import Redis | |
import ccxt.pro as ccxtpro | |
from ccxt.base.exchange import Exchange | |
from asyncio import run | |
# Exchange connectivity: constructor arguments handed to the ccxt.pro exchange class.
exchange_params : Dict = {
    'newUpdates': False,
    'options': {
        # Market type to use: 'spot' or 'swap'.
        'defaultType': 'swap',
    },
}
# General (non-exchange) settings for this provider.
param : Dict = {
    # The provider id is substituted into the mds publish topic.
    'provider_id': 1,

    # Message bus the order book is published to.
    'mds': {
        'mds_topic': 'ccxt_ws_ob_$PROVIDER_ID$',
        'redis': {
            'host': 'localhost',
            'port': 6379,
            'db': 0,
            'ttl_ms': 60 * 1000,
        },
    },

    # Latency thresholds, in milliseconds:
    #   ts_delta_observation_ms: server clock vs timestamp from exchange
    #   ts_delta_consecutive_ms: gap between consecutive updates
    'ts_delta_observation_ms_threshold': 150,
    'ts_delta_consecutive_ms_threshold': 150,
}
# Defaults; the ticker may be overridden from the command line further down.
ticker : str = 'BTC/USDT:USDT'
exchange_name : str = 'okx_linear'

# Job name: ':' becomes '_' and '/' becomes '-' in the ticker part.
_job_ticker = ticker.translate(str.maketrans({':': '_', '/': '-'}))
param['job_name'] = f'ccxt_ws_ob_provider_{exchange_name}_{_job_ticker}'

# Module-level placeholder; no exchange is connected at import time.
exchange : Exchange = None
# Render %(asctime)s using gmtime instead of local time.
logging.Formatter.converter = time.gmtime

log_level = logging.INFO # DEBUG --> INFO --> WARNING --> ERROR
format_str = '%(asctime)s %(message)s'
formatter = logging.Formatter(format_str)

# getLogger() without a name: handlers are attached to the root logger.
logger = logging.getLogger()
logger.setLevel(log_level)


def _attach_handler(handler : logging.Handler) -> logging.Handler:
    '''Give the handler the shared level/format, register it on `logger` and return it.'''
    handler.setLevel(log_level)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return handler


# Console output first, then a log file named after the job.
sh = _attach_handler(logging.StreamHandler())
fh = _attach_handler(logging.FileHandler(f"{param['job_name']}.log"))
def log(message : Union[str, Dict]):
    '''
    Emit a message through the module logger at INFO level.

    A dict is serialised to JSON with a 4-space indent first;
    any other value is handed to the logger unchanged.
    '''
    if isinstance(message, Dict):
        logger.info(json.dumps(message, indent=4))
        return
    logger.info(message)
async def instantiate_exhange(
    exchange_name : str,
    old_exchange : Exchange
):
    '''
    Build a new ccxt.pro OKX exchange object, closing the previous one first.

    exchange_name: label stored on the new object's `name` attribute
                   (it does not select the exchange class, which is always okx here).
    old_exchange:  previous exchange object, or None/falsy when there is none;
                   when truthy it is closed before the new one is created.

    Returns the newly created exchange object.
    '''
    if old_exchange:
        await old_exchange.close()

    fresh_exchange = ccxtpro.okx(exchange_params)
    fresh_exchange.name = exchange_name
    return fresh_exchange
# Command-line overrides for the ticker and the two latency thresholds.
# NOTE(review): param['job_name'] (and the log file name built from it) is derived from the
# default ticker earlier in the file and is not refreshed when --ticker is supplied here.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--ticker",
    help="Ticker for which to fetch orderbook",
    default=ticker
)
parser.add_argument(
    "--ts_delta_observation_ms_threshold",
    help="max threshold in ms: server clock vs update timestamp",
    default=param['ts_delta_observation_ms_threshold']
)
parser.add_argument(
    "--ts_delta_consecutive_ms_threshold",
    help="max threshold in ms: gap between consecutive updates",
    default=param['ts_delta_consecutive_ms_threshold']
)
args = parser.parse_args()

if args.ticker:
    ticker = args.ticker

# Each threshold is copied into `param` as an int whenever the parsed value is truthy.
for _threshold_key in ('ts_delta_observation_ms_threshold', 'ts_delta_consecutive_ms_threshold'):
    _cli_value = getattr(args, _threshold_key)
    if _cli_value:
        param[_threshold_key] = int(_cli_value)
class OrderBook:
    '''
    Locally maintained order book for one ticker on one exchange.

    Bids and asks are kept as {price: amount} dicts of floats. Each call to
    update_book merges the incoming levels, drops levels whose amount is zero,
    and refreshes the latency measurements and the is_valid flag.
    '''

    def __init__(
        self,
        ticker : str,
        exchange_name : str
    ) -> None:
        self.ticker : str = ticker
        self.exchange_name : str = exchange_name
        self.bids : Dict = {} # price -> amount
        self.asks : Dict = {} # price -> amount
        self.last_timestamp_ms : Union[int,None] = None # timestamp_ms of the previous update
        self.timestamp : Union[int,None] = None # order book update timestamp in sec
        self.timestamp_ms : Union[int,None] = None # order book update timestamp in ms
        self.ts_delta_consecutive_ms : int = 0
        self.ts_delta_observation_ms : int = 0
        self.is_valid : bool = True

    def update_book(
        self,
        update : Mapping,
        param : dict
    ) -> None:
        '''
        Merge one update into the book and refresh latency statistics.

        update: mapping with a 'timestamp' entry and optional 'bids' / 'asks'
                entries, each an iterable of (price, amount, ...) items.
        param:  mapping providing 'ts_delta_observation_ms_threshold' and
                'ts_delta_consecutive_ms_threshold' (both in ms).

        Raises ValueError when, after the merge, the best ask is strictly below
        the best bid (crossed book). Equal best bid and best ask do not raise.
        '''
        update_ts_ms = update['timestamp']
        # A 10-digit timestamp is treated as seconds and converted to ms.
        if len(str(update_ts_ms))==10:
            update_ts_ms = update_ts_ms*1000

        self.last_timestamp_ms = self.timestamp_ms
        self.timestamp_ms = int(update_ts_ms)
        self.timestamp = int(self.timestamp_ms/1000)

        # Keep track of latency issues
        #   a) ts_delta_observation_ms: local clock vs timestamp from exchange
        #   b) ts_delta_consecutive_ms: gap between consecutive updates (0 for the first update)
        self.ts_delta_observation_ms = int(datetime.now().timestamp()*1000) - self.timestamp_ms
        self.ts_delta_consecutive_ms = self.timestamp_ms - self.last_timestamp_ms if self.last_timestamp_ms else 0

        # The book is flagged invalid as soon as either delta exceeds its threshold.
        self.is_valid = not (
            self.ts_delta_observation_ms>param['ts_delta_observation_ms_threshold']
            or self.ts_delta_consecutive_ms>param['ts_delta_consecutive_ms_threshold']
        )

        # NOTE(review): a level is only removed when an update carries it with amount 0.
        # If the caller feeds full snapshots rather than deltas, stale levels would
        # linger here -- confirm the feed semantics.
        self.bids.update((float(bid[0]), float(bid[1])) for bid in update.get('bids', []))
        self.asks.update((float(ask[0]), float(ask[1])) for ask in update.get('asks', []))
        self.bids = { key:val for key,val in self.bids.items() if val!=0}
        self.asks = { key:val for key,val in self.asks.items() if val!=0}

        if self.bids and self.asks:
            best_ask = float(min(self.asks.keys()))
            best_bid = float(max(self.bids.keys()))
            if best_ask<best_bid:
                raise ValueError(f"{self.exchange_name} {self.ticker} best bid >= best ask!?! How?")

        # Keep bids ordered from highest price down, asks from lowest price up.
        self.bids = dict(sorted(self.bids.items(), reverse=True))
        self.asks = dict(sorted(self.asks.items()))

    def to_dict(self):
        '''
        Return a JSON-serialisable snapshot of the book.

        'bids' are [price, amount] pairs sorted by price descending, 'asks' by
        price ascending; zero-amount levels are left out. 'best_bid' is the
        highest-priced bid pair and 'best_ask' the lowest-priced ask pair; each
        is None when that side of the book is empty.
        '''
        bids = sorted(
            ([float(price), float(amount)] for price, amount in self.bids.items() if float(amount)),
            key=operator.itemgetter(0),
            reverse=True
        )
        asks = sorted(
            ([float(price), float(amount)] for price, amount in self.asks.items() if float(amount)),
            key=operator.itemgetter(0)
        )

        data = {
            "denormalized_ticker" : self.ticker,
            "exchange_name" : self.exchange_name,
            "bids" : bids,
            "asks" : asks,
            "timestamp" : self.timestamp, # in sec
            "timestamp_ms" : self.timestamp_ms, # in ms (timestamp in update from exchange)
            'ts_delta_observation_ms' : self.ts_delta_observation_ms,
            'ts_delta_consecutive_ms' : self.ts_delta_consecutive_ms,
            "is_valid" : self.is_valid
        }

        # Both lists are sorted best-first, so the head of each list is the top of book.
        # (The best bid is the HIGHEST bid, not the lowest.)
        data['best_ask'] = asks[0] if asks else None
        data['best_bid'] = bids[0] if bids else None

        return data
async def main():
    '''
    Stream order book updates for `ticker` and publish them to redis.

    Connects to redis and to the exchange, logs the gap between the local clock
    and the exchange clock once, then loops forever: wait for an order book
    update, fold it into the local OrderBook, store the JSON snapshot under the
    mds topic (with a TTL), and log a summary table without the bids/asks lists.

    A ValueError raised while processing an update is logged, after which the
    exchange connection and the local book are rebuilt from scratch. Any other
    exception (or task cancellation) ends the loop; the current exchange
    connection is closed on the way out.
    '''
    redis_settings = param['mds']['redis']
    redis_client = Redis(
        host = redis_settings['host'],
        port = redis_settings['port'],
        db = redis_settings['db'],
        ssl = False
    )
    mds_topic = param['mds']['mds_topic'].replace('$PROVIDER_ID$', str(param['provider_id']))
    # redis expiry is given in whole seconds; computed once, outside the loop.
    ttl_sec = int(redis_settings['ttl_ms']/1000)

    exchange = await instantiate_exhange(exchange_name=exchange_name, old_exchange=None)
    try:
        ob = OrderBook(ticker=ticker, exchange_name=exchange.name)

        local_server_timestamp_ms = datetime.now().timestamp()*1000
        exchange_timestamp_ms = await exchange.fetch_time()
        timestamp_gap_ms = local_server_timestamp_ms - exchange_timestamp_ms
        log(f"local_server_timestamp_ms: {local_server_timestamp_ms} vs exchange_timestamp_ms: {exchange_timestamp_ms}. timestamp_gap_ms: {timestamp_gap_ms}")

        while True:
            try:
                update = await exchange.watch_order_book(ticker)
                ob.update_book(update=update, param=param)

                ob_dict = ob.to_dict()
                redis_client.set(name=mds_topic, value=json.dumps(ob_dict), ex=ttl_sec)

                # The full depth is published but left out of the logged summary.
                ob_dict.pop('bids')
                ob_dict.pop('asks')
                pd_ob = pd.DataFrame(ob_dict)
                log(f"{tabulate(pd_ob, headers=pd_ob.columns)}")

            except ValueError as update_error:
                log(f"Update error! {update_error}")
                # Start over with a fresh connection and an empty book.
                exchange = await instantiate_exhange(exchange_name=exchange_name, old_exchange=exchange)
                ob = OrderBook(ticker=ticker, exchange_name=exchange.name)
    finally:
        # Release the exchange connection however the loop is left
        # (uncaught exception, cancellation on Ctrl-C, ...).
        await exchange.close()
# Start the provider only when this file is executed as a script, so that
# importing the module does not enter the endless streaming loop.
if __name__ == '__main__':
    run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment