-
-
Save normanlmfung/aaaadeb4de284986a33b3b490ad34fbe to your computer and use it in GitHub Desktop.
crypto_pairs_correlation.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from typing import Dict, List, Union, NoReturn | |
import pandas as pd | |
from ccxt.binance import binance | |
from ccxt.bitmex import bitmex | |
from ccxt.okx import okx | |
from ccxt.base.exchange import Exchange | |
from ccxt.base.exchange import Exchange as CcxtExchange | |
# https://www.analyticsvidhya.com/blog/2021/06/download-financial-dataset-using-yahoo-finance-in-python-a-complete-guide/ | |
from yahoofinancials import YahooFinancials | |
# yfinance allows intervals '1m', '5m', '15m', '1h', '1d', '1wk', '1mo'. yahoofinancials not as flexible | |
import yfinance as yf | |
# Maximum candles returned per REST request; 100 is accepted by most exchanges.
num_candles_limit : int = 100

# ccxt exchange constructor arguments. Public market data only, so no credentials.
param = {
    'apiKey' : None,
    'secret' : None,
    'password' : None,
    'subaccount' : None,
    'rateLimit' : 100, # In ms
    'options' : {
        'defaultType': 'spot',
        'leg_room_bps' : 5,
        'trade_fee_bps' : 3
    }
}

# Backtest window and candle granularity.
dt_start : datetime = datetime(2021, 1, 1)
dt_end : datetime = datetime(2024, 4, 25)
candle_size : str = '1d'

target_exchange = binance(param)

# Universe of pairs whose close-to-close returns are correlated below.
white_list_tickers = [
    'BTC/USDT', 'ETH/USDT', 'SOL/USDT',
    'ADA/USDT', 'MATIC/USDT', 'AVAX/USDT',
    'LINK/USDT', 'DOT/USDT', 'ATOM/USDT'
]
class YahooExchange:
    """Minimal adapter exposing a ccxt-like ``fetch_candles`` interface backed by yfinance."""

    def fetch_candles(
        self,
        start_ts,
        end_ts,
        symbols,
        candle_size
    ) -> Dict[str, Union[pd.DataFrame, None]]:
        """Download OHLCV candles from Yahoo Finance for each symbol.

        Args:
            start_ts: window start, Unix seconds.
            end_ts: window end, Unix seconds.
            symbols: list of Yahoo tickers.
            candle_size: yfinance interval string ('1m', '5m', '15m', '1h', '1d', '1wk', '1mo').

        Returns:
            dict mapping each symbol to a DataFrame sorted ascending by
            'timestamp_ms', with columns renamed to lowercase.
        """
        exchange_candles : Dict[str, Union[pd.DataFrame, None]] = {}
        start_date_str = datetime.fromtimestamp(start_ts).strftime('%Y-%m-%d')
        end_date_str = datetime.fromtimestamp(end_ts).strftime('%Y-%m-%d')
        local_tz = datetime.now().astimezone().tzinfo
        for symbol in symbols:
            pd_candles = yf.download(tickers=symbol, start=start_date_str, end=end_date_str, interval=candle_size)
            pd_candles.reset_index(inplace=True)
            # BUG FIX: daily downloads label the index column 'Date', intraday 'Datetime'.
            # The original only mapped 'Datetime', so '1d' candles raised KeyError below.
            pd_candles.rename(columns={'Date' : 'datetime', 'Datetime' : 'datetime', 'Open': 'open', 'High': 'high', 'Low': 'low', 'Close' : 'close', 'Adj Close' : 'adj_close', 'Volume' : 'volume' }, inplace=True)
            pd_candles['datetime'] = pd.to_datetime(pd_candles['datetime'])
            # Daily timestamps come back tz-naive; tz_convert on a naive series raises,
            # so localize to UTC first (yfinance returns intraday data in UTC).
            if pd_candles['datetime'].dt.tz is None:
                pd_candles['datetime'] = pd_candles['datetime'].dt.tz_localize('UTC')
            pd_candles['datetime'] = pd_candles['datetime'].dt.tz_convert(local_tz)
            # BUG FIX: original used np.int64 but numpy was never imported (NameError).
            # pandas accepts the dtype name as a string.
            pd_candles['timestamp_ms'] = pd_candles['datetime'].values.astype('int64') // 10**6
            pd_candles = pd_candles.sort_values(by=['timestamp_ms'], ascending=[True])
            exchange_candles[symbol] = pd_candles
        return exchange_candles
def fetch_candles(
    start_ts, # in sec
    end_ts, # in sec
    exchange,
    normalized_symbols,
    candle_size,
    how_many_candles = None,
    logger = None,
    num_candles_limit : int = 100,
    cache_dir : Union[str, None] = None,
    list_ts_field : Union[str, None] = None,
    validation_max_gaps : int = 10,
    validation_max_end_date_intervals : int = 1
) -> Dict[str, Union[pd.DataFrame, None]]:
    """Dispatch a candle fetch to the right backend by exchange type.

    Routes to ``YahooExchange.fetch_candles`` for the Yahoo adapter and to
    ``_fetch_candles_ccxt`` for any ccxt exchange subclass.

    NOTE(review): validation_max_gaps / validation_max_end_date_intervals are
    accepted but never used by either backend — presumably reserved for a
    candle-gap validation step that is not present in this file.

    Returns:
        dict of symbol -> candles DataFrame, or {'': None} for an
        unrecognized exchange type.
    """
    # BUG FIX (idiom): isinstance instead of `type(x) is ...` / explicit
    # issubclass on __class__ — same dispatch, standard Python form.
    if isinstance(exchange, YahooExchange):
        return exchange.fetch_candles(
            start_ts=start_ts,
            end_ts=end_ts,
            symbols=normalized_symbols,
            candle_size=candle_size
        )
    elif isinstance(exchange, CcxtExchange):
        return _fetch_candles_ccxt(
            start_ts=start_ts,
            end_ts=end_ts,
            exchange=exchange,
            normalized_symbols=normalized_symbols,
            candle_size=candle_size,
            how_many_candles=how_many_candles,
            logger=logger,
            num_candles_limit=num_candles_limit,
            cache_dir=cache_dir,
            list_ts_field=list_ts_field
        )
    # Unknown exchange type: preserve the original sentinel return.
    return { '' : None }
def _fetch_candles_ccxt(
    start_ts : int,
    end_ts : int,
    exchange,
    normalized_symbols : List[str],
    candle_size : str,
    how_many_candles : Union[int, None],
    num_candles_limit : int = 100,
    logger = None,
    cache_dir : Union[str, None] = None,
    list_ts_field : Union[str, None] = None
)-> Dict[str, Union[pd.DataFrame, None]]:
    """Fetch candles for each requested symbol via a ccxt exchange.

    Args:
        start_ts / end_ts: window boundaries in Unix seconds.
        normalized_symbols: ccxt-normalized symbols, e.g. 'BTC/USDT'.
        num_candles_limit: max candles per underlying REST request.
        how_many_candles, logger, cache_dir, list_ts_field: accepted for
        signature compatibility but unused here.

    Returns:
        dict mapping every symbol to its candles DataFrame.
    """
    # BUG FIX: the original fetched only normalized_symbols[0] and silently
    # dropped the rest; it also ignored num_candles_limit. Fetch all symbols
    # and forward the paging limit.
    results : Dict[str, Union[pd.DataFrame, None]] = {}
    for ticker in normalized_symbols:
        results[ticker] = _fetch_candles(
            symbol = ticker,
            exchange = exchange,
            start_ts = start_ts,
            end_ts = end_ts,
            candle_size = candle_size,
            num_candles_limit = num_candles_limit
        )
    return results
def _fetch_candles( | |
symbol : str, | |
exchange : CcxtExchange, | |
start_ts : int, | |
end_ts : int, | |
candle_size : str = '1d', | |
num_candles_limit : int = 100 | |
): | |
def _fix_column_types(pd_candles : pd.DataFrame): | |
pd_candles['open'] = pd_candles['open'].astype(float) | |
pd_candles['high'] = pd_candles['high'].astype(float) | |
pd_candles['low'] = pd_candles['low'].astype(float) | |
pd_candles['close'] = pd_candles['close'].astype(float) | |
pd_candles['volume'] = pd_candles['volume'].astype(float) | |
pd_all_candles['datetime'] = pd_all_candles['timestamp_ms'].apply(lambda x : datetime.fromtimestamp(int(x/1000))) | |
pd_candles['datetime'] = pd.to_datetime(pd_candles['datetime']) | |
# This is to make it easy to do grouping with Excel pivot table | |
pd_candles['year'] = pd_candles['datetime'].dt.year | |
pd_candles['month'] = pd_candles['datetime'].dt.month | |
pd_candles['day'] = pd_candles['datetime'].dt.day | |
pd_candles['hour'] = pd_candles['datetime'].dt.hour | |
pd_candles['minute'] = pd_candles['datetime'].dt.minute | |
def _fetch_ohlcv(exchange, symbol, timeframe, since, limit, params) -> Union[List, NoReturn]: | |
one_timeframe = f"1{timeframe[-1]}" | |
candles = exchange.fetch_ohlcv(symbol=symbol, timeframe=one_timeframe, since=since, limit=limit, params=params) | |
if candles and len(candles)>0: | |
candles.sort(key=lambda x : x[0], reverse=False) | |
return candles | |
all_candles = [] | |
params = {} | |
this_cutoff = start_ts | |
while this_cutoff<=end_ts: | |
candles = _fetch_ohlcv(exchange=exchange, symbol=symbol, timeframe=candle_size, since=int(this_cutoff * 1000), limit=num_candles_limit, params=params) | |
if candles and len(candles)>0: | |
all_candles = all_candles + [[ int(x[0]), float(x[1]), float(x[2]), float(x[3]), float(x[4]), float(x[5]) ] for x in candles if x[1] and x[2] and x[3] and x[4] and x[5] ] | |
record_ts = max([int(record[0]) for record in candles]) | |
record_ts_str : str = str(record_ts) | |
if len(record_ts_str)==13: | |
record_ts = int(int(record_ts_str)/1000) # Convert from milli-seconds to seconds | |
this_cutoff = record_ts + 1 | |
columns = ['exchange', 'symbol', 'timestamp_ms', 'open', 'high', 'low', 'close', 'volume'] | |
pd_all_candles = pd.DataFrame([ [ exchange.name, symbol, x[0], x[1], x[2], x[3], x[4], x[5] ] for x in all_candles], columns=columns) | |
_fix_column_types(pd_all_candles) | |
pd_all_candles['pct_chg_on_close'] = pd_all_candles['close'].pct_change() | |
return pd_all_candles | |
# Assemble one column of close-to-close percentage returns per ticker,
# then compute the pairwise correlation matrix across the whole universe.
candles_cache = {}
for ticker in white_list_tickers:
    fetched = fetch_candles(
        normalized_symbols = [ ticker ],
        exchange = target_exchange,
        start_ts = dt_start.timestamp(),
        end_ts = dt_end.timestamp(),
        candle_size = candle_size)
    candles_cache[ticker] = fetched[ticker]['pct_chg_on_close']

pd_candles_cache = pd.DataFrame(candles_cache)
correlations = pd_candles_cache.corr()
correlations.to_csv('correlations.csv')
correlations
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment