@normanlmfung
Last active October 29, 2024 04:12
crypto_pairs_correlation.py
from datetime import datetime
from typing import Dict, List, Union

import numpy as np
import pandas as pd

from ccxt.binance import binance
from ccxt.bitmex import bitmex
from ccxt.okx import okx
from ccxt.base.exchange import Exchange as CcxtExchange

# https://www.analyticsvidhya.com/blog/2021/06/download-financial-dataset-using-yahoo-finance-in-python-a-complete-guide/
from yahoofinancials import YahooFinancials
# yfinance allows intervals '1m', '5m', '15m', '1h', '1d', '1wk' and '1mo'; yahoofinancials is not as flexible.
import yfinance as yf
num_candles_limit : int = 100  # Depends on the exchange, but 100 is generally ok.

param = {
    'apiKey' : None,
    'secret' : None,
    'password' : None,
    'subaccount' : None,
    'rateLimit' : 100,  # In ms
    'options' : {
        'defaultType': 'spot',
        'leg_room_bps' : 5,
        'trade_fee_bps' : 3
    }
}

dt_start : datetime = datetime(2021, 1, 1)
dt_end : datetime = datetime(2024, 4, 25)
candle_size : str = '1d'

target_exchange = binance(param)

white_list_tickers = [ 'BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'ADA/USDT', 'MATIC/USDT', 'AVAX/USDT', 'LINK/USDT', 'DOT/USDT', 'ATOM/USDT' ]

class YahooExchange:
    def fetch_candles(
        self,
        start_ts,
        end_ts,
        symbols,
        candle_size
    ) -> Dict[str, Union[pd.DataFrame, None]]:
        exchange_candles : Dict[str, Union[pd.DataFrame, None]] = {}
        start_date = datetime.fromtimestamp(start_ts)
        end_date = datetime.fromtimestamp(end_ts)
        start_date_str = start_date.strftime('%Y-%m-%d')
        end_date_str = end_date.strftime('%Y-%m-%d')
        local_tz = datetime.now().astimezone().tzinfo
        for symbol in symbols:
            # yfinance returns timestamps in UTC, under 'Datetime' for intraday intervals and 'Date' for daily and above.
            pd_candles = yf.download(tickers=symbol, start=start_date_str, end=end_date_str, interval=candle_size)
            pd_candles.reset_index(inplace=True)
            pd_candles.rename(columns={'Datetime' : 'datetime', 'Date' : 'datetime', 'Open': 'open', 'High': 'high', 'Low': 'low', 'Close' : 'close', 'Adj Close' : 'adj_close', 'Volume' : 'volume' }, inplace=True)
            # Normalize to timezone-aware UTC (daily bars come back tz-naive), then convert to local time.
            pd_candles['datetime'] = pd.to_datetime(pd_candles['datetime'], utc=True).dt.tz_convert(local_tz)
            pd_candles['timestamp_ms'] = pd_candles['datetime'].values.astype(np.int64) // 10**6
            pd_candles = pd_candles.sort_values(by=['timestamp_ms'], ascending=[True])
            exchange_candles[symbol] = pd_candles
        return exchange_candles
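
# Illustrative only (not exercised below): the Yahoo path uses Yahoo-style tickers such as 'BTC-USD',
# not ccxt-style pairs like 'BTC/USDT'. A minimal sketch, kept commented out to avoid network calls:
# yahoo_candles = YahooExchange().fetch_candles(
#     start_ts=dt_start.timestamp(),
#     end_ts=dt_end.timestamp(),
#     symbols=['BTC-USD'],
#     candle_size='1d'
# )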

def fetch_candles(
    start_ts,  # in sec
    end_ts,  # in sec
    exchange,
    normalized_symbols,
    candle_size,
    how_many_candles = None,
    logger = None,
    num_candles_limit : int = 100,
    cache_dir : Union[str, None] = None,
    list_ts_field : Union[str, None] = None,
    validation_max_gaps : int = 10,
    validation_max_end_date_intervals : int = 1
) -> Dict[str, Union[pd.DataFrame, None]]:
    if type(exchange) is YahooExchange:
        return exchange.fetch_candles(
            start_ts=start_ts,
            end_ts=end_ts,
            symbols=normalized_symbols,
            candle_size=candle_size
        )
    elif issubclass(exchange.__class__, CcxtExchange):
        return _fetch_candles_ccxt(
            start_ts=start_ts,
            end_ts=end_ts,
            exchange=exchange,
            normalized_symbols=normalized_symbols,
            candle_size=candle_size,
            how_many_candles=how_many_candles,
            logger=logger,
            num_candles_limit=num_candles_limit,
            cache_dir=cache_dir,
            list_ts_field=list_ts_field
        )
    return { '' : None }

def _fetch_candles_ccxt(
    start_ts : int,
    end_ts : int,
    exchange,
    normalized_symbols : List[str],
    candle_size : str,
    how_many_candles : Union[int, None],
    num_candles_limit : int = 100,
    logger = None,
    cache_dir : Union[str, None] = None,
    list_ts_field : Union[str, None] = None
) -> Dict[str, Union[pd.DataFrame, None]]:
    ticker = normalized_symbols[0]
    pd_candles = _fetch_candles(
        symbol = ticker,
        exchange = exchange,
        start_ts = start_ts,
        end_ts = end_ts,
        candle_size = candle_size,
        num_candles_limit = num_candles_limit
    )
    return {
        ticker : pd_candles
    }

def _fetch_candles(
    symbol : str,
    exchange : CcxtExchange,
    start_ts : int,
    end_ts : int,
    candle_size : str = '1d',
    num_candles_limit : int = 100
):
    def _fix_column_types(pd_candles : pd.DataFrame):
        pd_candles['open'] = pd_candles['open'].astype(float)
        pd_candles['high'] = pd_candles['high'].astype(float)
        pd_candles['low'] = pd_candles['low'].astype(float)
        pd_candles['close'] = pd_candles['close'].astype(float)
        pd_candles['volume'] = pd_candles['volume'].astype(float)
        pd_candles['datetime'] = pd_candles['timestamp_ms'].apply(lambda x : datetime.fromtimestamp(int(x/1000)))
        pd_candles['datetime'] = pd.to_datetime(pd_candles['datetime'])
        # This is to make it easy to do grouping with Excel pivot table
        pd_candles['year'] = pd_candles['datetime'].dt.year
        pd_candles['month'] = pd_candles['datetime'].dt.month
        pd_candles['day'] = pd_candles['datetime'].dt.day
        pd_candles['hour'] = pd_candles['datetime'].dt.hour
        pd_candles['minute'] = pd_candles['datetime'].dt.minute

    def _fetch_ohlcv(exchange, symbol, timeframe, since, limit, params) -> Union[List, None]:
        one_timeframe = f"1{timeframe[-1]}"  # Always fetch the one-unit timeframe, e.g. '1d' from '3d'.
        candles = exchange.fetch_ohlcv(symbol=symbol, timeframe=one_timeframe, since=since, limit=limit, params=params)
        if candles and len(candles)>0:
            candles.sort(key=lambda x : x[0], reverse=False)
        return candles

    all_candles = []
    params = {}
    this_cutoff = start_ts
    while this_cutoff<=end_ts:
        candles = _fetch_ohlcv(exchange=exchange, symbol=symbol, timeframe=candle_size, since=int(this_cutoff * 1000), limit=num_candles_limit, params=params)
        if candles and len(candles)>0:
            # Keep only rows with complete OHLCV values.
            all_candles = all_candles + [[ int(x[0]), float(x[1]), float(x[2]), float(x[3]), float(x[4]), float(x[5]) ] for x in candles if x[1] and x[2] and x[3] and x[4] and x[5] ]
            record_ts = max([int(record[0]) for record in candles])
            record_ts_str : str = str(record_ts)
            if len(record_ts_str)==13:
                record_ts = int(int(record_ts_str)/1000)  # Convert from milliseconds to seconds
            this_cutoff = record_ts + 1
        else:
            break  # No more candles returned: stop paginating instead of looping forever.

    columns = ['exchange', 'symbol', 'timestamp_ms', 'open', 'high', 'low', 'close', 'volume']
    pd_all_candles = pd.DataFrame([ [ exchange.name, symbol, x[0], x[1], x[2], x[3], x[4], x[5] ] for x in all_candles], columns=columns)
    _fix_column_types(pd_all_candles)
    pd_all_candles['pct_chg_on_close'] = pd_all_candles['close'].pct_change()
    return pd_all_candles
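
# The fetch_candles() signature above exposes validation_max_gaps but the check is not implemented here.
# The helper below is a minimal, hypothetical sketch of how fetched candles could be sanity-checked for
# gaps from consecutive timestamp_ms deltas; _count_candle_gaps and interval_ms are illustrative names
# that do not appear in the original gist (interval_ms would be 86_400_000 for '1d' candles).
def _count_candle_gaps(pd_candles : pd.DataFrame, interval_ms : int) -> int:
    deltas = pd_candles['timestamp_ms'].sort_values().diff().dropna()
    return int((deltas > interval_ms).sum())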

candles_cache = {}
for ticker in white_list_tickers:
    candles = fetch_candles(
        normalized_symbols = [ ticker ],
        exchange = target_exchange,
        start_ts = dt_start.timestamp(),
        end_ts = dt_end.timestamp(),
        candle_size = candle_size)
    candles_cache[ticker] = candles[ticker]['pct_chg_on_close']

# Columns are aligned positionally on the default RangeIndex, which assumes every ticker covers the same candle range.
pd_candles_cache = pd.DataFrame(candles_cache)
correlations = pd_candles_cache.corr()
correlations.to_csv('correlations.csv')
print(correlations)
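
# Optional follow-up: the correlation matrix is easier to read as a heatmap. This is an illustrative
# sketch that is not part of the original gist and assumes matplotlib is installed; it is left
# commented out so the script's behavior is unchanged.
# import matplotlib.pyplot as plt
# fig, ax = plt.subplots(figsize=(8, 6))
# im = ax.imshow(correlations.values, vmin=-1, vmax=1, cmap='coolwarm')
# ax.set_xticks(range(len(correlations.columns)))
# ax.set_xticklabels(correlations.columns, rotation=90)
# ax.set_yticks(range(len(correlations.index)))
# ax.set_yticklabels(correlations.index)
# fig.colorbar(im, ax=ax)
# plt.tight_layout()
# plt.show()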